java版的producer开发流程


Java 版本 producer 工作流程如图:

1.png

Java 版本 producer 工作流程

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerTest {

    public static void main(String[] args) throws Exception {
        // 1、构造 Properties 对象
        Properties props = new Properties();
        props.put("bootstrap.servers", " localhost:9092");// 必填
        props.put("key.serializer", "org.apache.kafka.common.serialization. StringSerializer");// 必填
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 必填
        props.put("acks", "-1");
        props.put("retries", 3);
        props.put("batch.size", 323840);
        props.put("linger.ms", 10);
        props.put("buffer.memory", 33554432);
        props.put("max.block.ms", 3000);
        // 2、构造 Kafka Producer 对象
        Producer<String, String> producer = new KafkaProducer<>(props);
        //3、 构造 ProducerRecord 对象
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>("my-topic ", "我是第一条数据");
        //4、 发送消息
        producer.send(producerRecord);
        // 5、关闭连接
        producer.close();
    }
}

1、构造 Properties 对象

bootstrap.servers:必填的属性,该参数指定了一组host:port 对,用于创建向 Kafka broker 服务器的连接,比如:kl:9092,k2:9092,k3:9092。上面的代码清单中指定了localhost:9092, producer 使用时需要替换成实际的broker 列表。

key.serializer:key的序列化类,也可以在第二步,构造KafkaProducer对象时候,通过构造方法传入

value.serializer:value的序列化类,也可以在第二步,构造KafkaProducer对象时候,通过构造方法传入

其他参数,详细参看《kafka–producer参数详解》

参数的名字,可以对照org.apache.kafka.clients.producer.ProducerConfig.class里的静态属性

2、构造 Kafka Producer 对象

3、 构造 ProducerRecord 对象

4、发送消息

4.1、异步发送

实际上所有的写入操作默认都是异步的。 Java 版本 producer 的send 方法会返回一个 Java Future 对象供用户稍后获取发送结果,

这就是所谓的回调机制。 send 方法提供了回调类参数来实现异步发送以及对发送结果的响应,具体代码如下:

producer.send(producerRecord, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception == null) {
                    // 消息发送成功
                } else {
                    // 执行错误处理逻辑
                }
            }
        });

4.2、同步发送

同步发送和异步发送其实就是通过 Java Future 来区分的,调用Future .get()无限等待结果返回,

即实现同步发送的效果,具体代码如下:

 producer.send(producerRecord).get();

使用 Futur.get 会一直等待下去直至 Kafka broker 发送结果返回给 producer 程序。

当结果从 broker 处返回时 get 方法要么返回发送结果,要么抛出异常交由 producer 自行处理。

如果没有错误, get 将返回对应的 RecordMetadata 实例(包含了己发送消息的所有元数据信息),

包括topic 、分区以及该消息在对应分区的位移信息。

4.3、异常信息

不管是同步发送还是异步发送,发送都有可能失败,导致返回异常错误。

当前 Kafka 的错误类型包含了两类:可重试异常和不可重试异常。

1、常见的可重试异常如下:

LeaderNotAvailableException :分区的 leader 副本不可用,这通常出现在 leader 换届选举期间,重试之后可以自行恢复。

NotControllerException: controller不可用, 这通常表明 controller在经历新一轮的选举,这也是可以通过重试机制自行恢复的。

NetworkException :网络瞬时故障导致的异常,可重试.。

对于这种可重试的异常,如果在 producer 程序中配置了重试次数,

那么只要在规定的重试次数内自行恢复了,便不会出现在 onCompletion的exception中。

不过若超过了重试次数仍没有成功,则仍然会被封装进 exception 中。此时就需要 producer 程序自行处理这种异常。

所有可重试异常都继承自 org.apache.kafka .cornmon errors RetriableException抽象类;

2、不可重试异常:

RecordTooLargeException :发送的消息尺寸过大,超过了规定的大小上限;

SerializationException:序列化失败异常;

KafkaException :其他类型的异常;

所有这些不可重试异常一旦被捕获都会被封装进 Future 的计算结果井返回给 producer 程序。

代码解决方式:

        // 4、 发送消息
        producer.send(producerRecord, (metadata, exception) -> {
            if (exception == null) {
                // 正常的业务
            } else {
                if (exception instanceof RetriableException) {
                    // 处理可重试瞬时异常
                } else {
                    // 处理不可重试异常
                }
            }
        });

5、关闭producer

producer 程序结束时一定要关闭 producer !

毕竟 producer 程序运行时占用了系统资源,因此必须要显式地调用KafkaProducer.close 方法关闭 producer 。

如果是调用无参数 close 方法, producer 会先处理完后再关闭,即所谓的“优雅”关闭退出 ;

同时, KafkaProducer 还提供了 个带超时参数的 close 方法 close(timeout);

如果调用此方法, producer 会等待 timeout 时间来完成所有处理中的请求,然后强行退出。

这就是说,若 timeout 超时,则 producer 会强制结束,并立即丢弃所有未发送以及未应答的发送请求。

因此在实际场景中一定要谨慎使用带超时的 close 方法。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐