Kafka的生产者(二)
Kafka的历史变迁在 Kafka 的历史变迁中,一共有两个大版本的生产者客户端:第一个是于 Kafka 开源之初使用 Scala 语言编写的客户端,;第二个是从 Kafka 0.9.x 版本开始推出的使用 Java 语言编写的客户端,它弥补了旧版客户端中存在的诸多设计缺陷。虽然 Kafka 是用 Java/Scala 语言编写的,但这并不妨碍它对于多语言的支持,在 Kafka 官网中,...
Kafka的历史变迁
在 Kafka 的历史变迁中,一共有两个大版本的生产者客户端:第一个是于 Kafka 开源之初使用 Scala 语言编写的客户端,;第二个是从 Kafka 0.9.x 版本开始推出的使用 Java 语言编写的客户端,它弥补了旧版客户端中存在的诸多设计缺陷。虽然 Kafka 是用 Java/Scala 语言编写的,但这并不妨碍它对于多语言的支持,在 Kafka 官网中,“CLIENTS”的入口提供了一份多语言的支持列表,其中包括常用的 C/C++、Python、Go 等语言
Kafka 的系统架构
包括生产者发送消息到kafka,消费者从kafka中拉取消息,kakfa的broker的信息注册到zookeeper
kafka生产者客户端开发
在 Kafka 中,我们把产生消息的那一方称为生产者,生产者产生的消息是发送到 Kafka 应用程序发送过程如图:
整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和 Sender 线程(发送线程)。在主线程中由 KafkaProducer 创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator,也称为消息收集器)中。Sender 线程负责从 RecordAccumulator 中获取消息并将其发送到 Kafka 中。
主要流程包括:
1.创建一个ProducerRecord 消息对象开始,ProducerRecord 是 Kafka 中的一个核心类,它代表了一组 Kafka 需要发送的 key/value 键值对,它由记录要发送到的主题名称(Topic Name),可选的分区号(Partition Number)以及可选的键值对构成。
2.在消息发送前做一些准备工作,比如按照某个规则过滤不符合要求的消息、修改消息的内容等,也可以用来在发送回调逻辑前做一些定制化的需求,比如统计类工作。这个时候就需要经过生产者拦截器来进行操作
3.接下来发送 ProducerRecord 时,我们需要将键值对对象由序列化器转换为字节数组(序列化器完成),这样它们才能够在网络上传输。然后消息到达了分区器。
3.发送过程中指定了有效的分区号,那么在发送记录时将使用该分区。如果发送过程中未指定分区,则将使用key 的 hash 函数映射指定一个分区。如果发送的过程中既没有分区号也没有key,则将以循环的方式分配一个分区。确定好分区后,生产者就知道向哪个主题和分区发送数据了。
此外,ProducerRecord 还有关联的时间戳,如果用户没有提供时间戳,那么生产者将会在记录中使用当前的时间作为时间戳。Kafka 最终使用的时间戳取决于 topic 主题配置的时间戳类型。
-
如果将主题配置为使用 CreateTime,则生产者记录中的时间戳将由 broker 使用。
-
如果将主题配置为使用LogAppendTime,则生产者记录中的时间戳在将消息添加到其日志中时,将由 broker 重写。
4.接下来这条消息被存放在一个记录批次里(消息累加器),这个批次里的所有消息会被发送到相同的主题和分区上(一个分区对应一个消息累加器)。然后由一个独立的发送线程负责把它们发到 Kafka Broker 上。
5.Kafka Broker 在收到消息时会返回一个响应,如果写入成功,会返回一个 RecordMetaData 对象,它包含了主题和分区信息,以及记录在分区里的偏移量,上面两种的时间戳类型也会返回给用户。如果写入失败,会返回一个错误。生产者在收到错误之后会尝试重新发送消息,几次之后如果还是失败的话,就返回错误消息。
kafka生产者的使用
pom.xml中kafka的依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.0.0</version>
</dependency>
生产者hello world代码
public class KafkaProducer {
public static final String brokerList = "localhost:9092";
public static final String topic = "topic-demo";
public static Properties initConfig(){
Properties props = new Properties();
props.put("bootstrap.servers", brokerList);
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("client.id", "producer.client.id.demo");
return props;
}
public static void main(String[] args) {
Properties props = initConfig();//初始化配置参数
KafkaProducer<String, String> producer = new KafkaProducer<>(props);//创建生产者
ProducerRecord<String, String> record =
new ProducerRecord<>(topic, "Hello, Kafka!");//创建消息
try {
producer.send(record);//发送
} catch (Exception e) {
e.printStackTrace();
}
}
}
创建生产者对象
首先需要创建一个KafkaProducer 生产者对象,它是线程安全的,可以在多个线程中共享单个 KafkaProducer 实例,也可以将 KafkaProducer 实例进行池化来供其他线程调用。KafkaProducer 中有多个构造方法,示例如下:
public KafkaProducer(Map<String, Object> configs) {
this(new ProducerConfig(configs), (Serializer)null, (Serializer)null, (Metadata)null, (KafkaClient)null);
}
public KafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
this(new ProducerConfig(ProducerConfig.addSerializerToConfig(configs, keySerializer, valueSerializer)), keySerializer, valueSerializer, (Metadata)null, (KafkaClient)null);
}
public KafkaProducer(Properties properties) {
this(new ProducerConfig(properties), (Serializer)null, (Serializer)null, (Metadata)null, (KafkaClient)null);
}
public KafkaProducer(Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
this(new ProducerConfig(ProducerConfig.addSerializerToConfig(properties, keySerializer, valueSerializer)), keySerializer, valueSerializer, (Metadata)null, (KafkaClient)null);
}
构造方法主要就是为了初始化kafka生产者并配置相应的参数,比如需要连接的 Kafka 集群地址等,在 Kafka 生产者客户端 KafkaProducer 中有3个参数是必填的。
- bootstrap.servers:用来指定生产者客户端连接 Kafka 集群所需的 broker 地址清单,具体的内容格式为 host1:port1,host2:port2,可以设置一个或多个地址,中间以逗号隔开,此参数的默认值为“”。注意这里并非需要所有的 broker 地址,因为生产者会从给定的 broker 里查找到其他 broker 的信息。不过建议至少要设置两个以上的 broker 地址信息,当其中任意一个宕机时,生产者仍然可以连接到 Kafka 集群上。
- key.serializer 和 value.serializer:broker 端接收的消息必须是字节数组(byte[])。key.serializer 和 value.serializer 这两个参数分别用来指定 key 和 value 序列化操作的序列化器。(注意必须填写序列化器的全限定名,两个参数无默认值),上面的生产者使用的 KafkaProducer<String, String>和 ProducerRecord<String, String> 中的泛型 <String, String> 对应的是消息中 key 和 value 的类型,不过在发往 broker 之前会将消息中对应的 key 和 value 做相应的序列化操作来转换成字节数组byte[]。
key.serializer 必须被设置为一个实现了org.apache.kafka.common.serialization.Serializer 接口的类,生产者会使用这个类把键对象序列化为字节数组,value.serializer 指定的类会将值序列化。
Serializer 类
Serializer 是一个接口,它表示类将会采用何种方式序列化,它的作用是把对象转换为字节,实现了 Serializer 接口的类主要有 ByteArraySerializer、StringSerializer、IntegerSerializer ,其中 ByteArraySerialize 是 Kafka 默认使用的序列化器,其他的序列化器还有很多,你可以通过 这里 查看其他序列化器。要注意的一点:key.serializer 是必须要设置的,即使你打算只发送值的内容。
可选参数的设置
在上面的 initConfig() 方法里还设置了一个参数 client.id,这个参数用来设定 KafkaProducer 对应的客户端id,默认值为“”。如果客户端不设置,则 KafkaProducer 会自动生成一个非空字符串,内容形式如“producer-1”、“producer-2”,即字符串“producer-”与数字的拼接。
此外,我们可以通过直接使用客户端中的 org.apache.kafka.clients.producer.ProducerConfig 类来看到其他的参数,每个参数在 ProducerConfig 类中都有对应的名称,直接通过类名.去调用,更不容易出错。
public class ProducerConfig extends AbstractConfig {
private static final ConfigDef CONFIG;
public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
public static final String METADATA_MAX_AGE_CONFIG = "metadata.max.age.ms";
private static final String METADATA_MAX_AGE_DOC = "The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions.";
public static final String BATCH_SIZE_CONFIG = "batch.size";
private static final String BATCH_SIZE_DOC = "The producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition. This helps performance on both the client and the server. This configuration controls the default batch size in bytes. <p>No attempt will be made to batch records larger than this size. <p>Requests sent to brokers will contain multiple batches, one for each partition with data available to be sent. <p>A small batch size will make batching less common and may reduce throughput (a batch size of zero will disable batching entirely). A very large batch size may use memory a bit more wastefully as we will always allocate a buffer of the specified batch size in anticipation of additional records.";
public static final String ACKS_CONFIG = "acks";
private static final String ACKS_DOC = "The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following settings are allowed: <ul> <li><code>acks=0</code> If set to zero then the producer will not wait for any acknowledgment from the server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case, and the <code>retries</code> configuration will not take effect (as the client won't generally know of any failures). The offset given back for each record will always be set to -1. <li><code>acks=1</code> This will mean the leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after acknowledging the record but before the followers have replicated it then the record will be lost. <li><code>acks=all</code> This means the leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee. This is equivalent to the acks=-1 setting.";
public static final String LINGER_MS_CONFIG = "linger.ms";
private static final String LINGER_MS_DOC = "The producer groups together any records that arrive in between request transmissions into a single batched request. Normally this occurs only under load when records arrive faster than they can be sent out. However in some circumstances the client may want to reduce the number of requests even under moderate load. This setting accomplishes this by adding a small amount of artificial delay—that is, rather than immediately sending out a record the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together. This can be thought of as analogous to Nagle's algorithm in TCP. This setting gives the upper bound on the delay for batching: once we get <code>batch.size</code> worth of records for a partition it will be sent immediately regardless of this setting, however if we have fewer than this many bytes accumulated for this partition we will 'linger' for the specified time waiting for more records to show up. This setting defaults to 0 (i.e. no delay). Setting <code>linger.ms=5</code>, for example, would have the effect of reducing the number of requests sent but would add up to 5ms of latency to records sent in the absence of load.";
public static final String CLIENT_ID_CONFIG = "client.id";
public static final String SEND_BUFFER_CONFIG = "send.buffer.bytes";
public static final String RECEIVE_BUFFER_CONFIG = "receive.buffer.bytes";
public static final String MAX_REQUEST_SIZE_CONFIG = "max.request.size";
private static final String MAX_REQUEST_SIZE_DOC = "The maximum size of a request in bytes. This setting will limit the number of record batches the producer will send in a single request to avoid sending huge requests. This is also effectively a cap on the maximum record batch size. Note that the server has its own cap on record batch size which may be different from this.";
public static final String RECONNECT_BACKOFF_MS_CONFIG = "reconnect.backoff.ms";
public static final String RECONNECT_BACKOFF_MAX_MS_CONFIG = "reconnect.backoff.max.ms";
public static final String MAX_BLOCK_MS_CONFIG = "max.block.ms";
private static final String MAX_BLOCK_MS_DOC = "The configuration controls how long <code>KafkaProducer.send()</code> and <code>KafkaProducer.partitionsFor()</code> will block.These methods can be blocked either because the buffer is full or metadata unavailable.Blocking in the user-supplied serializers or partitioner will not be counted against this timeout.";
public static final String BUFFER_MEMORY_CONFIG = "buffer.memory";
private static final String BUFFER_MEMORY_DOC = "The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are sent faster than they can be delivered to the server the producer will block for <code>max.block.ms</code> after which it will throw an exception.<p>This setting should correspond roughly to the total memory the producer will use, but is not a hard bound since not all memory the producer uses is used for buffering. Some additional memory will be used for compression (if compression is enabled) as well as for maintaining in-flight requests.";
public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms";
public static final String COMPRESSION_TYPE_CONFIG = "compression.type";
private static final String COMPRESSION_TYPE_DOC = "The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid values are <code>none</code>, <code>gzip</code>, <code>snappy</code>, or <code>lz4</code>. Compression is of full batches of data, so the efficacy of batching will also impact the compression ratio (more batching means better compression).";
public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = "metrics.sample.window.ms";
public static final String METRICS_NUM_SAMPLES_CONFIG = "metrics.num.samples";
public static final String METRICS_RECORDING_LEVEL_CONFIG = "metrics.recording.level";
public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters";
public static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = "max.in.flight.requests.per.connection";
private static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC = "The maximum number of unacknowledged requests the client will send on a single connection before blocking. Note that if this setting is set to be greater than 1 and there are failed sends, there is a risk of message re-ordering due to retries (i.e., if retries are enabled).";
public static final String RETRIES_CONFIG = "retries";
private static final String RETRIES_DOC = "Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error. Note that this retry is no different than if the client resent the record upon receiving the error. Allowing retries without setting <code>max.in.flight.requests.per.connection</code> to 1 will potentially change the ordering of records because if two batches are sent to a single partition, and the first fails and is retried but the second succeeds, then the records in the second batch may appear first.";
public static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer";
public static final String KEY_SERIALIZER_CLASS_DOC = "Serializer class for key that implements the <code>org.apache.kafka.common.serialization.Serializer</code> interface.";
public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer";
public static final String VALUE_SERIALIZER_CLASS_DOC = "Serializer class for value that implements the <code>org.apache.kafka.common.serialization.Serializer</code> interface.";
public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = "connections.max.idle.ms";
public static final String PARTITIONER_CLASS_CONFIG = "partitioner.class";
private static final String PARTITIONER_CLASS_DOC = "Partitioner class that implements the <code>org.apache.kafka.clients.producer.Partitioner</code> interface.";
public static final String REQUEST_TIMEOUT_MS_CONFIG = "request.timeout.ms";
private static final String REQUEST_TIMEOUT_MS_DOC = "The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses the client will resend the request if necessary or fail the request if retries are exhausted. This should be larger than replica.lag.time.max.ms (a broker configuration) to reduce the possibility of message duplication due to unnecessary producer retries.";
public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes";
public static final String INTERCEPTOR_CLASSES_DOC = "A list of classes to use as interceptors. Implementing the <code>org.apache.kafka.clients.producer.ProducerInterceptor</code> interface allows you to intercept (and possibly mutate) the records received by the producer before they are published to the Kafka cluster. By default, there are no interceptors.";
public static final String ENABLE_IDEMPOTENCE_CONFIG = "enable.idempotence";
public static final String ENABLE_IDEMPOTENCE_DOC = "When set to 'true', the producer will ensure that exactly one copy of each message is written in the stream. If 'false', producer retries due to broker failures, etc., may write duplicates of the retried message in the stream. Note that enabling idempotence requires <code>max.in.flight.requests.per.connection</code> to be less than or equal to 5, <code>retries</code> to be greater than 0 and acks must be 'all'. If these values are not explicitly set by the user, suitable values will be chosen. If incompatible values are set, a ConfigException will be thrown.";
public static final String TRANSACTION_TIMEOUT_CONFIG = "transaction.timeout.ms";
public static final String TRANSACTION_TIMEOUT_DOC = "The maximum amount of time in ms that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction.If this value is larger than the transaction.max.timeout.ms setting in the broker, the request will fail with a `InvalidTransactionTimeout` error.";
public static final String TRANSACTIONAL_ID_CONFIG = "transactional.id";
public static final String TRANSACTIONAL_ID_DOC = "The TransactionalId to use for transactional delivery. This enables reliability semantics which span multiple producer sessions since it allows the client to guarantee that transactions using the same TransactionalId have been completed prior to starting any new transactions. If no TransactionalId is provided, then the producer is limited to idempotent delivery. Note that enable.idempotence must be enabled if a TransactionalId is configured. The default is <code>null</code>, which means transactions cannot be used. Note that transactions requires a cluster of at least three brokers by default what is the recommended setting for production; for development you can change this, by adjusting broker setting `transaction.state.log.replication.factor`.";
创建消息
消息对象: ProducerRecord,它包含了多个属性,与业务相关的消息体只是其中的一个 value 属性。ProducerRecord 类的定义如下(只截取成员变量):
public class ProducerRecord<K, V> {
private final String topic; //主题
private final Integer partition; //分区号
private final Headers headers; //消息头部
private final K key; //键
private final V value; //值
private final Long timestamp; //消息的时间戳
//省略其他成员方法和构造方法
}
其中 :
- topic 字段代表消息要发往的主题(必填)
- partition 字段代表消息要发往的分区号
- headers 字段是消息的头部,Kafka 0.11.x 版本引入,用来设定一些与应用相关的信息,如无需要也可以不用设置。
- key 是用来指定消息的键,不仅是消息的附加信息,还可以用来计算分区号进而可以让消息发往特定的分区。前面提及消息以主题为单位进行归类,而这个 key 可以让消息再进行二次归类,同一个 key 的消息会被划分到同一个分区中,此外有 key 的消息还可以支持日志压缩的功能。
- value 是指消息体,一般不为空,如果为空则表示特定的消息—墓碑消息(必填)
- timestamp 是指消息的时间戳,它有 CreateTime 和 LogAppendTime 两种类型,前者表示消息创建的时间,后者表示消息追加到日志文件的时间。
构造方法
在ProducerRecord 的属性结构中 topic 属性和 value 属性是必填项,其余属性是选填项,对应的 ProducerRecord 的构造方法如下:
public ProducerRecord(String topic, Integer partition, Long timestamp,
K key, V value, Iterable<Header> headers)
public ProducerRecord(String topic, Integer partition, Long timestamp,
K key, V value)
public ProducerRecord(String topic, Integer partition, K key, V value,
Iterable<Header> headers)
public ProducerRecord(String topic, Integer partition, K key, V value)
public ProducerRecord(String topic, K key, V value)
public ProducerRecord(String topic, V value)
上面的代码中发送消息使用的是最后一种构造方法,也是最简单的一种,这种方式相当于将 ProducerRecord 中除 topic 和 value 外的属性全部值设置为 null。在实际的应用中,还会用到其他构造方法,针对不同的消息,需要构建不同的 ProducerRecord 对象。
发送消息
消息发送主要有三种模式:发后即忘(fire-and-forget)、同步(sync)及异步(async)
发送消息send()方法的API
public Future<RecordMetadata> send(ProducerRecord<K, V> record)
public Future<RecordMetadata> send(ProducerRecord<K, V> record,
Callback callback)
发后即忘:只管往 Kafka 中发送消息而并不关心消息是否正确到达。在大多数情况下,这种发送方式没有什么问题,不过在某些时候(比如发生不可重试异常时)会造成消息的丢失。这种发送方式的性能最高,可靠性也最差。上面的hello world就是采用这种模式
同步模式:KafkaProducer 的 send()方法并非是 void 类型,而是 Future类型,send()方法有2个重载方法,具体定义如下:
要实现同步的发送方式,可以利用返回的 Future 对象的阻塞等待 Kafka 的响应即可实现,直到消息发送成功,或者发生异常。如果发生异常,那么就需要捕获异常并交由外层逻辑处理。如下:
try {
producer.send(record).get();
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
send() 方法返回的 Future 对象可以获得发送的结果,返回一个 RecordMetadata 对象,在 RecordMetadata 对象里包含了消息的一些元数据信息,比如当前消息的主题、分区号、分区中的偏移量(offset)、时间戳等。
try {
Future<RecordMetadata> future = producer.send(record);
RecordMetadata metadata = future.get();
System.out.println(metadata.topic() + "-" +
metadata.partition() + ":" + metadata.offset());
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
同步发送消息都有个问题,那就是同一时间只能有一个消息在发送,这会造成许多消息无法直接发送,造成消息滞后,无法发挥效益最大化。
比如消息在应用程序和 Kafka 集群之间一个来回需要 10ms。如果发送完每个消息后都等待响应的话,那么发送100个消息需要 1 秒,但是如果是异步方式的话,发送 100 条消息所需要的时间就会少很多很多。大多数时候,虽然Kafka 会返回 RecordMetadata 消息,但是我们并不需要等待响应。
异步模式:为了在异步发送消息的同时能够对异常情况进行处理,生产者提供了回调支持。一般是在 send() 方法里指定一个 Callback 的回调函数,Kafka 在返回响应时调用该函数来实现异步的发送确认。
Future 本身就可以用作异步的逻辑处理。只是 Future 里的 get() 方法在何时调用,以及怎么调用都是需要面对的问题,使用 Callback 的方式非常简洁明了,Kafka 有响应时就会回调,要么发送成功,要么抛出异常。异步发送方式的示例如下:
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
exception.printStackTrace();
} else {
System.out.println(metadata.topic() + "-" +
metadata.partition() + ":" + metadata.offset());
}
}
});
首先实现回调需要定义一个实现了org.apache.kafka.clients.producer.Callback的类,这个接口只有一个 onCompletion方法。如果 kafka 返回一个错误,onCompletion 方法会抛出一个非空(non null)异常,在实际应用中应该使用更加稳妥的方式来处理,比如可以将异常记录以便日后分析,也可以做一定的处理来进行消息重发。onCompletion() 方法的两个参数是互斥的,消息发送成功时,metadata 不为 null 而 exception 为 null;消息发送异常时,metadata 为 null 而 exception 不为 null。
对于同一个分区而言,如果消息 record1 于 record2 之前先发送,那么 KafkaProducer 就可以保证对应的 callback1 在 callback2 之前调用,也就是说,回调函数的调用也可以保证分区有序。
producer.send(record1, callback1);
producer.send(record2, callback2);
关闭资源:通常,一个 KafkaProducer 是发送多条消息,在发送完这些消息之后,需要调用 KafkaProducer 的 close() 方法来回收资源。下面的示例中发送了100条消息,之后就调用了 close() 方法来回收所占用的资源
close() 方法会阻塞等待之前所有的发送请求完成后再关闭 KafkaProducer。与此同时,KafkaProducer 还提供了一个带超时时间的 close() 方法,具体定义如下:
public void close(long timeout, TimeUnit timeUnit)
如果调用了带超时时间 timeout 的 close() 方法,那么只会在等待 timeout 时间内来完成所有尚未完成的请求处理,然后强行退出。在实际应用中,一般使用的都是无参的 close() 方法
消息发送异常处理
KafkaProducer 中一般会发生两种类型的异常:可重试的异常和不可重试的异常。
常见的可重试异常有:NetworkException、LeaderNotAvailableException、UnknownTopicOrPartitionException、NotEnoughReplicasException、NotCoordinatorException 等。比如 NetworkException 表示网络异常,这个有可能是由于网络瞬时故障而导致的异常,可以通过重试解决;又比如 LeaderNotAvailableException 表示分区的 leader 副本不可用,这个异常通常发生在 leader 副本下线而新的 leader 副本选举完成之前,重试之后可以重新恢复。
不可重试的异常,如前面提及的 RecordTooLargeException 异常,暗示了所发送的消息太大,KafkaProducer 对此不会进行任何重试,直接抛出异常。
对于可重试的异常,如果配置了 retries 参数,那么只要在规定的重试次数内自行恢复了,就不会抛出异常。如果重试了10次之后还没有恢复,那么仍会抛出异常,进而发送的外层逻辑就要处理这些异常了。retries 参数的默认值为0,配置方式参考如下:
props.put(ProducerConfig.RETRIES_CONFIG, 10);
消息发送后的流转
消息通过 send() 方法发往 broker 的过程中,有可能需要经过拦截器(Interceptor)、序列化器(Serializer)和分区器(Partitioner)的一系列作用之后之后才能被真正地发往 broker。拦截器一般不是必需的,而序列化器是必需的。消息经过序列化之后就需要确定它发往的分区,如果消息 ProducerRecord 中指定了 partition 字段,那么就不需要分区器的作用,因为 partition 代表的就是所要发往的分区号。
生产者拦截器
拦截器(Interceptor)是早在 Kafka 0.10.0.0 中就已经引入的一个功能,Kafka 一共有两种拦截器:生产者拦截器和消费者拦截器。下面主要讲述生产者拦截器的相关内容
生产者拦截器既可以用来在消息发送前做一些准备工作,比如按照某个规则过滤不符合要求的消息、修改消息的内容等,也可以用来在发送回调逻辑前做一些定制化的需求,比如统计类工作。
生产者拦截器的使用也很方便,主要是自定义实现 org.apache.kafka.clients.producer. ProducerInterceptor 接口。ProducerInterceptor 接口中包含3个方法:
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
public void onAcknowledgement(RecordMetadata metadata, Exception exception);
public void close();
消息在通过 send() 方法发往 broker 的过程中,首先会调用生产者拦截器的 onSend() 方法来对消息进行相应的定制化操作。一般来说最好不要修改消息 ProducerRecord 的 topic、key 和 partition 等信息,(比如修改 key 不仅会影响分区的计算,同样会影响 broker 端日志压缩(Log Compaction)的功能)。
KafkaProducer 会在消息被应答(Acknowledgement)之前或消息发送失败时调用生产者拦截器的 onAcknowledgement() 方法,优先于用户设定的 Callback 之前执行。这个方法运行在 Producer 的I/O线程中,所以这个方法中实现的代码逻辑越简单越好,否则会影响消息的发送速度。
close() 方法主要用于在关闭拦截器时执行一些资源的清理工作。在这3个方法中抛出的异常都会被捕获并记录到日志中,但并不会再向上传递。
ProducerInterceptor 接口还有一个同样的父接口 Configurable(后面分析)
自定义生产者拦截器
自定义一个ProducerInterceptorPrefix ,通过 onSend() 方法来为每条消息添加一个前缀“prefix1-”,并且通过 onAcknowledgement() 方法来计算发送消息的成功率。如下:
//代码清单4-5生产者拦截器示例
public class ProducerInterceptorPrefix implements
ProducerInterceptor<String,String>{
private volatile long sendSuccess = 0;
private volatile long sendFailure = 0;
@Override
public ProducerRecord<String, String> onSend(
ProducerRecord<String, String> record) {
String modifiedValue = "prefix1-" + record.value();
return new ProducerRecord<>(record.topic(),
record.partition(), record.timestamp(),
record.key(), modifiedValue, record.headers());
}
@Override
public void onAcknowledgement(
RecordMetadata recordMetadata,
Exception e) {
if (e == null) {
sendSuccess++;
} else {
sendFailure ++;
}
}
@Override
public void close() {
double successRatio = (double)sendSuccess / (sendFailure + sendSuccess);
System.out.println("[INFO] 发送成功率="
+ String.format("%f", successRatio * 100) + "%");
}
@Override
public void configure(Map<String, ?> map) {}
}
实现自定义的 ProducerInterceptorPrefix 之后,需要在 KafkaProducer 的配置参数 interceptor.classes 中指定这个拦截器,此参数的默认值为“”。示例如下:
properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
ProducerInterceptorPrefix.class.getName());
然后使用指定了 ProducerInterceptorPrefix 的生产者连续发送10条内容为“kafka”的消息,在发送完之后客户端打印出如下信息:
[INFO] 发送成功率=100.000000%
如果消费这10条消息,会发现消费了的消息都变成了“prefix1-kafka”,而不是原来的“kafka”。
KafkaProducer 中不仅可以指定一个拦截器,还可以指定多个拦截器以形成拦截链。拦截链会按照 interceptor.classes 参数配置的拦截器的顺序来一一执行(配置的时候,各个拦截器之间使用逗号隔开)。如果拦截链中的某个拦截器的执行需要依赖于前一个拦截器的输出,那么就有可能产生“副作用”。设想一下,如果前一个拦截器由于异常而执行失败,那么这个拦截器也就跟着无法继续执行。在拦截链中,如果某个拦截器执行失败,那么下一个拦截器会接着从上一个执行成功的拦截器继续执行。
消息的序列化
生产者需要用序列化器(Serializer)把对象转换成字节数组才能通过网络发送给 Kafka。收到消息后,消费者需要用反序列化器(Deserializer)把从 Kafka 中收到的字节数组转换成相应的对象。生产者使用的序列化器和消费者使用的反序列化器是需要相互对应的,用什么序列化就应该用什么反序列化
序列化接口
如消息的 key 和 value 都使用了字符串,对应的序列化器也使用了自带org.apache.kafka.common.serialization.StringSerializer除了用于 String 类型的序列化器,还有 ByteArray、ByteBuffer、Bytes、Double、Integer、Long 这几种类型,它们都实现了现org.apache.kafka.common.serialization.Serializer 接口,此接口有3个方法:
public void configure(Map<String, ?> configs, boolean isKey)
public byte[] serialize(String topic, T data)
public void close()
- configure() 方法用来配置当前类
- serialize() 方法用来执行序列化操作
- close() 方法用来关闭当前的序列化器,一般情况下 close() 是一个空方法,如果实现了此方法,则必须确保此方法的幂等性,因为这个方法很可能会被 KafkaProducer 调用多次
自定义序列化器
如果 Kafka 客户端提供的几种序列化器都无法满足应用需求,则可以选择使用如 Avro、JSON、Thrift、ProtoBuf 和 Protostuff 等通用的序列化工具来实现,或者使用自定义类型的序列化器来实现。下面就以一个简单的例子来介绍自定义类型的使用方法。
假设我们要发送的消息都是 Company 对象,这个 Company 的定义很简单,只有名称 name 和地址 address, Company 对应的序列化器 CompanySerializer,示例代码如下
//代码清单4-2 自定义的序列化器CompanySerializer
public class CompanySerializer implements Serializer<Company> {
@Override
public void configure(Map configs, boolean isKey) {}
@Override
public byte[] serialize(String topic, Company data) {
if (data == null) {
return null;
}
byte[] name, address;
try {
if (data.getName() != null) {
name = data.getName().getBytes("UTF-8");
} else {
name = new byte[0];
}
if (data.getAddress() != null) {
address = data.getAddress().getBytes("UTF-8");
} else {
address = new byte[0];
}
ByteBuffer buffer = ByteBuffer.
allocate(4+4+name.length + address.length);
buffer.putInt(name.length);
buffer.put(name);
buffer.putInt(address.length);
buffer.put(address);
return buffer.array();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return new byte[0];
}
@Override
public void close() {}
}
如何使用自定义的序列化器
使用自定义的序列化器 CompanySerializer 只需将 KafkaProducer 的 value.serializer 参数设置为 CompanySerializer 类的全限定名即可。假如我们要发送一个 Company 对象到 Kafka
//代码清单4-3 自定义序列化器使用示例
Properties properties = new Properties();
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
CompanySerializer.class.getName());
properties.put("bootstrap.servers", brokerList);
KafkaProducer<String, Company> producer =
new KafkaProducer<>(properties);
Company company = Company.builder().name("hiddenkafka")
.address("China").build();
ProducerRecord<String, Company> record =
new ProducerRecord<>(topic, company);
producer.send(record).get();
注意,示例中消息的 key 对应的序列化器还是 StringSerializer
分区器
消息经过序列化之后就需要确定它发往的分区,如果消息 ProducerRecord 中指定了 partition 字段,那么就不需要分区器的作用,因为 partition 代表的就是所要发往的分区号。如果消息 ProducerRecord 中没有指定 partition 字段,那么就需要依赖分区器,分区器是根据 key 这个字段来计算 partition 的值。分区器的作用就是为消息分配分区。
Kafka 中提供的默认分区器是 org.apache.kafka.clients.producer.internals.DefaultPartitioner,它实现了 org.apache.kafka.clients.producer.Partitioner 接口,这个接口中定义了2个方法,具体如下所示。
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster);
public void close();
- partition() 方法用来计算分区号,返回值为 int 类型。partition() 方法中的参数分别表示主题、键、序列化后的键、值、序列化后的值,以及集群的元数据信息,通过这些信息可以实现功能丰富的分区器。
- close() 方法在关闭分区器的时候用来回收一些资源。
Partitioner 接口和生产者拦截器ProducerInterceptor 一样,也有一个父接口 org.apache.kafka.common.Configurable,这个接口中只有一个方法:Configurable 接口中的 configure() 方法主要用来获取配置信息及初始化数据。
void configure(Map<String, ?> configs);
分区策略
如果 key 为 null,那么消息将默认会以顺序轮询的方式发往主题内的各个可用分区。当然也可以实现随机轮询,实现随机分配的代码只需要两行,如下
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return ThreadLocalRandom.current().nextInt(partitions.size());
先计算出该主题总的分区数,然后随机地返回一个小于它的正整数。
本质上看随机策略也是力求将数据均匀地打散到各个分区,但从实际表现来看,它要逊于轮询策略,所以如果追求数据的均匀分布,还是使用轮询策略比较好。事实上,随机策略是老版本生产者使用的分区策略,在新版本中已经改为轮询了。
按照 key 进行消息保存(同一个key发送到同一个分区)
在默认分区器 DefaultPartitioner 的实现中,close() 是空方法,而在 partition() 方法中定义了主要的分区分配逻辑。如果 key 不为 null,那么默认的分区器会对 key 进行哈希(采用 MurmurHash2 算法,具备高运算性能及低碰撞率),最终根据得到的哈希值来计算分区号,所以拥有相同 key 的消息会被写入同一个分区。这个策略也叫做 key-ordering 策略,Kafka 中每条消息都会有自己的key,一旦消息被定义了 Key,那么你就可以保证同一个 Key 的所有消息都进入到相同的分区里面,由于每个分区下的消息处理都是有顺序的,故这个策略被称为按消息键保序策略,如下图所示
注意:如果 key 不为 null,那么计算得到的分区号会是所有分区中的任意一个;如果 key 为 null 并且有可用分区时,那么计算得到的分区号仅为可用分区中的任意一个,注意两者之间的差别。
在不改变主题分区数量的情况下,key 与分区之间的映射可以保持不变。不过,一旦主题中增加了分区,那么就难以保证 key 与分区之间的映射关系了。
自定义分区器
同 DefaultPartitioner 一样实现 Partitioner 接口即可。默认的分区器在 key 为 null 时不会选择非可用的分区,我们可以通过自定义的分区器 DemoPartitioner 来打破这一限制,如下:
//代码清单4-4 自定义分区器实现
public class DemoPartitioner implements Partitioner {
private final AtomicInteger counter = new AtomicInteger(0);
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (null == keyBytes) {
return counter.getAndIncrement() % numPartitions;
}else
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
@Override public void close() {}
@Override public void configure(Map<String, ?> configs) {}
}
实现自定义的 DemoPartitioner 类之后,同样需要通过配置参数 partitioner.class 来显式指定这个分区器。示例如下:
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
DemoPartitioner.class.getName());
这个自定义分区器的实现比较简单,也可以根据自身业务的需求来灵活实现分配分区的计算方式,比如一般大型电商都有多个仓库,可以将仓库的名称或 ID 作为 key 来灵活地记录商品信息。
生产者压缩机制
压缩一词简单来讲就是一种互换思想,它是一种经典的用 CPU 时间去换磁盘空间或者 I/O 传输量的思想,希望以较小的 CPU 开销带来更少的磁盘占用或更少的网络 I/O 传输。
Kafka 压缩是什么
Kafka 的消息分为两层:消息集合 和 消息。一个消息集合中包含若干条日志项,而日志项才是真正封装消息的地方。Kafka 底层的消息日志由一系列消息集合日志项组成。Kafka 通常不会直接操作具体的一条条消息,它总是在消息集合这个层面上进行写入操作。
在 Kafka 中,压缩会发生在两个地方:Kafka Producer 和 Kafka Consumer,为什么启用压缩?说白了就是消息太大,需要变小一点 来使消息发的更快一些。
Kafka Producer 中使用 compression.type 来开启压缩
private Properties properties = new Properties();
properties.put("bootstrap.servers","192.168.1.9:9092");
properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("compression.type", "gzip");
Producer<String,String> producer = new KafkaProducer<String, String>(properties);
ProducerRecord<String,String> record =
new ProducerRecord<String, String>("CustomerCountry","Precision Products","France");
上面代码表明该 Producer 的压缩算法使用的是 GZIP
有压缩必有解压缩,Producer 使用压缩算法压缩消息后并发送给服务器后,由 Consumer 消费者进行解压缩,因为采用的何种压缩算法是随着 key、value 一起发送过去的,所以消费者知道采用何种压缩算法。
整体架构
主线程和Sender线程
整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和 Sender 线程(发送线程)。主线程中由 KafkaProducer 创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator,也称为消息收集器)中。Sender 线程负责从 RecordAccumulator 中获取消息并将其发送到 Kafka 中。
消息收集器RecordAccumulator的构造及相关参数
RecordAccumulator 主要用来缓存消息以便 Sender 线程可以批量发送,进而减少网络传输的资源消耗以提升性能。RecordAccumulator 缓存的大小可以通过生产者客户端参数 buffer.memory 配置,默认值为 33554432B,即32MB。如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足,这个时候 KafkaProducer 的 send() 方法调用要么被阻塞,要么抛出异常,这个取决于参数 max.block.ms 的配置,此参数的默认值为60000,即60秒。(当buffer满了或者metadata获取不到(比如leader挂了),或者序列化没完成分区函数没计算完等等情况下的最大阻塞时间,默认60000ms (60秒))
主线程中发送过来的消息都会被追加到 RecordAccumulator 的某个双端队列(Deque)中,在 RecordAccumulator 的内部为每个分区都维护了一个双端队列,队列中的内容就是 ProducerBatch,即 Deque。消息写入缓存时,追加到双端队列的尾部;Sender 读取消息时,从双端队列的头部读取。使用ProducerBatch 可以使字节的使用更加紧凑。与此同时,将较小的 ProducerRecord 拼凑成一个较大的 ProducerBatch,也可以减少网络请求的次数以提升整体的吞吐量。如果生产者客户端需要向很多分区发送消息,则可以将 buffer.memory 参数适当调大以增加整体的吞吐量。(当消息发送速度大于kafka服务器接收的速度,producer会阻塞max_block_ms,超时会报异常,buffer_memory用来保存等待发送的消息,默认33554432(32MB))
ByteBuffer
消息在网络上都是以字节Byte形式传输的,在发送之前需要创建一块内存区域来保存对应的消息。在 Kafka 生产者客户端中,通过 java.io.ByteBuffer 实现消息内存的创建和释放。不过频繁的创建和释放内存是比较耗费资源的,所以在RecordAccumulator 的内部定义了一个 BufferPool,它主要用来实现 ByteBuffer 的复用,以实现缓存的高效利用。不过 BufferPool 只针对特定大小的 ByteBuffer 进行管理,而其他大小的 ByteBuffer 不会缓存进 BufferPool 中,这个特定的大小由 batch.size 参数来指定,默认值为16384B,即16KB。我们可以适当地调大 batch.size 参数以便多缓存一些消息。
ProducerBatch和batch.size 的关系
ProducerBatch 的大小和 batch.size 参数也有着密切的关系。当一条消息(ProducerRecord)流入 RecordAccumulator 时,会先寻找与消息分区所对应的双端队列Deque(如果没有则新建),再从这个双端队列的尾部获取一个 ProducerBatch(如果没有则新建),查看 ProducerBatch 中是否还可以写入这个 ProducerRecord,如果可以则写入,如果不可以则需要创建一个新的 ProducerBatch。在新建 ProducerBatch 时评估这条消息的大小是否超过 batch.size 参数的大小,如果不超过,那么就以 batch.size 参数的大小来创建 ProducerBatch,这样在使用完这段内存区域之后,可以通过 BufferPool 的管理来进行复用;如果超过,那么就以评估的大小来创建 ProducerBatch,这段内存区域不会被复用。
消息的转换
Sender 从 RecordAccumulator 中获取缓存的消息之后,会进一步将原本<分区, Deque< ProducerBatch>> 的保存形式转变成 <Node, List< ProducerBatch> 的形式,其中 Node 表示 Kafka 集群的 broker 节点。对于网络连接来说,生产者客户端是与具体的 broker 节点建立的连接,也就是向具体的 broker 节点发送消息,而并不关心消息属于哪一个分区;而对于 KafkaProducer 的应用逻辑而言,我们只关注向哪个分区中发送哪些消息,所以在这里需要做一个应用逻辑层面到网络I/O层面的转换。
在转换成 <Node, List> 的形式之后,Sender 还会进一步封装成 <Node, Request> 的形式,这样就可以将 Request 请求发往各个 Node 了,这里的 Request 是指 Kafka 的各种协议请求,对于消息发送而言就是指具体的 ProduceRequest。
请求在从 Sender 线程发往 Kafka 之前还会保存到 InFlightRequests 中,InFlightRequests 保存对象的具体形式为 Map<NodeId, Deque>,它的主要作用是缓存了已经发出去但还没有收到响应的请求(NodeId 是一个 String 类型,表示节点的 id 编号)。与此同时,InFlightRequests 还提供了许多管理类的方法,并且通过配置参数还可以限制每个连接(也就是客户端与 Node 之间的连接)最多缓存的请求数。这个配置参数为 max.in.flight.requests. per. connection,默认值为5,即每个连接最多只能缓存5个未响应的请求,超过该数值之后就不能再向这个连接发送更多的请求了,除非有缓存的请求收到了响应(Response)。通过比较 Deque 的 size 与这个参数的大小来判断对应的 Node 中是否已经堆积了很多未响应的消息,如果真是如此,那么说明这个 Node 节点负载较大或网络连接有问题,再继续向其发送请求会增大请求超时的可能。
元数据的更新
上面提及的 InFlightRequests 还可以获得 leastLoadedNode,即所有 Node 中负载最小的那一个。这里的负载最小是通过每个 Node 在 InFlightRequests 中还未确认的请求决定的,未确认的请求越多则认为负载越大。对于下图中的 InFlightRequests 来说,图中展示了三个节点 Node0、Node1和Node2,很明显 Node1 的负载最小。也就是说,Node1 为当前的 leastLoadedNode。选择 leastLoadedNode 发送请求可以使它能够尽快发出,避免因网络拥塞等异常而影响整体的进度。leastLoadedNode 的概念可以用于多个应用场合,比如元数据请求、消费者组播协议的交互。
元数据
hello world的发送消息的方法中,我们只知道主题的名称,对于其他一些必要的信息却一无所知。KafkaProducer 要将此消息追加到指定主题的某个分区所对应的 leader 副本之前,首先需要知道主题的分区数量,然后经过计算得出(或者直接指定)目标分区,之后 KafkaProducer 需要知道目标分区的 leader 副本所在的 broker 节点的地址、端口等信息才能建立连接,最终才能将消息发送到 Kafka,在这一过程中所需要的信息都属于元数据信息。
元数据是指 Kafka 集群的元数据,这些元数据具体记录了集群中有哪些主题,这些主题有哪些分区,每个分区的 leader 副本分配在哪个节点上,follower 副本分配在哪些节点上,哪些副本在 AR、ISR 等集合中,集群中有哪些节点,控制器节点又是哪一个等信息。
当客户端中没有需要使用的元数据信息时,比如没有指定的主题信息,或者超过 metadata.max.age.ms 时间没有更新元数据都会引起元数据的更新操作。客户端参数 metadata.max.age.ms 的默认值为300000,即5分钟。元数据的更新操作是在客户端内部进行的,对客户端的外部使用者不可见。当需要更新元数据时,会先挑选出 leastLoadedNode,然后向这个 Node 发送 MetadataRequest 请求来获取具体的元数据信息。这个更新操作是由 Sender 线程发起的,在创建完 MetadataRequest 之后同样会存入 InFlightRequests,之后的步骤就和发送消息时的类似。元数据虽然由 Sender 线程负责更新,但是主线程也需要读取这些信息,这里的数据同步通过 synchronized 和 final 关键字来保障。
生产者相关参数详解
1. acks
这个参数用来指定分区中必须要有多少个副本收到这条消息,之后生产者才会认为这条消息是成功写入的。acks 是生产者客户端中一个非常重要的参数,它涉及消息的可靠性和吞吐量之间的权衡。acks 参数有3种类型的值(都是字符串类型)。
- acks = 1。默认值即为1。生产者发送消息之后,只要分区的 leader 副本成功写入消息,那么它就会收到来自服务端的成功响应。如果消息无法写入 leader 副本,比如在 leader 副本崩溃、重新选举新的 leader 副本的过程中,那么生产者就会收到一个错误的响应,为了避免消息丢失,生产者可以选择重发消息。如果消息写入 leader 副本并返回成功响应给生产者,且在被其他 follower 副本拉取之前 leader 副本崩溃,那么此时消息还是会丢失,因为新选举的 leader 副本中并没有这条对应的消息。acks 设置为1,是消息可靠性和吞吐量之间的折中方案。
- acks = 0。生产者发送消息之后不需要等待任何服务端的响应。如果在消息从发送到写入 Kafka 的过程中出现某些异常,导致 Kafka 并没有收到这条消息,那么生产者也无从得知,消息也就丢失了。在其他配置环境相同的情况下,acks 设置为0可以达到最大的吞吐量。
- acks = -1 或 acks = all。生产者在消息发送之后,需要等待 ISR 中的所有副本都成功写入消息之后才能够收到来自服务端的成功响应。在其他配置环境相同的情况下,acks 设置为 -1(all) 可以达到最强的可靠性。但这并不意味着消息就一定可靠,因为ISR中可能只有 leader 副本,这样就退化成了 acks=1 的情况。要获得更高的消息可靠性需要配合 min.insync.replicas 等参数的联动,消息可靠性分析的具体内容可以参考《图解Kafka之核心原理》。
注意 acks 参数配置的值是一个字符串类型,而不是整数类型。举个例子,将 acks 参数设置为0,需要采用下面这两种形式:
properties.put("acks", "0");
# 或者
properties.put(ProducerConfig.ACKS_CONFIG, "0");
2. max.request.size
这个参数用来限制生产者客户端能发送的消息的最大值,默认值为1048576B,即1MB。一般情况下,这个默认值就可以满足大多数的应用场景了。
不建议读者盲目地增大这个参数的配置值。因为这个参数还涉及一些其他参数的联动,比如 broker 端的 message.max.bytes 参数,如果配置错误可能会引起一些不必要的异常。比如将 broker 端的 message.max.bytes 参数配置为10,而 max.request.size 参数配置为20,那么当我们发送一条大小为15B的消息时,生产者客户端就会报出如下的异常:
org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.
3. retries和retry.backoff.ms
retries 参数用来配置生产者重试的次数,默认值为0,即在发生异常的时候不进行任何重试动作。消息在从生产者发出到成功写入服务器之前可能发生一些临时性的异常,比如网络抖动、leader 副本的选举等,这种异常往往是可以自行恢复的,生产者可以通过配置 retries 大于0的值,以此通过内部重试来恢复而不是一味地将异常抛给生产者的应用程序。如果重试达到设定的次数,那么生产者就会放弃重试并返回异常。不过并不是所有的异常都是可以通过重试来解决的,比如消息太大,超过 max.request.size 参数配置的值时,这种方式就不可行了。
重试还和另一个参数 retry.backoff.ms 有关,这个参数的默认值为100,它用来设定两次重试之间的时间间隔,避免无效的频繁重试。在配置 retries 和 retry.backoff.ms 之前,最好先估算一下可能的异常恢复时间,这样可以设定总的重试时间大于这个异常恢复时间,以此来避免生产者过早地放弃重试。
Kafka 可以保证同一个分区中的消息是有序的。如果生产者按照一定的顺序发送消息,那么这些消息也会顺序地写入分区,进而消费者也可以按照同样的顺序消费它们。
对于某些应用来说,顺序性非常重要,比如 MySQL 的 binlog 传输,如果出现错误就会造成非常严重的后果。如果将acks参数配置为非零值,并且 max.in.flight.requests.per.connection 参数配置为大于1的值,那么就会出现错序的现象:如果第一批次消息写入失败,而第二批次消息写入成功,那么生产者会重试发送第一批次的消息,此时如果第一批次的消息写入成功,那么这两个批次的消息就出现了错序。一般而言,在需要保证消息顺序的场合建议把参数 max.in.flight.requests.per.connection 配置为1,而不是把 acks 配置为0,不过这样也会影响整体的吞吐。
4. compression.type
这个参数用来指定消息的压缩方式,默认值为“none”,即默认情况下,消息不会被压缩。该参数还可以配置为“gzip”“snappy”和“lz4”。对消息进行压缩可以极大地减少网络传输量、降低网络I/O,从而提高整体的性能。消息压缩是一种使用时间换空间的优化方式,如果对时延有一定的要求,则不推荐对消息进行压缩。下面是各压缩算法的对比
5. connections.max.idle.ms
这个参数用来指定在多久之后关闭闲置的连接,默认值是540000(ms),即9分钟。
6. linger.ms
这个参数用来指定生产者发送 ProducerBatch 之前等待更多消息(ProducerRecord)加入 ProducerBatch 的时间,默认值为0。生产者客户端会在 ProducerBatch 被填满或等待时间超过 linger.ms 值时发送出去。增大这个参数的值会增加消息的延迟,但是同时能提升一定的吞吐量。这个 linger.ms 参数与 TCP 协议中的 Nagle 算法有异曲同工之妙。
7. receive.buffer.bytes
这个参数用来设置 Socket 接收消息缓冲区(SO_RECBUF)的大小,默认值为32768(B),即32KB。如果设置为-1,则使用操作系统的默认值。如果 Producer 与 Kafka 处于不同的机房,则可以适地调大这个参数值。
8. send.buffer.bytes
这个参数用来设置 Socket 发送消息缓冲区(SO_SNDBUF)的大小,默认值为131072(B),即128KB。与 receive.buffer.bytes 参数一样,如果设置为-1,则使用操作系统的默认值。
9. request.timeout.ms
这个参数用来配置 Producer 等待请求响应的最长时间,默认值为30000(ms)。请求超时之后可以选择进行重试。注意这个参数需要比 broker 端参数 replica.lag.time.max.ms 的值要大,这样可以减少因客户端重试而引起的消息重复的概率。
其他已经介绍的参数总结:
参 数 名 称 | 默 认 值 | 参 数 释 义 |
---|---|---|
bootstrap.servers | “” | 指定连接 Kafka 集群所需的 broker 地址清单 |
key.serializer | “” | 消息中 key 对应的序列化类,需要实现 org.apache.kafka.common.serialization.Serializer 接口 |
value.serializer | “” | 消息中 value 对应的序列化类,需要实现 org.apache.kafka.common.serialization.Serializer 接口 |
buffer.memory | 33554432(32MB) | 生产者客户端中用于缓存消息的缓冲区大小 |
batch.size | 16384(16KB) | 用于指定 ProducerBatch 可以复用内存区域的大小 |
client.id | “” | 用来设定 KafkaProducer 对应的客户端id |
max.block.ms | 60000 | 用来控制 KafkaProducer 中 send() 方法和 partitionsFor() 方法的阻塞时间。当生产者的发送缓冲区已满,或者没有可用的元数据时,这些方法就会阻塞 |
partitioner.class | org.apache.kafka.clients.producer.internals.DefaultPartitioner | 用来指定分区器,需要实现 org.apache.kafka. clients.producer.Partitioner 接口 |
enable.idempotence | false | 是否开启幂等性功能 |
interceptor.classes | “” | 用来设定生产者拦截器,需要实现 org.apache. kafka.clients.producer. ProducerInterceptor 接口。 |
max.in.flight.requests.per.connection | 5 | 限制每个连接(也就是客户端与 Node 之间的连接)最多缓存的请求数 |
metadata.max.age.ms | 300000(5分钟) | 如果在这个时间内元数据没有更新的话会被强制更新 |
transactional.id | null | 设置事务id,必须唯一 |
总结:
到目前为止主要讲述了生产者客户端的具体用法及其整体架构,主要内容包括配置参数的详解、消息的发送方式、序列化器、分区器、拦截器等。在实际应用中,一套封装良好的且灵活易用的客户端可以避免开发人员重复劳动,也提高了开发效率,还可以提高程序的健壮性和可靠性,而 Kafka 的客户端正好包含了这些特质。对于 KafkaProducer 而言,它是线程安全的,我们可以在多线程的环境中复用它,后面的消费者客户端 KafkaConsumer 而言,它是非线程安全的,因为它具备了状态。
更多推荐
所有评论(0)