生产者消息对象

public class ProducerRecord<K, V> {
	private final String topic; // 主题
	private final Integer partition; //分区号
	private final Headers headers; //消息头部
	private final K key; //键
	private final V value; //值
	private final Long timestamp; //消息的时间戳
}

其中key是用来指定消息的键,它不仅是消息的附加信息,还可以用来计算分区号,进而让消息发往特定的分区,一般同一个key的消息会被划分到同一个分区中。
value代表消息的实际内容。
timestamp是指消息的时间戳,它有CreateTime和LogAppendTime两种类型,前者表示消息创建的时间,后者表示消息追加到日志文件的时间。

创建生产者实例

public static Properties initConfig() {
	Properties props = new Properties();
	props.put(ProducerConfig.KEY_SERIALZER_CLASS_CONFIG,
	StringSerializer.class.getName());
	props.put(ProducerConfig.VALUE_SERIALZER_CLASS_CONFIG,
	StringSerializer.class.getName());
}

KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());

消息的发送

创建生产者实例

创建生产者实例的方法有很多种,其中最简单的是下面的构造方于除了topic和value外的属性,其他都置为null。

public ProducerRecord(String topic, V value);

发送消息主要有三种模式:发完即忘(fire-and-forget),同步(sync)及异步(async)。

KafkaProducer的sand()方法返回值并非是void类型,而是Future类型,send()方法有两个重载方法,具体定义如下:

public Future<RecordMetadata> send(ProducerRecord<K,V> record);
public Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback callback);
  • 发完即忘
    它只管往Kafka中发送消息而并不关心消息是否正确到达。
    在大多数情况下,这种发送方式没有什么问题,不过在某些时候(比如发生不可重试异常时),会造成消息的丢失。这种发送方式性能最高,但可靠性也最差。

  • 同步发送

try {
	producer.send(record).get();
} catch (ExecutionException | InterruptedException e) {
	e.printStackTrace();
}

通过feature对象中的get()方法,来阻塞等待kafka的响应,直到发送成功,或者发生异常。

同步发送的可靠性高,但性能会差很多,因为需要阻塞等到一条消息发送完之后,才能发送下一条。

  • 异步发送
producer.send(record, new Callback()) {
	@override
	public void onCompletion(RecordMetadata metadata, Excetion exception){
		if (excetion != null) {
			exception.printStackTrace();
		} else {
			...
		}
	}
}

当Kafka有响应时候,就会有回调,要么发送成功,要么抛出异常。

序列化器

生产者需要用序列化器把对象转换成字节数组,才能通过网络发送给Kafka。而消费者需要用反序列器把从Kafka中收到的字节数组转换成相应的对象。

分区器

分区器的作用是为消息分配分区。

消息经过序列化后,就需要确定它发往的分区,如果消息ProducerRecord中指定了partition字段,那么就不需要分区器,因为patition代表的就是要发往的分区号。如果没有指定partition,则需要依赖分区器,根据key字段来计算partition的值。

拦截器

生产者拦截器既可以用来在消息发送前做一些准备工作,比如按照某个规则过滤不符合要求的消息、修改消息的内容等,也可以用来在发送回调逻辑前,做一些定制化的需求,比如统计类工作。

原理分析

整体架构

在这里插入图片描述
整个生产者客户端由两个线程协调运行,这两个线程分别为主线程Sender线程。在主线程中由KafkaProducer创建消息,然后通过可能的拦截器、序列化器和分区器的作用后,缓存到消息收集器中(RecordAccumulator)。Sender现成负责从消息收集器中获取消息,并将其发送到kafka中。

RecordAccumulator
该收集器主要用来缓存消息,以便Sender线程可以批量发送,进而减少网络传输的资源消耗,以提高性能。

RecordAccumulator的内部为每个分区都维护了一个双端队列,队列中的内容为ProducerBatch,即Deque。消息写入缓存时候,追加到双端队列的尾部,读取消息时,从双端队列的头部读取。

Sender
Seender从RecordAccumulator中获取缓存的消息后,会进一步将原来<分区,Deque>的保存形式转变为<Node, List>的形式,其中Node表示集群的broken节点。

对于网络连接来说,生产者客户端是与具体的broken节点建立的连接,就是向具体的broken节点发送消息,而不关心消息属于哪一个分区;而对于KafkaProducer
的应用逻辑而言,我们只关注向哪个分区中发送消息,所以在这里需要做一个应用逻辑层面到网络I/O层面的转换。

在转换成<Node, List>后,Sender还会进一步封装成<Node, Request>的形式,这样就可以将Request发往各个Node了。

请求在从Sender线程发往Kafka之前还会保存到InFlightRequest中,InFlightRequest存对象的具体形式为Map<NodeId, Deque>,它的主要作用是缓存了已经发送出去但还是没有收到响应的请求。

元数据的更新

元数据是指Kafka集群的元数据,这些元数据具体记录了集群中有哪些主题,这些主题有哪些分区,每个分区的leader副本分配在哪个节点上follower副本又分配在哪些节点上等等信息。

假设我们通过如下的方式创建了一条消息ProducerRecord,

ProducerRecord<String, String> record = new ProducerRecord<>(topic, “xxx”);

这里的发送指令,我们只知道主题名称,和需要发送的内容,对其他信息却一无所知。例如要将此消息追加到指定主题的某个分区所对应的leader副本之前,首先需要知道主题的分区数量,然后计算出目标分区,还需要知道leader副本所在的broken节点的地址、端口等信息才能建立链接,这些都属于元信息。

元数据的更新是在客户端进行的,对客户端的外部使用者不可见。更新操作是由Sender线程发起的,主线程也需要读取这些信息,这里的数据同步,是通过Synchronized和final关键字来保障。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐