ListenerConsumer
一、构造方法,两件事,定义一个rebalance listener,consumer订阅topic
private ListenerConsumer(MessageListener<K, V> listener, AcknowledgingMessageListener<K, V> ackListener) {
Assert.state(!this.isAnyManualAck || !this.autoCommit,
"Consumer cannot be configured for auto commit for ackMode " + this.containerProperties.getAckMode());
final Consumer<K, V> consumer = KafkaMessageListenerContainer.this.consumerFactory.createConsumer();
ConsumerRebalanceListener rebalanceListener = new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {//rebalance开始前、consumer处理最后一条消息后,触发
// do not stop the invoker if it is not started yet
// this will occur on the initial start on a subscription
if (!ListenerConsumer.this.autoCommit) {
if (ListenerConsumer.this.logger.isTraceEnabled()) {
ListenerConsumer.this.logger.trace("Received partition revocation notification, " +
"and will stop the invoker.");
}
if (ListenerConsumer.this.listenerInvokerFuture != null) {
stopInvokerAndCommitManualAcks();//停掉listenerInvoker,人工commit offset,这个时候不管ackMode是啥,都commit
ListenerConsumer.this.recordsToProcess.clear();
ListenerConsumer.this.unsent = null;
}
else {
if (!CollectionUtils.isEmpty(partitions)) {
ListenerConsumer.this.logger.error("Invalid state: the invoker was not active, " +
"but the consumer had allocated partitions");
}
}
}
else {
if (ListenerConsumer.this.logger.isTraceEnabled()) {
ListenerConsumer.this.logger.trace("Received partition revocation notification, " +
"but the container is in autocommit mode, " +
"so transition will be handled by the consumer");//这句话是说自动提交时,是consumer自己处理rebalance的消息?
}
}
getContainerProperties().getConsumerRebalanceListener().onPartitionsRevoked(partitions);
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {//rebalance结束后、consumer处理第一条消息前触发
ListenerConsumer.this.assignedPartitions = partitions;
if (!ListenerConsumer.this.autoCommit) {
// Commit initial positions - this is generally redundant but
// it protects us from the case when another consumer starts
// and rebalance would cause it to reset at the end
// see https://github.com/spring-projects/spring-kafka/issues/110
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
for (TopicPartition partition : partitions) {//commit一下初始的offset
offsets.put(partition, new OffsetAndMetadata(consumer.position(partition)));
}
if (ListenerConsumer.this.logger.isDebugEnabled()) {
ListenerConsumer.this.logger.debug("Committing: " + offsets);
}
if (KafkaMessageListenerContainer.this.getContainerProperties().isSyncCommits()) {
ListenerConsumer.this.consumer.commitSync(offsets);
}
else {
ListenerConsumer.this.consumer.commitAsync(offsets,
KafkaMessageListenerContainer.this.getContainerProperties().getCommitCallback());
}
}
// We will not start the invoker thread if we are in autocommit mode,
// as we will execute synchronously then
// We will not start the invoker thread if the container is stopped
// We will not start the invoker thread if there are no partitions to
// listen to
if (!ListenerConsumer.this.autoCommit && KafkaMessageListenerContainer.this.isRunning()
&& !CollectionUtils.isEmpty(partitions)) {
startInvoker();//自动提交是在consumer里同步执行的,所以只在非自动提交时启动listenerInvoker
}
getContainerProperties().getConsumerRebalanceListener().onPartitionsAssigned(partitions);
}
};
if (KafkaMessageListenerContainer.this.topicPartitions == null) {//如果没有指定partition,给订阅一下
if (this.containerProperties.getTopicPattern() != null) {
consumer.subscribe(this.containerProperties.getTopicPattern(), rebalanceListener);
}
else {
consumer.subscribe(Arrays.asList(this.containerProperties.getTopics()), rebalanceListener);
}
}
else {//有指定的,初始化definedPartitions,把partition assign给consumer
List<TopicPartitionInitialOffset> topicPartitions =
Arrays.asList(KafkaMessageListenerContainer.this.topicPartitions);
this.definedPartitions = new HashMap<>(topicPartitions.size());
for (TopicPartitionInitialOffset topicPartition : topicPartitions) {
this.definedPartitions.put(topicPartition.topicPartition(), topicPartition.initialOffset());
}
consumer.assign(new ArrayList<>(this.definedPartitions.keySet()));
}
this.consumer = consumer;
this.listener = listener;
this.acknowledgingMessageListener = ackListener;
}
@Override
public void run() {
this.count = 0;
this.last = System.currentTimeMillis();
if (isRunning() && this.definedPartitions != null) {
initPartitionsIfNeeded();//初始化各partition的offset
1、topicPartition指定了初始offset的话就是这个
2、没指定的话不初始化,那就是从队头取起
3、指定了负数n,就取该partition的最大offset+n,也就是队尾往前数n条
// we start the invoker here as there will be no rebalance calls to
// trigger it, but only if the container is not set to autocommit
// otherwise we will process records on a separate thread
if (!this.autoCommit) {
startInvoker();//启动invoker,开始消费消息
}
}
long lastReceive = System.currentTimeMillis();
long lastAlertAt = lastReceive;
while (isRunning()) {
try {
if (!this.autoCommit) {
processCommits();//非自动提交,处理提交操作
}
if (this.logger.isTraceEnabled()) {
this.logger.trace("Polling (paused=" + this.paused + ")...");
}
ConsumerRecords<K, V> records = this.consumer.poll(this.containerProperties.getPollTimeout());
if (records != null && this.logger.isDebugEnabled()) {
this.logger.debug("Received: " + records.count() + " records");
}
if (records != null && records.count() > 0) {
if (this.containerProperties.getIdleEventInterval() != null) {
lastReceive = System.currentTimeMillis();
}
// if the container is set to auto-commit, then execute in the
// same thread
// otherwise send to the buffering queue
//自动提交的立刻执行,非自动提交的发送到阻塞队列recordsToProcess里去,ListenerInvoker是从这个队列里取数据处理的,然而在哪里commit的呢,没看到。。。。
if (this.autoCommit) {
invokeListener(records);
}
else {
if (sendToListener(records)) {
if (this.assignedPartitions != null) {
// avoid group management rebalance due to a slow
// consumer
this.consumer.pause(this.assignedPartitions
.toArray(new TopicPartition[this.assignedPartitions.size()]));
this.paused = true;
this.unsent = records;
}
}
}
}
else {
if (this.containerProperties.getIdleEventInterval() != null) {
long now = System.currentTimeMillis();
if (now > lastReceive + this.containerProperties.getIdleEventInterval()
&& now > lastAlertAt + this.containerProperties.getIdleEventInterval()) {
publishIdleContainerEvent(now - lastReceive);
lastAlertAt = now;
}
}
}
this.unsent = checkPause(this.unsent);
}
catch (WakeupException e) {
this.unsent = checkPause(this.unsent);
}
catch (Exception e) {
if (this.containerProperties.getErrorHandler() != null) {
this.containerProperties.getErrorHandler().handle(e, null);
}
else {
this.logger.error("Container exception", e);
}
}
}
//能执行到这里,说明isRunning=false了,ListenerConsumer要停止了
if (this.listenerInvokerFuture != null) {
stopInvokerAndCommitManualAcks();
}
try {
this.consumer.unsubscribe();
}
catch (WakeupException e) {
// No-op. Continue process
}
this.consumer.close();
if (this.logger.isInfoEnabled()) {
this.logger.info("Consumer stopped");
}
}
private void startInvoker() {
ListenerConsumer.this.invoker = new ListenerInvoker();
ListenerConsumer.this.listenerInvokerFuture = this.containerProperties.getListenerTaskExecutor()
.submit(ListenerConsumer.this.invoker); //使用线程池跑ListenerInvoker
}
private final class ListenerInvoker implements SchedulingAwareRunnable {
private final CountDownLatch exitLatch = new CountDownLatch(1);
private volatile boolean active = true;
private volatile Thread executingThread;
@Override
public void run() {
Assert.isTrue(this.active, "This instance is not active anymore");
try {
this.executingThread = Thread.currentThread();
while (this.active) {
try {
ConsumerRecords<K, V> records = ListenerConsumer.this.recordsToProcess.poll(1,
TimeUnit.SECONDS);
if (this.active) {
if (records != null) {
invokeListener(records);
}
else {
if (ListenerConsumer.this.logger.isTraceEnabled()) {
ListenerConsumer.this.logger.trace("No records to process");
}
}
}
}
catch (InterruptedException e) {
if (!this.active) {
Thread.currentThread().interrupt();
}
else {
ListenerConsumer.this.logger.debug("Interrupt ignored");
}
}
}
}
finally {
this.active = false;
this.exitLatch.countDown();
}
}
@Override
public boolean isLongLived() {
return true;
}
private void stop() {
if (ListenerConsumer.this.logger.isDebugEnabled()) {
ListenerConsumer.this.logger.debug("Stopping invoker");
}
this.active = false;
try {
if (!this.exitLatch.await(getContainerProperties().getShutdownTimeout(), TimeUnit.MILLISECONDS)
&& this.executingThread != null) {
if (ListenerConsumer.this.logger.isDebugEnabled()) {
ListenerConsumer.this.logger.debug("Interrupting invoker");
}
this.executingThread.interrupt();
}
}
catch (InterruptedException e) {
if (this.executingThread != null) {
this.executingThread.interrupt();
}
Thread.currentThread().interrupt();
}
if (ListenerConsumer.this.logger.isDebugEnabled()) {
ListenerConsumer.this.logger.debug("Invoker stopped");
}
}
}
private void invokeListener(final ConsumerRecords<K, V> records) {
Iterator<ConsumerRecord<K, V>> iterator = records.iterator();
//自动提交或者有invoker,都进入,费解,目前看到的只有非自动提交时才startInvoker啊,得继续看看
while (iterator.hasNext() && (this.autoCommit || (this.invoker != null && this.invoker.active))) {
final ConsumerRecord<K, V> record = iterator.next();
if (this.logger.isTraceEnabled()) {
this.logger.trace("Processing " + record);
}
try {
if (this.acknowledgingMessageListener != null) {//有Acknowledgment参数的onMessage方法
this.acknowledgingMessageListener.onMessage(record, this.isAnyManualAck
? new ConsumerAcknowledgment(record, this.isManualImmediateAck) : null);
}
else {//没有Acknowledgment参数的onMessage方法
this.listener.onMessage(record);
}
if (!this.isAnyManualAck && !this.autoCommit) {//非人工提交、非自动提交,加入到acks
this.acks.add(record);
}
}
catch (Exception e) {
if (this.containerProperties.isAckOnError() && !this.autoCommit) {//开启了ackOnError时,加入到acks
this.acks.add(record);
}
if (this.containerProperties.getErrorHandler() != null) {
this.containerProperties.getErrorHandler().handle(e, record);//errorhandler在这里被调用
}
else {
this.logger.error("Listener threw an exception and no error handler for " + record, e);
}
}
}
}
private void processCommits() {
handleAcks();//处理acks队列,manual_immedaite的立刻提交,其他的加入到offsets
this.count += this.acks.size();
long now;
AckMode ackMode = this.containerProperties.getAckMode();
if (!this.isManualImmediateAck) {//不是立刻人工提交时
if (!this.isManualAck) {//不是人工提交时
updatePendingOffsets();//既不是自动提交又不是人工提交,那么就走这里,是更新offsets数据,为什么要把manual的排除在外呢?
}
boolean countExceeded = this.count >= this.containerProperties.getAckCount();
if (this.isManualAck || this.isBatchAck || this.isRecordAck
|| (ackMode.equals(AckMode.COUNT) && countExceeded)) {//ackMode是人工、单条、批量、数量时,提交操作
if (this.logger.isDebugEnabled() && ackMode.equals(AckMode.COUNT)) {
this.logger.debug("Committing in AckMode.COUNT because count " + this.count
+ " exceeds configured limit of " + this.containerProperties.getAckCount());
}
commitIfNecessary();
this.count = 0;
}
else {
now = System.currentTimeMillis();
boolean elapsed = now - this.last > this.containerProperties.getAckTime();
if (ackMode.equals(AckMode.TIME) && elapsed) {//ackMode是time时
if (this.logger.isDebugEnabled()) {
this.logger.debug("Committing in AckMode.TIME " +
"because time elapsed exceeds configured limit of " +
this.containerProperties.getAckTime());
}
commitIfNecessary();
this.last = now;
}
else if (ackMode.equals(AckMode.COUNT_TIME) && (elapsed || countExceeded)) {//ackMode是count_time时
if (this.logger.isDebugEnabled()) {
if (elapsed) {
this.logger.debug("Committing in AckMode.COUNT_TIME " +
"because time elapsed exceeds configured limit of " +
this.containerProperties.getAckTime());
}
else {
this.logger.debug("Committing in AckMode.COUNT_TIME " +
"because count " + this.count + " exceeds configured limit of" +
this.containerProperties.getAckCount());
}
}
commitIfNecessary();
this.last = now;
this.count = 0;
}
}
}
}
private void handleAcks() {
ConsumerRecord<K, V> record = this.acks.poll();
while (record != null) {
if (this.logger.isTraceEnabled()) {
this.logger.trace("Ack: " + record);
}
processAck(record);
record = this.acks.poll();
}
}
private void processAck(ConsumerRecord<K, V> record) {
if (ListenerConsumer.this.isManualImmediateAck) {
try {
ackImmediate(record);//人工立即提交的,就这里提交了
}
catch (WakeupException e) {
// ignore - not polling
}
}
else {
addOffset(record); //加到offsets里
}
}
private void updatePendingOffsets() {
ConsumerRecord<K, V> record = this.acks.poll();//从阻塞队列取数据,数据在invokeListener里放进去的
while (record != null) {
addOffset(record);
record = this.acks.poll();
}
}
private void addOffset(ConsumerRecord<K, V> record) {
if (!this.offsets.containsKey(record.topic())) {
this.offsets.put(record.topic(), new HashMap<Integer, Long>());
}
this.offsets.get(record.topic()).put(record.partition(), record.offset());//offsets是个嵌套MAP,topic partition offset
}
private void commitIfNecessary() {
Map<TopicPartition, OffsetAndMetadata> commits = new HashMap<>();
//提交是从offsets里取出来的,不是人工提交时是updatePendingOffsets()方法放进去的,人工提交时是handleAcks()方法放进去的
//这两个方法都是从acks队列取数据,那么新的问题是,人工提交时在哪里放到acks队列里去的?
for (Entry<String, Map<Integer, Long>> entry : this.offsets.entrySet()) {
for (Entry<Integer, Long> offset : entry.getValue().entrySet()) {
commits.put(new TopicPartition(entry.getKey(), offset.getKey()),
new OffsetAndMetadata(offset.getValue() + 1));
}
}
this.offsets.clear();//offsets清空
if (this.logger.isDebugEnabled()) {
this.logger.debug("Commit list: " + commits);
}
if (!commits.isEmpty()) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("Committing: " + commits);
}
try {
if (this.containerProperties.isSyncCommits()) {
this.consumer.commitSync(commits);//kakfa-clients.jar里面的方法了
}
else {
this.consumer.commitAsync(commits, this.commitCallback);//kakfa-clients.jar里面的方法了
}
}
catch (WakeupException e) {
// ignore - not polling
if (this.logger.isDebugEnabled()) {
this.logger.debug("Woken up during commit");
}
}
}
}
也就是说:
一、autocommit =false时
1、ackMode不是manual、manual_immediate的时候,invokeListener把消息加入到ack队列里去
2、是manual、manual_immediate的时候,用户调用acknowledge方法,把消息加入到ack队列;
autocommit = true时,消息不放到ack队列
二、看processCommits方法的代码,貌似manual_immediate的发起提交也并不比manual、batch、record、count、time、count_time的发起提交提前很多,不太理解是个什么节奏
三、autocommit =ture时,在哪里commit offset的还没看见。。。
所有评论(0)