Kafka消息的生产
如果acks=1,只要集群的首领节点收到消息,生产者就会收到一个来自服务器的成功响应。如果消息成功写入Kafka,就返回一个RecordMetaData对象,它包含了主题和分区信息,以及记录在分区里的偏移量。如果键不为空,并且使用了默认的分区器,那么Kafka会对键进行散列,然后根据散列值把消息映射到特定的分区上。我们知道为了实现高可用,Kafka的分区是建立副本的,指定这个参数也是为了高可用,防
消息的发送过程
1. 创建 ProducerRecord 对象(可以指定键或分区)。
2. 键和值对象序列化成字节数组。
3. 确定分区。如果指定了分区,以指定的优先。如果没有,分区器会根据键选择一个分区。
4. 数据记录添加到记录批次。同一个批次里的所有消息会被发送到相同的主题和分区上。由独立的线程负责把这些记录批次发送到相应的 broker上。
服务器会返回一个响应。如果消息成功写入 Kafka,就返回一个 RecordMetaData 对象,它包含了主题和分区信息,以及记录在分区里的偏移量。如果写入失败,则会返回一个错误。生产者在收到错误之后会尝试重新发送消息,几次之后如果还是失败,就返回错误信息。
生产者三个核心配置
1. bootstrap.servers (发送到哪里)
2. key.serializer (键序列化器)
3. value.serializer (值序列化器)
Properties prop = new Properties();
prop.put("bootstrap.servers", "broker1:9092,broker2:9092");
prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<String, String>(prop);
三种发送模式
发送并忘记:不关心发送结果,完全凭借Kafka实现高可用性,可能会丢失消息。
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try {
producer.send(record);
} catch (Exception e) {
e.printStackTrace();
}
ProducerRecord的三个构造参数是,topic,key,value
这里直接发送,异常捕获,不关心结果。
同步发送:根据返回的Future,同步等待结果
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try {
producer.send(record).get();
} catch (Exception e) {
e.printStackTrace();
}
这里对返回的Future进行了get调用,阻塞等待结果。
KafkaProducer 一般会发生两类错误。其中一类是可重试错误。比如连接错误,“无主(no leader)”错误。KafkaProducer 可以被配置成自动重试,如果在多次重 试后仍无法解决问题,应用程序会收到一个重试异常。另一类错误无法通过重试解决,比如“消息太大”异常。对于这类错误,KafkaProducer 不会进行任何重试,直接抛出异常
异步发送:通过回调的方式获取发送的结果。
private class DemoProducerCallback implements Callback {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
e.printStackTrace();
}
}
}
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Biomedical Materials", "USA");
producer.send(record, new DemoProducerCallback());
通过实现Callback接口的onCompletion方法来接受回调。在send方法中,给一个回调的实现。
几种特殊的配置
配置 | 说明 |
acks | acks 参数指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的。 我们知道为了实现高可用,Kafka的分区是建立副本的,指定这个参数也是为了高可用,防止消息丢失。 如果 acks=0,生产者不会等待任何来自服务器的响应。即不关心broker是否收到消息,没有吞吐量的限制。 如果 acks=1,只要集群的首领节点收到消息,生产者就会收到一个来自服务器的成功响应。如果一个没有收到消息的节点成为新首领,消息还是会丢失。这个时候的吞吐量取决于使用的是同步发送还是异步发送。如果等待服务器的响应会增加延迟。如果客户端使用回调,延迟问题就可以得到缓解。 如果 acks=all,当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。这种模式是最安全的,但它的延迟更高。 |
buffer.memory | 生产者内存缓冲区的大小 |
compression.type | 默认不压缩,该参数可以设置为 snappy、gzip 或 lz4,它指定了消息被发送给 broker 之前使用哪一种压缩算法进行压缩。 |
retries | retries 参数的值决定了生产者可以重发消息的次数 |
batch.size | 当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指 定了一个批次可以使用的内存大小,按照字节数计算(而不是消息个数) |
linger.ms | 生产者在发送批次之前等待更多消息加入批次的时间 |
client.id | 服务器会用它来识别消息的来源 |
max.in.flight.requests.per.connection | 生产者在收到服务器响应之前可以发送多少个消息 |
timeout.ms | timeout.ms 指定了 broker 等待同步副本返回消息确认的时间 |
request.timeout.ms | request.timeout.ms 指定了生产者在发送数据时等待服务器返回响应的时间 |
metadata.fetch.timeout.ms | metadata. fetch.timeout.ms 指定了生产者在获取元数据(比如目标分区的首领是谁)时等待服务器 返回响应的时间 |
max.block.ms | 在调用 send() 方法或使用 partitionsFor() 方法获取元数据时生产者的阻塞 时间 |
max.request.size | 用于控制生产者发送的请求大小(字节) |
receive.buffer.bytes | TCP socket 接收数据包的缓冲区大小 |
send.buffer.bytes | TCP socket 发送数据包的缓冲区大小 |
Avro序列化器
1. 配置注册表的地址
Properties props = new Properties();
// brokers
props.put("bootstrap.servers", "localhost:9092");
// 键序列化
props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
// 值序列化
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
// 注册表地址
props.put("schema.registry.url", schemaUrl);
键值都使用了KafkaAvroSerializer序列化器
注册表的地址使用schema.registry.url进行配置
2. 发送消息
Producer<String, Customer> producer = new KafkaProducer<String, Customer>(props);
// 不断生成事件,直到有人按下Ctrl+C组合键
while (true) {
Customer customer = CustomerGenerator.getNext();
System.out.println("Generated customer " + customer.toString());
ProducerRecord<String, Customer> record = new ProducerRecord<>(“customerContacts”, customer.getId(), customer);
producer.send(record);
}
3. 不使用Avro对象生成器
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer"); props.put("schema.registry.url", url);
// schema描述
String schemaString = "{\"namespace\": \"customerManagement.avro\", \"type\": \"record\", " +
"\"name\": \"Customer\"," +
"\"fields\": [" +
"{\"name\": \"id\", \"type\": \"int\"}," +
"{\"name\": \"name\", \"type\": \"string\"}," +
"{\"name\": \"email\", \"type\": [\"null\",\"string\"], \"default\":\"null\" }" +
"]}";
Producer<String, GenericRecord> producer = new KafkaProducer<String, GenericRecord>(props);
// 解析schema
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(schemaString);
for (int nCustomers = 0; nCustomers < customers; nCustomers++) {
String name = "exampleCustomer" + nCustomers;
String email = "example" + nCustomers + "@example.com";
// 核心在这里
GenericRecord customer = new GenericData.Record(schema);
customer.put("id", nCustomers);
customer.put("name", name);
customer.put("email", email);
ProducerRecord<String, GenericRecord> data = new ProducerRecord<String, GenericRecord>("customerContacts", name, customer);
producer.send(data);
}
4. 不指定key的发送
ProducerRecord<Integer, String> record = new ProducerRecord<>("CustomerCountry", "USA");
这样,key无需指定,由内部完成自动分区。
如果键值为 null,并且使用了默认的分区器,那么记录将使用轮询(Round Robin)算法发送到主题内各个可用的分区上。
如果键不为空,并且使用了默认的分区器,那么 Kafka 会对键进行散列,然后根据散列值把消息映射到特定的分区上。
在不改变主题分区数量的情况下,键与分区之间的映射才能保持不变。在创建主题的时候就把分区规划好,不要增加新分区。
更多推荐
所有评论(0)