消息的发送过程 

 

1. 创建 ProducerRecord 对象(可以指定键或分区)。

2. 键和值对象序列化成字节数组。

3. 确定分区。如果指定了分区,以指定的优先。如果没有,分区器会根据键选择一个分区。

4. 数据记录添加到记录批次。同一个批次里的所有消息会被发送到相同的主题和分区上。由独立的线程负责把这些记录批次发送到相应的 broker上。

服务器会返回一个响应。如果消息成功写入 Kafka,就返回一个 RecordMetaData 对象,它包含了主题和分区信息,以及记录在分区里的偏移量。如果写入失败,则会返回一个错误。生产者在收到错误之后会尝试重新发送消息,几次之后如果还是失败,就返回错误信息。

生产者三个核心配置

1. bootstrap.servers (发送到哪里)

2. key.serializer (键序列化器)

3. value.serializer (值序列化器)

Properties prop = new Properties();
prop.put("bootstrap.servers", "broker1:9092,broker2:9092");
prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<String, String>(prop);

三种发送模式

发送并忘记:不关心发送结果,完全凭借Kafka实现高可用性,可能会丢失消息。

ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products", "France");

try { 
    producer.send(record);
} catch (Exception e) { 
    e.printStackTrace();
}

ProducerRecord的三个构造参数是,topic,key,value 

这里直接发送,异常捕获,不关心结果。

同步发送:根据返回的Future,同步等待结果

ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products", "France");

try {
    producer.send(record).get();
} catch (Exception e) { 
    e.printStackTrace();
}

这里对返回的Future进行了get调用,阻塞等待结果。

KafkaProducer 一般会发生两类错误。其中一类是可重试错误。比如连接错误,“无主(no leader)”错误。KafkaProducer 可以被配置成自动重试,如果在多次重 试后仍无法解决问题,应用程序会收到一个重试异常。另一类错误无法通过重试解决,比如“消息太大”异常。对于这类错误,KafkaProducer 不会进行任何重试,直接抛出异常

异步发送:通过回调的方式获取发送的结果。

private class DemoProducerCallback implements Callback { 

    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
           if (e != null) {
                e.printStackTrace();
           }
    } 
}

ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Biomedical Materials", "USA");

producer.send(record, new DemoProducerCallback());

通过实现Callback接口的onCompletion方法来接受回调。在send方法中,给一个回调的实现。

几种特殊的配置

配置说明

acks

acks 参数指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的。

我们知道为了实现高可用,Kafka的分区是建立副本的,指定这个参数也是为了高可用,防止消息丢失。

如果 acks=0,生产者不会等待任何来自服务器的响应。即不关心broker是否收到消息,没有吞吐量的限制。

如果 acks=1,只要集群的首领节点收到消息,生产者就会收到一个来自服务器的成功响应。如果一个没有收到消息的节点成为新首领,消息还是会丢失。这个时候的吞吐量取决于使用的是同步发送还是异步发送。如果等待服务器的响应会增加延迟。如果客户端使用回调,延迟问题就可以得到缓解。

如果 acks=all,当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。这种模式是最安全的,但它的延迟更高。

buffer.memory

生产者内存缓冲区的大小

compression.type

默认不压缩,该参数可以设置为 snappy、gzip 或 lz4,它指定了消息被发送给 broker 之前使用哪一种压缩算法进行压缩。

retries

retries 参数的值决定了生产者可以重发消息的次数

batch.size

当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指 定了一个批次可以使用的内存大小,按照字节数计算(而不是消息个数)

linger.ms

生产者在发送批次之前等待更多消息加入批次的时间

client.id

服务器会用它来识别消息的来源

max.in.flight.requests.per.connection

生产者在收到服务器响应之前可以发送多少个消息

timeout.ms

timeout.ms 指定了 broker 等待同步副本返回消息确认的时间

request.timeout.msrequest.timeout.ms 指定了生产者在发送数据时等待服务器返回响应的时间
metadata.fetch.timeout.msmetadata. fetch.timeout.ms 指定了生产者在获取元数据(比如目标分区的首领是谁)时等待服务器 返回响应的时间

max.block.ms

在调用 send() 方法或使用 partitionsFor() 方法获取元数据时生产者的阻塞 时间

max.request.size

用于控制生产者发送的请求大小(字节)

receive.buffer.bytes

TCP socket 接收数据包的缓冲区大小

send.buffer.bytes

TCP socket 发送数据包的缓冲区大小

Avro序列化器

 

1. 配置注册表的地址

Properties props = new Properties();
// brokers
props.put("bootstrap.servers", "localhost:9092"); 
// 键序列化
props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");  
// 值序列化
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer"); 
// 注册表地址
props.put("schema.registry.url", schemaUrl); 

键值都使用了KafkaAvroSerializer序列化器

注册表的地址使用schema.registry.url进行配置

2. 发送消息

Producer<String, Customer> producer = new KafkaProducer<String, Customer>(props);

// 不断生成事件,直到有人按下Ctrl+C组合键 
while (true) {
    Customer customer = CustomerGenerator.getNext(); 
    System.out.println("Generated customer " + customer.toString()); 

    ProducerRecord<String, Customer> record = new ProducerRecord<>(“customerContacts”, customer.getId(), customer);
    producer.send(record);
}

3. 不使用Avro对象生成器

Properties props = new Properties(); 
props.put("bootstrap.servers", "localhost:9092"); 
props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer"); 
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer"); props.put("schema.registry.url", url);

// schema描述
String schemaString = "{\"namespace\": \"customerManagement.avro\", \"type\": \"record\", " +
        "\"name\": \"Customer\"," +
        "\"fields\": [" +
        "{\"name\": \"id\", \"type\": \"int\"}," + 
        "{\"name\": \"name\", \"type\": \"string\"}," + 
        "{\"name\": \"email\", \"type\": [\"null\",\"string\"], \"default\":\"null\" }" + 
        "]}";

Producer<String, GenericRecord> producer = new KafkaProducer<String, GenericRecord>(props);

// 解析schema
Schema.Parser parser = new Schema.Parser(); 
Schema schema = parser.parse(schemaString);

for (int nCustomers = 0; nCustomers < customers; nCustomers++) { 
    String name = "exampleCustomer" + nCustomers;
    String email = "example" + nCustomers + "@example.com";

    // 核心在这里
    GenericRecord customer = new GenericData.Record(schema);
    customer.put("id", nCustomers);
    customer.put("name", name);
    customer.put("email", email);

    ProducerRecord<String, GenericRecord> data = new ProducerRecord<String, GenericRecord>("customerContacts", name, customer); 
    producer.send(data);
} 

4. 不指定key的发送

ProducerRecord<Integer, String> record = new ProducerRecord<>("CustomerCountry", "USA");

这样,key无需指定,由内部完成自动分区。

如果键值为 null,并且使用了默认的分区器,那么记录将使用轮询(Round Robin)算法发送到主题内各个可用的分区上。

如果键不为空,并且使用了默认的分区器,那么 Kafka 会对键进行散列,然后根据散列值把消息映射到特定的分区上。

在不改变主题分区数量的情况下,键与分区之间的映射才能保持不变。在创建主题的时候就把分区规划好,不要增加新分区。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐