This article covers three key points: 1. partition selection, 2. the ack mechanism, 3. the retry mechanism.

Spring Boot now ships with Kafka support built in, so you can work with Kafka directly through KafkaTemplate, much like RestTemplate.

Maven configuration:

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

application.yml configuration:

spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: -1

Partition selection

package kafka;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;
import java.util.UUID;

@Slf4j
@RestController
@RequestMapping(value = "kafkaProducer")
public class KafkaProducerController {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    private Gson gson = new GsonBuilder().create();

    @GetMapping(value = "/sendMessage")
    public int sendMessage(){

        Message message = new Message();
        message.setId(System.currentTimeMillis());
        message.setMessage(UUID.randomUUID().toString());
        message.setSendTime(new Date());
        log.info("+++++++++++++++++++++  message = {}", gson.toJson(message));
        kafkaTemplate.send("testTopic",gson.toJson(message));

        return 1;
    }
}
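
The Message class used above is not shown in the post; a minimal sketch of what it might look like, with field names inferred purely from the setters called in the controller, could be:

package kafka;

import java.util.Date;

import lombok.Data;

// Hypothetical POJO: the real Message class is not shown in the post; this is only
// the minimal shape implied by the setters used in KafkaProducerController.
@Data
public class Message {
    private Long id;        // filled with System.currentTimeMillis()
    private String message; // filled with UUID.randomUUID().toString()
    private Date sendTime;  // filled with new Date()
}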

Calling kafkaTemplate.send is what actually pushes the data to the Kafka cluster.

Let's look at the overloaded send methods in KafkaTemplate:

 // neither partition nor key specified
public ListenableFuture<SendResult<K, V>> send(String topic, @Nullable V data) {
    ProducerRecord<K, V> producerRecord = new ProducerRecord(topic, data);
    return this.doSend(producerRecord);
}
 // key specified
public ListenableFuture<SendResult<K, V>> send(String topic, K key, @Nullable V data) {
    ProducerRecord<K, V> producerRecord = new ProducerRecord(topic, key, data);
    return this.doSend(producerRecord);
}
 // partition and key specified
public ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, @Nullable V data) {
    ProducerRecord<K, V> producerRecord = new ProducerRecord(topic, partition, key, data);
    return this.doSend(producerRecord);
}

All of them simply wrap the arguments into a ProducerRecord object and hand it to doSend:

	protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
		final Producer<K, V> producer = getTheProducer(producerRecord.topic());
		this.logger.trace(() -> "Sending: " + producerRecord);
		final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture<>();
		Object sample = null;
		if (this.micrometerEnabled && this.micrometerHolder == null) {
			this.micrometerHolder = obtainMicrometerHolder();
		}
		if (this.micrometerHolder != null) {
			sample = this.micrometerHolder.start();
		}
		Future<RecordMetadata> sendFuture =
				producer.send(producerRecord, buildCallback(producerRecord, producer, future, sample));
		// May be an immediate failure
		if (sendFuture.isDone()) {
			try {
				sendFuture.get();
			}
			catch (InterruptedException e) {
				Thread.currentThread().interrupt();
				throw new KafkaException("Interrupted", e);
			}
			catch (ExecutionException e) {
				throw new KafkaException("Send failed", e.getCause()); // NOSONAR, stack trace
			}
		}
		if (this.autoFlush) {
			flush();
		}
		this.logger.trace(() -> "Sent: " + producerRecord);
		return future;
	}

The key line is:

Future<RecordMetadata> sendFuture = producer.send(producerRecord, buildCallback(producerRecord, producer, future, sample));

The code above is on the Spring (spring-kafka) side.

Next comes the code in the Kafka client API (KafkaProducer):

    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        // intercept the record, which can be potentially modified; this method does not throw exceptions
        ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
        return doSend(interceptedRecord, callback);
    }
   private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
          .....................................................
            byte[] serializedKey;
            try {
                serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in key.serializer", cce);
            }
            byte[] serializedValue;
            try {
                serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in value.serializer", cce);
            }
            int partition = partition(record, serializedKey, serializedValue, cluster);
           ........................................................................
    }

Focus on two lines:

1. serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());

     keySerializer is the serializer class configured in the configuration file; it serializes the key into a byte[].

2. int partition = partition(record, serializedKey, serializedValue, cluster);

    private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
        Integer partition = record.partition();
        return partition != null ?
                partition :
                partitioner.partition(
                        record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
    }

The logic: if a partition was specified when calling send, the message goes straight to that partition; if no partition was specified, it falls through to

partitioner.partition( record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);

Kafka ships with three partitioning strategies (DefaultPartitioner, RoundRobinPartitioner, and UniformStickyPartitioner), and you can choose which one to use; let's look at the default one first (a sketch for switching strategies follows the code below):

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        return partition(topic, key, keyBytes, value, valueBytes, cluster, cluster.partitionsForTopic(topic).size());
    }
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster,
                         int numPartitions) {
        if (keyBytes == null) {
            return stickyPartitionCache.partition(topic, cluster);
        }
        // hash the keyBytes to choose a partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
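
If you want one of the other strategies instead of the default, the partitioner is selected through the producer property partitioner.class. A minimal sketch with the plain Kafka client, switching to RoundRobinPartitioner (the broker address and topic name are just the ones used earlier in this post):

package kafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RoundRobinPartitioner;
import org.apache.kafka.common.serialization.StringSerializer;

public class RoundRobinProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Replace the default sticky/hash partitioner with plain round-robin (Kafka 2.4+).
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("testTopic", "hello"));
        }
    }
}

In a Spring Boot app the same key can be set under spring.kafka.producer.properties in application.yml.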

This shows that if no key was passed to send, the record goes through stickyPartitionCache.partition(topic, cluster): the sticky partitioner keeps writing to the same partition until the current batch is full or a new batch is created, then switches to another partition, which over time spreads records across partitions in a roughly round-robin fashion.

If a key was passed, the key bytes are hashed with murmur2 and taken modulo the number of partitions.

To summarize:

(1) If a partition is specified, it is used directly;

(2) If no partition is specified but a key is, the key is hashed to pick a partition;

(3) If neither partition nor key is specified, the sticky (roughly round-robin) strategy picks a partition (all three cases are sketched below).
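
As a quick usage sketch, the three cases map one-to-one onto the KafkaTemplate.send overloads shown earlier. The topic name, key and payloads below are placeholders, and the method is assumed to live inside the controller above, reusing the injected kafkaTemplate:

    // Hypothetical demo endpoint inside KafkaProducerController
    @GetMapping(value = "/sendDemo")
    public void sendDemo() {
        // (1) partition specified: the record goes straight to partition 0 of testTopic
        kafkaTemplate.send("testTopic", 0, "orderId-42", "payload-A");

        // (2) key only: murmur2(key) % numPartitions picks the partition,
        //     so records with the same key always land on the same partition
        kafkaTemplate.send("testTopic", "orderId-42", "payload-B");

        // (3) neither partition nor key: the sticky partitioner picks the partition
        kafkaTemplate.send("testTopic", "payload-C");
    }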

The ack mechanism

In the ProducerConfig class in the source code:

    public static final String ACKS_CONFIG = "acks";
    private static final String ACKS_DOC = "The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the "
                                           + " durability of records that are sent. The following settings are allowed: "
                                           + " <ul>"
                                           + " <li><code>acks=0</code> If set to zero then the producer will not wait for any acknowledgment from the"
                                           + " server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be"
                                           + " made that the server has received the record in this case, and the <code>retries</code> configuration will not"
                                           + " take effect (as the client won't generally know of any failures). The offset given back for each record will"
                                           + " always be set to <code>-1</code>."
                                           + " <li><code>acks=1</code> This will mean the leader will write the record to its local log but will respond"
                                           + " without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after"
                                           + " acknowledging the record but before the followers have replicated it then the record will be lost."
                                           + " <li><code>acks=all</code> This means the leader will wait for the full set of in-sync replicas to"
                                           + " acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica"
                                           + " remains alive. This is the strongest available guarantee. This is equivalent to the acks=-1 setting."
                                           + "</ul>";

Roughly, this means:

With acks = 0, the producer treats a record as successful the moment it has been sent out, then moves on to the next one.

    Drawback: a bad network, or a crash of the broker hosting the leader partition, will cause data loss.

    Advantage: the best possible throughput.

With acks = 1 (the client default before Kafka 3.0), a record is considered successful as soon as the leader partition has written it to its local log, and the producer moves on to the next one.

   Drawback: if the broker hosting the leader partition crashes before the data has been replicated to the follower partitions, the newly elected leader will not have that data, so it is lost.

   Advantage: good throughput.

With acks = all, a record is only considered successful once the leader has written it and the in-sync follower replicas have acknowledged it; only then does the producer move on to the next one.

   Drawback: the worst throughput (acks = 1 can be roughly 10 times faster than acks = all).

   Advantage: the strongest possible guarantee against data loss.

Does acks=all mean data can never be lost?

Of course not. If your partition has only one replica, i.e. just a leader and no followers at all, do you think acks=all helps?
Not at all, because the ISR then contains only that one leader; if it crashes right after receiving the message, the data is still lost.
So acks=all only works as intended when the ISR holds at least 2 replicas, i.e. at least one leader plus one follower (in practice this is enforced with the topic/broker setting min.insync.replicas >= 2).
Only then does a successful write mean that at least 2 replicas have received the record, so the crash of any single replica will not lose data.
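
min.insync.replicas is a broker/topic-level setting, not a producer one, so the usual pairing is acks=all on the producer plus a topic created with replication factor >= 2 and min.insync.replicas >= 2. A minimal sketch using the Kafka AdminClient, assuming a cluster of at least 3 brokers reachable at localhost:9092:

package kafka;

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

public class CreateDurableTopicExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // 3 partitions, replication factor 3; with min.insync.replicas=2 an acks=all
            // write only succeeds once at least 2 replicas (leader + 1 follower) have it,
            // and the broker rejects writes when fewer than 2 replicas are in sync.
            NewTopic topic = new NewTopic("testTopic", 3, (short) 3)
                    .configs(Collections.singletonMap(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2"));
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}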

The retry mechanism

    /** <code>retries</code> */
    public static final String RETRIES_CONFIG = CommonClientConfigs.RETRIES_CONFIG;
    private static final String RETRIES_DOC = "Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error."
            + " Note that this retry is no different than if the client resent the record upon receiving the error."
            + " Allowing retries without setting <code>" + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + "</code> to 1 will potentially change the"
            + " ordering of records because if two batches are sent to a single partition, and the first fails and is retried but the second"
            + " succeeds, then the records in the second batch may appear first. Note additionally that produce requests will be"
            + " failed before the number of retries has been exhausted if the timeout configured by"
            + " <code>" + DELIVERY_TIMEOUT_MS_CONFIG + "</code> expires first before successful acknowledgement. Users should generally"
            + " prefer to leave this config unset and instead use <code>" + DELIVERY_TIMEOUT_MS_CONFIG + "</code> to control"
            + " retry behavior.";

A retry is allowed only when both of the following conditions hold (a producer config sketch covering the retry-related settings follows below):
1. the number of retries performed so far is less than the value configured by retries;
2. the exception is a RetriableException, or the TransactionManager allows a retry.
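
To make these conditions concrete, here is a minimal sketch of the retry-related producer settings mentioned in the doc string above. The values are only illustrative; as the doc suggests, in practice you would usually leave retries at its default and tune delivery.timeout.ms instead:

package kafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class RetryConfigExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        // Condition 1: a failed batch is resent only while the retry count is below this value.
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        // Retries also stop once this total time budget for the send has expired.
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);
        // With more than 1 in-flight request, a retried batch can be reordered behind a
        // later batch, as the doc string above warns.
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Condition 2 is decided by the error type at runtime: only retriable
            // (typically transient) exceptions are retried.
            producer.send(new ProducerRecord<>("testTopic", "hello"));
        }
    }
}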
