java版的producer开发流程
java版的producer开发流程文章目录java版的producer开发流程1、构造 Properties 对象2、构造 Kafka Producer 对象3、 构造 ProducerRecord 对象4、发送消息4.1、异步发送4.2、同步发送4.3、异常信息5、关闭producerJava 版本 producer 工作流程如图:Java 版本 producer 工作流程import jav
java版的producer开发流程
文章目录
Java 版本 producer 工作流程如图:
Java 版本 producer 工作流程
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class ProducerTest {
public static void main(String[] args) throws Exception {
// 1、构造 Properties 对象
Properties props = new Properties();
props.put("bootstrap.servers", " localhost:9092");// 必填
props.put("key.serializer", "org.apache.kafka.common.serialization. StringSerializer");// 必填
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 必填
props.put("acks", "-1");
props.put("retries", 3);
props.put("batch.size", 323840);
props.put("linger.ms", 10);
props.put("buffer.memory", 33554432);
props.put("max.block.ms", 3000);
// 2、构造 Kafka Producer 对象
Producer<String, String> producer = new KafkaProducer<>(props);
//3、 构造 ProducerRecord 对象
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("my-topic ", "我是第一条数据");
//4、 发送消息
producer.send(producerRecord);
// 5、关闭连接
producer.close();
}
}
1、构造 Properties 对象
bootstrap.servers
:必填的属性,该参数指定了一组host:port
对,用于创建向 Kafka broker 服务器的连接,比如:kl:9092,k2:9092,k3:9092
。上面的代码清单中指定了localhost:9092
, producer 使用时需要替换成实际的broker 列表。
key.serializer
:key的序列化类,也可以在第二步,构造KafkaProducer对象时候,通过构造方法传入
value.serializer
:value的序列化类,也可以在第二步,构造KafkaProducer对象时候,通过构造方法传入
其他参数,详细参看《kafka–producer参数详解》
参数的名字,可以对照org.apache.kafka.clients.producer.ProducerConfig.class
里的静态属性
2、构造 Kafka Producer 对象
3、 构造 ProducerRecord 对象
4、发送消息
4.1、异步发送
实际上所有的写入操作默认都是异步的。 Java 版本 producer 的send 方法会返回一个 Java Future 对象供用户稍后获取发送结果,
这就是所谓的回调机制。 send 方法提供了回调类参数来实现异步发送以及对发送结果的响应,具体代码如下:
producer.send(producerRecord, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
// 消息发送成功
} else {
// 执行错误处理逻辑
}
}
});
4.2、同步发送
同步发送和异步发送其实就是通过 Java Future 来区分的,调用Future .get()
无限等待结果返回,
即实现同步发送的效果,具体代码如下:
producer.send(producerRecord).get();
使用 Futur.get 会一直等待下去直至 Kafka broker 发送结果返回给 producer 程序。
当结果从 broker 处返回时 get 方法要么返回发送结果,要么抛出异常交由 producer 自行处理。
如果没有错误, get 将返回对应的 RecordMetadata 实例(包含了己发送消息的所有元数据信息),
包括topic 、分区以及该消息在对应分区的位移信息。
4.3、异常信息
不管是同步发送还是异步发送,发送都有可能失败,导致返回异常错误。
当前 Kafka 的错误类型包含了两类:可重试异常和不可重试异常。
1、常见的可重试异常如下:
LeaderNotAvailableException
:分区的 leader 副本不可用,这通常出现在 leader 换届选举期间,重试之后可以自行恢复。
NotControllerException
: controller不可用, 这通常表明 controller在经历新一轮的选举,这也是可以通过重试机制自行恢复的。
NetworkException
:网络瞬时故障导致的异常,可重试.。
对于这种可重试的异常,如果在 producer 程序中配置了重试次数,
那么只要在规定的重试次数内自行恢复了,便不会出现在 onCompletion的exception中。
不过若超过了重试次数仍没有成功,则仍然会被封装进 exception 中。此时就需要 producer 程序自行处理这种异常。
所有可重试异常都继承自 org.apache.kafka .cornmon errors RetriableException抽象类;
2、不可重试异常:
RecordTooLargeException
:发送的消息尺寸过大,超过了规定的大小上限;
SerializationException
:序列化失败异常;
KafkaException
:其他类型的异常;
所有这些不可重试异常一旦被捕获都会被封装进 Future 的计算结果井返回给 producer 程序。
代码解决方式:
// 4、 发送消息
producer.send(producerRecord, (metadata, exception) -> {
if (exception == null) {
// 正常的业务
} else {
if (exception instanceof RetriableException) {
// 处理可重试瞬时异常
} else {
// 处理不可重试异常
}
}
});
5、关闭producer
producer 程序结束时一定要关闭 producer !
毕竟 producer 程序运行时占用了系统资源,因此必须要显式地调用KafkaProducer.close
方法关闭 producer 。
如果是调用无参数 close 方法, producer 会先处理完后再关闭,即所谓的“优雅”关闭退出 ;
同时, KafkaProducer 还提供了 个带超时参数的 close 方法 close(timeout);
如果调用此方法, producer 会等待 timeout 时间来完成所有处理中的请求,然后强行退出。
这就是说,若 timeout 超时,则 producer 会强制结束,并立即丢弃所有未发送以及未应答的发送请求。
因此在实际场景中一定要谨慎使用带超时的 close 方法。
更多推荐
所有评论(0)