Kafka(二):Kafka生产者Producer
消息发送发消息主要有三种模式:发后即忘,同步,异步。发后即忘这种方式只管往Kafka中发送消息,并不关心消息是否正确到达,实例入戏:try {producer.send(record);} catch (Exception e) {e.printStackTrace();}同步实现同步形式,可以利用返回的Future对象实现:...
消息发送
发消息主要有三种模式:发后即忘,同步,异步。
发后即忘
这种方式只管往Kafka中发送消息,并不关心消息是否正确到达,实例入戏:
try {
producer.send(record);
} catch (Exception e) {
e.printStackTrace();
}
同步
实现同步形式,可以利用返回的Future
对象实现:
try {
Future<RecordMetadata> future = producer.send(record);
RecordMetadata metadata = future.get();
} catch (Exception e) {
e.printStackTrace();
}
异步
kafka的send()
的异步方法需要指定一个Callback
回调函数,异步方法实例如下:
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
exception.printStackTrace();
} else {
System.out.println(metadata.topic() + "-" + metadata.partition() + "-" + metadata.offset());
}
}
});
生产者实例代码 (发送即忘)
package org.lpl.kafkademo.producer;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
/**
* 生产者
*
* @author lpl
* @version 1.0
* @date 2019/7/21
**/
public class Producer {
public static final String brokerList = "127.0.0.1:9092";
public static final String topic = "topic-1";
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
properties.put("bootstrap.servers", brokerList);
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
ProducerRecord<String, String> record = new ProducerRecord<>(topic, "hello word");
try {
producer.send(record);
} catch (Exception e) {
e.printStackTrace();
}
producer.close();
}
}
拦截器
Kafka拦截器分为生产者拦截器和消费者拦截器,这里说一下生产者拦截器:
生产者拦截器可以在消息发送前做一些准备工作(按照某个规则过滤不符合要求的消息、修改消息内容等),使用生产者拦截器需要实现
org.apache.kafka.clients.producer.ProducerInterceptor
接口的三个方法
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
public void onAcknowledgement(RecordMetadata metadata, Exception exception);
public void close();
序列化器
生产者需要用序列化器把对象转化成字节数组才能通过网络发送给Kafka。对于消费之需要用反序列化器把从Kafka中接收到的字节数组转化为相应的对象。客户端自带的序列化器有StringSerializer
用于String类型的序列化器:
public class StringSerializer implements Serializer<String> {
private String encoding = "UTF8";
/**
* 确定编码类型
*/
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
Object encodingValue = configs.get(propertyName);
if (encodingValue == null)
encodingValue = configs.get("serializer.encoding");
if (encodingValue instanceof String)
encoding = (String) encodingValue;
}
@Override
public byte[] serialize(String topic, String data) {
try {
if (data == null)
return null;
else
return data.getBytes(encoding);
} catch (UnsupportedEncodingException e) {
throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);
}
}
@Override
public void close() {
// nothing to do
}
}
同时还有ByteArray
、Bytes
、Double
、Long
、Integer
这几种类型,他们都实现了org.apache.kafka.common.serialization
接口;如果这些都不能满足你的需要可以选择使用Avro、JSON、Thrift、ProtoBuf等通用序列化工具实现。如果还不能满足我们的需求,我们可以通过实现org.apache.kafka.common.serialization
自己定义序列化器。
需要注意的是生产者和消费者使用相应的序列化器。
分区器
消息经过序列化器后就需要确定他发往的分区,如果ProducerRecord
制定了partition
字段,那么就不需要分区器,如果没有指定就需要分区器获取分区号。
Kafka中提供了默认分区器是org.apache.kafka.clients.producer.internals.DefaultPartitioner
,它的源码如下:
/**
* The default partitioning strategy:
* <ul>
* <li>If a partition is specified in the record, use it
* <li>If no partition is specified but a key is present choose a partition based on a hash of the key
* <li>If no partition or key is present choose a partition in a round-robin fashion
*/
public class DefaultPartitioner implements Partitioner {
private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();
public void configure(Map<String, ?> configs) {}
/**
* Compute the partition for the given record.
*
* @param topic The topic name
* @param key The key to partition on (or null if no key)
* @param keyBytes serialized key to partition on (or null if no key)
* @param value The value to partition on or null
* @param valueBytes serialized value to partition on or null
* @param cluster The current cluster metadata
*/
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
private int nextValue(String topic) {
AtomicInteger counter = topicCounterMap.get(topic);
if (null == counter) {
counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
if (currentCounter != null) {
counter = currentCounter;
}
}
return counter.getAndIncrement();
}
public void close() {}
}
从源码来看partition()方法
定义了分区分配逻辑:如果key
不为null,则会对key进行哈希(采用MurmurHash2)算法,来计算分区号,如果key
为null,则消息则会以轮询的方式发到主题内的各个分区;
原理分析
整体架构
整个生产者客户端由两个线程(主线程和Sender线程)协调运行.在主线程由KafkaProducer
创建消息,然后通过拦截器、序列化器、和分区器的作用之后缓存到消息累加器RecordAccoumulator
中,Sender
线程负责从RecordAccoumulator
中获取消息并将其发送发到Kafka中。
发送消息流程
- KafkaProducer发送消息到拦截器,在拦截器可以按照某个规则过滤不符合要求的消息、修改消息内容等。
- 经过序列化器,对消息进行序列化
- 消息经过序列化后就需要确定发往的分区,如果消息
ProducerRecord
中指定了partition
字段,那么就不会经过分区器,如果没有指定,需要在分区器为消息分配分区; - 消息进入
RecordAccumudator
进行缓存,以便消息可以批量进行发送 Sender
线程从RecordAccumudator
中获取缓存消息- 将原本的
<分区,Dequeue<ProducerBatch>>
的保存形式转变为<Node,List<ProducerBatch>>
,然后进一步分装成<Node,Request>
的形式,这样就可以发往各个Node了。 - 在发往Kafka之前还会保存到
InFlightRequests
,InFlightRequests
保存对象的形式是Map<NodeId,Dequeue>,主要作用是缓存了已经发出去但还有收到响应的请求。 - 提交给
Selector
进行消息发送 Selector
将消息发送到kafkaSelector
消息发送完成响应给InFlightRequests
进行处理- 清理
RecordAccumudator
缓存
RecordAccoumulator
那么RecordAccoumulator
是一个什么东西呢?
我们看一下源码:
/**
* This class acts as a queue that accumulates records into {@link MemoryRecords}
* instances to be sent to the server.
* <p>
* The accumulator uses a bounded amount of memory and append calls will block when that memory is exhausted, unless
* this behavior is explicitly disabled.
*/
public final class RecordAccumulator {
private final Logger log;
private volatile boolean closed;
private final AtomicInteger flushesInProgress;
private final AtomicInteger appendsInProgress;
private final int batchSize;
private final CompressionType compression;
private final long lingerMs;
private final long retryBackoffMs;
private final long deliveryTimeoutMs;
private final BufferPool free;
private final Time time;
private final ApiVersions apiVersions;
private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
private final IncompleteBatches incomplete;
// The following variables are only accessed by the sender thread, so we don't need to protect them.
private final Map<TopicPartition, Long> muted;
private int drainIndex;
private final TransactionManager transactionManager;
private long nextBatchExpiryTimeMs = Long.MAX_VALUE; // the earliest time (absolute) a batch will expire.
RecordAccoumulator
主要用来缓存消息以便Sender
线程可以批量发送,进而减少网络传输的资源消耗以提升性能。主线程发过来的消息都会被追加到RecordAccoumulator
的某个双端队列Deque<ProducerBatch>
中,RecordAccoumulator
为每个分区都维护了一个双端队列Deque<ProducerBatch>
,消息写入时,追加到双端队列的尾部;Sender
消费消息时,从双端队列的头部读取。需要注意的是ProducerBatch
是一个批次的ProducerRecord
。
消息在网络上都是以字节Byte
的形式传输,在发送之前需要创建一块内存区域来保存对应的消息,在Kafka客户端中,通过java.io.ByteBuffer
实现消息内存的创建和释放,为了防止频繁创建和释放的资源消耗,RecordAccoumulator
内部有一个BufferPool
来实现ByteBuffer
的复用。ByteBuffer
只是针对特定大小的ByteBuffer
,可以通过bacth.size
参数指定,默认值是16384B(16KB)
Sender
Sender
是处理向Kafka集群发送generate请求的后台线程。这个线程生成元数据请求更新其集群视图,然后将产生的请求发送到适当的节点。
Sender
从RecordAccumulator
中获取缓存消息之后,会将原本的<分区,Dequeue<ProducerBatch>>
的保存形式转变为<Node,List<ProducerBatch>>
形式,其中Node表示kafka集群的节点。在转化成<Node,List<ProducerBatch>>
之后,Sender
还会进一步封装成<Node,Request>
形式,这样就可以发往各个Node了。
重要的生产者参数
参数 | 默认值 | 配置方式 | 说明 |
---|---|---|---|
acks | 1 | properties.put(“acks”,“0”) | 该参数用来指定分区中要有多少副本收到这条消息,之后生产者才会认为这条消息事成功写入的ack=1 :生产者发送消息后,只要分区的leader副本成功写入消息,那么就会收到来自服务器的成功响应ack=0 :生产者发送消息后不需要等待任何服务器的响应ack=-1或ack=all :生产者发送消息后需要等待ISR中所有的副本都成功写入消息之后才能够收到来自服务器的成功响应 |
max.request.size | 1048576B(1M) | 限制生产者能发送消息的最大值。 配置该参数时需要注意broker端 message.max.byte 参数 | |
retries和retry.backoff.ms | 0 100 | retries参数用来配置生产者在发生异常时的重试次数 retry.backoff.ms是两次重试的时间间隔 | |
compression.type | 0 | 用来指定消息压缩方式,默认值为none ;对消息压缩可以极大的减少网络传输,降低网络I/O,但是消息压缩是一种使用时间换空间的优化方式 | |
connections.max.idle.ms | 540000(ms) | 关闭空闲链接时间 | |
linger.ms | 0 | 用来指定生产者发送producer 之前等待更多消息加入Producer 的时间 | |
receive.buffer.bytes | 32768B(32KB) | 用来设置Socket接受消息缓冲区(SO_RECBUF))的大小,如果设置为-1,则使用操作系统的默认值。 | |
send.buffer.bytes | 32768B(32KB) | 用来设置Socket发送消息缓冲区(SO_SNDBUF)的大小,如果设置为-1,则使用操作系统的默认值。 | |
request.timeout.ms | 3000ms | 用来配置Producer 等待请求响应的最长时间 |
参考
- 《深入理解kafka核心设计与实践原理》
- https://kafka.apache.org/documentation.html#producerapi
更多推荐
所有评论(0)