主要讲两个重要点:1、分区选择   2、ack机制   ,3、重试机制





    bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: -1


package kafka;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;
import java.util.UUID;

@RequestMapping(value = "kafkaProducer")
public class KafkaProducerController {

    private KafkaTemplate<String ,Object> kafkaTemplate;

    private Gson gson = new GsonBuilder().create();

    @GetMapping(value = "/sendMessage")
    public int sendMessage(){

        Message message = new Message();
        message.setSendTime(new Date());
        log.info("+++++++++++++++++++++  message = {}", gson.toJson(message));

        return  1;



 // 不执定分区,key
public ListenableFuture<SendResult<K, V>> send(String topic, @Nullable V data) {
    ProducerRecord<K, V> producerRecord = new ProducerRecord(topic, data);
    return this.doSend(producerRecord);
 // 指定key
public ListenableFuture<SendResult<K, V>> send(String topic, K key, @Nullable V data) {
    ProducerRecord<K, V> producerRecord = new ProducerRecord(topic, key, data);
    return this.doSend(producerRecord);
 // 指定分区和key
public ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, @Nullable V data) {
    ProducerRecord<K, V> producerRecord = new ProducerRecord(topic, partition, key, data);
    return this.doSend(producerRecord);


	protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
		final Producer<K, V> producer = getTheProducer(producerRecord.topic());
		this.logger.trace(() -> "Sending: " + producerRecord);
		final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture<>();
		Object sample = null;
		if (this.micrometerEnabled && this.micrometerHolder == null) {
			this.micrometerHolder = obtainMicrometerHolder();
		if (this.micrometerHolder != null) {
			sample = this.micrometerHolder.start();
		Future<RecordMetadata> sendFuture =
				producer.send(producerRecord, buildCallback(producerRecord, producer, future, sample));
		// May be an immediate failure
		if (sendFuture.isDone()) {
			try {
			catch (InterruptedException e) {
				throw new KafkaException("Interrupted", e);
			catch (ExecutionException e) {
				throw new KafkaException("Send failed", e.getCause()); // NOSONAR, stack trace
		if (this.autoFlush) {
		this.logger.trace(() -> "Sent: " + producerRecord);
		return future;


Future<RecordMetadata> sendFuture =producer.send(producerRecord, buildCallback(producerRecord, producer, future, sample));


接下来就是kafka api中的代码了

    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        // intercept the record, which can be potentially modified; this method does not throw exceptions
        ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
        return doSend(interceptedRecord, callback);
   private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
            try {
                serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in key.serializer", cce);
            byte[] serializedValue;
            try {
                serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in value.serializer", cce);
            int partition = partition(record, serializedKey, serializedValue, cluster);


1、serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());

     keySerializer 是在配置文件中配置的序列化类,对key进行序列化成byte[]

2、int partition = partition(record, serializedKey, serializedValue, cluster);

    private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
        Integer partition = record.partition();
        return partition != null ?
                partition :
                        record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);

 逻辑是如果send的时候如果指定了分区直接往指定的分区里面发送消息  如果没有指定分区就执行

partitioner.partition( record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);


    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        return partition(topic, key, keyBytes, value, valueBytes, cluster, cluster.partitionsForTopic(topic).size());
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster,
                         int numPartitions) {
        if (keyBytes == null) {
            return stickyPartitionCache.partition(topic, cluster);
        // hash the keyBytes to choose a partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;

看这里就说明,如果send时候没有传key参数,则走stickyPartitionCache.partition(topic, cluster);大致是一种轮询的机制,对分区进行轮询








    public static final String ACKS_CONFIG = "acks";
    private static final String ACKS_DOC = "The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the "
                                           + " durability of records that are sent. The following settings are allowed: "
                                           + " <ul>"
                                           + " <li><code>acks=0</code> If set to zero then the producer will not wait for any acknowledgment from the"
                                           + " server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be"
                                           + " made that the server has received the record in this case, and the <code>retries</code> configuration will not"
                                           + " take effect (as the client won't generally know of any failures). The offset given back for each record will"
                                           + " always be set to <code>-1</code>."
                                           + " <li><code>acks=1</code> This will mean the leader will write the record to its local log but will respond"
                                           + " without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after"
                                           + " acknowledging the record but before the followers have replicated it then the record will be lost."
                                           + " <li><code>acks=all</code> This means the leader will wait for the full set of in-sync replicas to"
                                           + " acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica"
                                           + " remains alive. This is the strongest available guarantee. This is equivalent to the acks=-1 setting."
                                           + "</ul>";


acks = 0 时,只要发送了这条信息,就一律认为成功了,然后发送第二条



acks = 1 (默认配置)时,只要发送给leader分区的数据落磁盘了,就认为成功了,然后发送第二条,

   缺点:如果leader分区所在服务器宕机了,还没把数据同步到follower分区上去,之后选举出来                  的leader分区没有之前发送的数据,造成数据丢失


acks = all 时,只要发送给leader分区的数据落磁盘了,而且把数据同步给follower分区了,然后发送第二条

   缺点:性能差(acks = 1的性能是acks = all的10倍)





    /** <code>retries</code> */
    public static final String RETRIES_CONFIG = CommonClientConfigs.RETRIES_CONFIG;
    private static final String RETRIES_DOC = "Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error."
            + " Note that this retry is no different than if the client resent the record upon receiving the error."
            + " Allowing retries without setting <code>" + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + "</code> to 1 will potentially change the"
            + " ordering of records because if two batches are sent to a single partition, and the first fails and is retried but the second"
            + " succeeds, then the records in the second batch may appear first. Note additionally that produce requests will be"
            + " failed before the number of retries has been exhausted if the timeout configured by"
            + " <code>" + DELIVERY_TIMEOUT_MS_CONFIG + "</code> expires first before successful acknowledgement. Users should generally"
            + " prefer to leave this config unset and instead use <code>" + DELIVERY_TIMEOUT_MS_CONFIG + "</code> to control"
            + " retry behavior.";

1. 重试次数少于参数retries指定的值;
2. 异常是RetriableException类型或者TransactionManager允许重试;


