Kafka --- Producer
This post covers three key points: 1. partition selection, 2. the ack mechanism, 3. the retry mechanism.
Spring Boot now auto-configures Kafka support, so you can work with it directly through KafkaTemplate, much like RestTemplate.
Maven configuration:
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
application.yml configuration:
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: -1
Partition selection
package kafka;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;
import java.util.UUID;

@Slf4j
@RestController
@RequestMapping(value = "kafkaProducer")
public class KafkaProducerController {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    private Gson gson = new GsonBuilder().create();

    @GetMapping(value = "/sendMessage")
    public int sendMessage() {
        Message message = new Message();
        message.setId(System.currentTimeMillis());
        message.setMessage(UUID.randomUUID().toString());
        message.setSendTime(new Date());
        log.info("+++++++++++++++++++++ message = {}", gson.toJson(message));
        kafkaTemplate.send("testTopic", gson.toJson(message));
        return 1;
    }
}
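The Message POJO is not shown in the original post; here is a minimal sketch inferred from the setters used above (field names and types are assumptions):

import java.util.Date;
import lombok.Data;

@Data
public class Message {
    private Long id;        // filled with System.currentTimeMillis() above
    private String message; // filled with a random UUID above
    private Date sendTime;
}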
Calling kafkaTemplate.send is what actually pushes data to the Kafka cluster.
Let's look at the overloads of send in KafkaTemplate:
// Neither partition nor key specified
public ListenableFuture<SendResult<K, V>> send(String topic, @Nullable V data) {
    ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, data);
    return this.doSend(producerRecord);
}

// Key specified
public ListenableFuture<SendResult<K, V>> send(String topic, K key, @Nullable V data) {
    ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, key, data);
    return this.doSend(producerRecord);
}

// Partition and key specified
public ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, @Nullable V data) {
    ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, partition, key, data);
    return this.doSend(producerRecord);
}
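For illustration, here is how each overload would be called from the controller above (the topic, key, and payload values are made up):

kafkaTemplate.send("testTopic", "hello");                 // no key, no partition -> partitioner decides
kafkaTemplate.send("testTopic", "order-42", "hello");     // key only -> murmur2(key) % numPartitions
kafkaTemplate.send("testTopic", 0, "order-42", "hello");  // explicit partition 0 is used as-is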
All three overloads wrap the arguments into a ProducerRecord and delegate to doSend:
protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
    final Producer<K, V> producer = getTheProducer(producerRecord.topic());
    this.logger.trace(() -> "Sending: " + producerRecord);
    final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture<>();
    Object sample = null;
    if (this.micrometerEnabled && this.micrometerHolder == null) {
        this.micrometerHolder = obtainMicrometerHolder();
    }
    if (this.micrometerHolder != null) {
        sample = this.micrometerHolder.start();
    }
    Future<RecordMetadata> sendFuture =
            producer.send(producerRecord, buildCallback(producerRecord, producer, future, sample));
    // May be an immediate failure
    if (sendFuture.isDone()) {
        try {
            sendFuture.get();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new KafkaException("Interrupted", e);
        }
        catch (ExecutionException e) {
            throw new KafkaException("Send failed", e.getCause()); // NOSONAR, stack trace
        }
    }
    if (this.autoFlush) {
        flush();
    }
    this.logger.trace(() -> "Sent: " + producerRecord);
    return future;
}
The key line is:
Future<RecordMetadata> sendFuture = producer.send(producerRecord, buildCallback(producerRecord, producer, future, sample));
Everything above is the spring-kafka layer.
From here on we are in the Kafka client API (KafkaProducer):
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    // intercept the record, which can be potentially modified; this method does not throw exceptions
    ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
    return doSend(interceptedRecord, callback);
}
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    // ... (preceding logic elided) ...
    try {
        serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
    } catch (ClassCastException cce) {
        throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                " specified in key.serializer", cce);
    }
    byte[] serializedValue;
    try {
        serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
    } catch (ClassCastException cce) {
        throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                " specified in value.serializer", cce);
    }
    int partition = partition(record, serializedKey, serializedValue, cluster);
    // ... (remaining logic elided) ...
}
Two lines matter here:
1. serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
keySerializer is the serializer class configured in application.yml; it turns the key into a byte[] (the value is serialized the same way by valueSerializer).
2. int partition = partition(record, serializedKey, serializedValue, cluster);
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
    Integer partition = record.partition();
    return partition != null ?
            partition :
            partitioner.partition(
                    record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}
The logic: if a partition was specified when calling send, the message goes straight to that partition; if not, the configured partitioner decides:
partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
Kafka ships with three partitioner strategies (DefaultPartitioner, RoundRobinPartitioner, UniformStickyPartitioner), and you can configure which one to use (a configuration sketch follows after the summary below). Let's look at the default, DefaultPartitioner:
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    return partition(topic, key, keyBytes, value, valueBytes, cluster, cluster.partitionsForTopic(topic).size());
}

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster,
                     int numPartitions) {
    if (keyBytes == null) {
        return stickyPartitionCache.partition(topic, cluster);
    }
    // hash the keyBytes to choose a partition
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
This shows that if no key is passed to send, stickyPartitionCache.partition(topic, cluster) is used: the sticky partitioner keeps writing to one partition until the current batch is full (or flushed), then switches to another, which over time spreads records across partitions in a round-robin-like fashion.
If a key is supplied, the key bytes are hashed with murmur2 and taken modulo the number of partitions.
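As a worked illustration of the keyed case, the same formula DefaultPartitioner uses can be evaluated directly with Kafka's Utils class (the key string and partition count below are made up):

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.utils.Utils;

public class KeyPartitionDemo {
    public static void main(String[] args) {
        byte[] keyBytes = "order-42".getBytes(StandardCharsets.UTF_8); // hypothetical key
        int numPartitions = 3;                                         // assumed partition count of the topic
        // Same computation as DefaultPartitioner: positive murmur2 hash modulo the partition count.
        int partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        System.out.println("key 'order-42' -> partition " + partition);
    }
}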
To summarize:
(1) If a partition is specified, it is used directly.
(2) If no partition is specified but a key is, the partition is chosen by hashing the key (murmur2 modulo the partition count).
(3) If neither partition nor key is specified, the sticky partitioner picks a partition in a round-robin-like fashion.
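If you want a strategy other than DefaultPartitioner, the partitioner can be swapped via a producer property. A hedged sketch for application.yml, assuming a kafka-clients version (2.4+) that ships RoundRobinPartitioner:

spring:
  kafka:
    producer:
      properties:
        partitioner.class: org.apache.kafka.clients.producer.RoundRobinPartitioner  # strict round-robin instead of the default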
The ack mechanism
In the ProducerConfig source:
public static final String ACKS_CONFIG = "acks";
private static final String ACKS_DOC = "The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the "
+ " durability of records that are sent. The following settings are allowed: "
+ " <ul>"
+ " <li><code>acks=0</code> If set to zero then the producer will not wait for any acknowledgment from the"
+ " server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be"
+ " made that the server has received the record in this case, and the <code>retries</code> configuration will not"
+ " take effect (as the client won't generally know of any failures). The offset given back for each record will"
+ " always be set to <code>-1</code>."
+ " <li><code>acks=1</code> This will mean the leader will write the record to its local log but will respond"
+ " without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after"
+ " acknowledging the record but before the followers have replicated it then the record will be lost."
+ " <li><code>acks=all</code> This means the leader will wait for the full set of in-sync replicas to"
+ " acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica"
+ " remains alive. This is the strongest available guarantee. This is equivalent to the acks=-1 setting."
+ "</ul>";
Roughly, this means:
acks = 0: as soon as the record has been handed off, it is treated as sent and the producer moves on to the next one, without waiting for any acknowledgment.
Drawback: a flaky network, or a crash of the broker hosting the leader partition, silently loses data.
Advantage: the best throughput.
acks = 1 (the default in Kafka clients before 3.0; since 3.0 the default is acks = all): the record is treated as successful once the leader partition has written it to its local log; the producer then sends the next one.
Drawback: if the leader's broker crashes before the data has been replicated to the followers, the newly elected leader will not have it, so the data is lost.
Advantage: good throughput.
acks = all (equivalent to acks = -1): the record is treated as successful only after the leader has written it and the in-sync followers have replicated it; only then does the producer move on.
Drawback: noticeably lower throughput (acks = 1 is often several times faster than acks = all, depending on replication factor and network).
Advantage: the strongest durability guarantee available.
Does acks = all by itself guarantee that data can never be lost?
Of course not. If the partition has only one replica, i.e. a leader with no followers, what good is acks = all?
None at all: the ISR then contains only the leader, and if it crashes after accepting the message, the data is still lost.
So acks = all must be combined with an ISR of at least two replicas, at minimum one leader plus one follower (in practice enforced with min.insync.replicas >= 2).
Only then does a successful write mean that at least two replicas hold the record, so the crash of any single replica does not lose data.
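On the producer side you usually also want to observe the acknowledgment (or the failure). A minimal sketch, assuming a spring-kafka version where KafkaTemplate#send returns a ListenableFuture (newer versions return a CompletableFuture, where whenComplete can be used instead):

kafkaTemplate.send("testTopic", "order-42", "payload").addCallback(
        // Success: the broker acknowledged according to the configured acks level.
        result -> log.info("acked: partition={}, offset={}",
                result.getRecordMetadata().partition(),
                result.getRecordMetadata().offset()),
        // Failure: retries (if any) were exhausted or a non-retriable error occurred.
        ex -> log.error("send failed", ex));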
Retry mechanism
/** <code>retries</code> */
public static final String RETRIES_CONFIG = CommonClientConfigs.RETRIES_CONFIG;
private static final String RETRIES_DOC = "Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error."
+ " Note that this retry is no different than if the client resent the record upon receiving the error."
+ " Allowing retries without setting <code>" + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + "</code> to 1 will potentially change the"
+ " ordering of records because if two batches are sent to a single partition, and the first fails and is retried but the second"
+ " succeeds, then the records in the second batch may appear first. Note additionally that produce requests will be"
+ " failed before the number of retries has been exhausted if the timeout configured by"
+ " <code>" + DELIVERY_TIMEOUT_MS_CONFIG + "</code> expires first before successful acknowledgement. Users should generally"
+ " prefer to leave this config unset and instead use <code>" + DELIVERY_TIMEOUT_MS_CONFIG + "</code> to control"
+ " retry behavior.";
A retry is attempted only when two conditions both hold:
1. The number of retries performed so far is less than the configured retries value;
2. The exception is a RetriableException, or the TransactionManager allows the retry.
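For completeness, a hedged application.yml sketch of the retry-related producer settings mentioned in the doc string above (the values are illustrative, not recommendations):

spring:
  kafka:
    producer:
      retries: 3                                    # upper bound on retries
      properties:
        delivery.timeout.ms: 120000                 # overall send deadline; the preferred knob per the docs above
        max.in.flight.requests.per.connection: 1    # keeps record ordering when retries > 0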