消费者回调

有些邮件发送成功之后,需要执行后续逻辑,例如更新数据库等。那么我们这时需要将Message Server变成生产者, 向Kafak中投递callBack消息;Business Server 此时是消费者, 消费callBack消息。

如何抽象callBack消息?

callBack的逻辑根据业务场景相关,如何在保证满足不同callBack业务逻辑的同时还满足callBack消息格式的统一呢? 我们使用反射来实现这一目的

@JsonDeserialize(builder = CallbackMetaData.CallbackMetaDataBuilder.class)
@JsonInclude(JsonInclude.Include.NON_NULL)
@Getter
@Builder
@ToString
public class CallbackMetaData implements Serializable {

    @JsonProperty("id")
    private String messageId;

    /**
     * 应该是hostName, InetAddress.getLocalHost().getHostName()
     */
    @JsonProperty("serverId")
    private String serverId;

    @JsonProperty("className")
    private String className;

    /**
     * this string is the json string of the instance of the class, generated by Jackson.
     * for example:
     * className instance = new className();
     * objectMapper.writeValueAsString(instance);
     */
    @JsonProperty("instanceJsonStr")
    private String instanceJsonStr;

    @JsonProperty("methodName")
    private String methodName;

    @JsonProperty("arguments")
    private Object[] arguments;

}

上述内容,会作为邮件消息的一部分,发送给Message Server. 当Message Server发送完邮件之后, 会检查是否包含callback消息,如果包含,则将CallbackMetaData发送到相关topic

为什么要设置serverId?

有两点原因:

  1. 代码是滚动部署的,为了兼容性必须是:回调消息是由哪台Business Server携带的,就该回调到哪台Business Server。
  2. 一开始我们的思路是使用同一个topic,所有Business Server都订阅该topic,并且使用不同的消费者组,以达到广播消息的目的。这样在消费消息时通过判断当前消息的serverId是否当前server,如果是则消费,如果不是则直接提交offset。但是后来我们发现这样设计,会使得所有服务器每时每刻都在消费消息,即使该消息不是当前服务器的。改进后的设计是:每个服务器都有自己的callback topic, 只消费自己的callback topic下的消息即可。 callback topic的名字是:callback-serverId

serverId使用机器的hostName而不是IP, 因为IP有可能会变。

如何消费callBack消息?

其他消费者相关的代码我不再赘述,请参考上一篇博文的详细内容, 执行消费逻辑的代码就3行

ConsumerRecords<String, CallbackMetaData> records = consumer.poll(Duration.ofSeconds(10));
records.forEach(each -> {
    Class<?> destClass;
    try {
        //        核心消费代码, 通过反射调用目标方法
        destClass = Class.forName(each.value().getClassName());
        Object instance = objectMapper.readValue(each.value().getInstanceJsonStr(), destClass);
        MethodUtils.invokeMethod(instance, true, each.value().getMethodName(), each.value().getArguments());
    } catch (Exception e) {
        e.printStackTrace();
    }
});

定时重试

前两篇博文提到,无论生产者还是消费者,最终重试N次之后依旧失败的我们会把消息存储到数据库,以便后期通过定时任务进行重试。为了减轻业务服务器的负担,所有失败消息的重试都由Message Server负责。

消息失败表的设计

@Getter
@Setter
@ToString
@EqualsAndHashCode(of = {"messageId", "failedPhrases"})
public class MessageFailedEntity {
    
    /**
     * 主键
     */
    private Long id;
    
    /**
     * 消息id
     */
    private String messageId;
    
    /**
     * JSON格式的消息内容
     */
    private String messageContentJsonFormat;
    
    /**
     * 消息类型
     * EMAIL 表示此消息为邮件
     * EMAIL_CALLBACK 表示此消息为邮件回调
     *
     */
    private MessageType messageType;
    
    /**
     * 消息失败的阶段:
     * PRODUCER 表示在生产者发送消息的时候失败
     * CONSUMER 表示在消费者消费消息的时候失败
     */
    private MessageFailedPhase messageFailedPhase;
    
    /**
     * 失败时的异常堆栈信息
     */
    private String failedReason;
    
    /**
     * 消息重试次数
     */
    private Integer retryCount;
    
    /**
     * 消息重试状态
     * 0 表示重试失败
     * 1 表示重试成功
     */
    private Integer retryStatus;
    
    /**
     * 时间戳
     */
    private LocalDateTime lastUpdateTime;
    
}

重试逻辑设计

重试的思路很简单:

  1. 从数据库查询消息失败表获得一批记录,每次可能100条或者10000条,根据实际场景自己确定
  2. 根据消息类型和消息的JSON格式,序列化为对应类的对象,调用不同的生产者发送消息到Kafka
  3. 如果该消息失败阶段是PRODUCER,那么重试成功之后,则更新该记录未重试成功
  4. 如果该消息失败阶段是CONSUMER,那么重试成功之后,则只更新重试次数,由对应的消费者去更新是否重试成功。因为CONSUMER只有消费成功才算重试成功。
  5. 设置最大重试次数,如果超过最大重试次数,则不再进行重试
  6. 如果是部署了多个Message Server,那么执行定时重试任务时,可以使用分布式锁以确保只有同一时刻只有一个Message Server在执行任务,这样做的目的主要是防止为了多个任务同时进行时,从数据库中查询的记录是同一批,当然也可以在表中增加一个标志位来区分该记录是否在重试中来达到相同的目的,根据实际情况选择即可

到此为止,结合前两篇博文,我们处理了在整个消息系统中可能出现所有的异常情况。

理解Rabalance

Kafka权威指南 > 第四章 第一节

Moving partition ownership from one consumer to another is called a rebalance.

一开始接触rebalance时候,我在思考一个问题,如果我的消费者还在消费消息中, 此时Kafka要进行rebalance,这对我的消费者业务逻辑有什么影响?会不会我还在消费中,然后被打断,如果是这样的话,那对我的消费业务逻辑的幂等性来说增加了不少挑战。

带着这些疑问,我搜索了一些资料,在confluent官网上发现了一篇博客,详细讲了rebalance过程, 文章链接

以下内容来自于上述文章链接

  1. Suppose we have an existing consumer group with a set assignment of topic-partitions to consumers. This consumer group consists of a number of consumers, each with a member id as well as a group leader (usually the consumer that was first to join the group). A new consumer comes along and requests to join the consumer group by sending a request of JoinGroup to the Group Coordinator along with the topics it would like to subscribe to.
  2. The Group Coordinator kicks off the rebalance by telling all current members to issue their own JoinGroup requests. This is done as part of the response to the heartbeat that consumers send to the Group Coordinator to tell it they’re still alive and well.
  3. Each consumer in the group has max.poll.interval.ms to wrap up their current processing and send their JoinGroup request, at which point the world is stopped. With all of the JoinGroup requests, the Group Coordinator knows all of the consumers in the group and which topics should be part of the consumer group. It sends JoinResponses to the members, chooses a leader from amongst the members, and leaves the leader to compute the partition assignments.
  4. All group members respond with a SyncGroup request. The group leader sends its partition assignments along with its request.
  5. At this point, the Group Coordinator can send its SyncResponse to each consumer confirming their assigned topic-partitions.
  6. Finally, consumers acknowledge their assignments and processing can resume. The world is no longer stopped

第二处高亮的地方,解决了我的疑问,在rebalance之前,会等待每个消费者把自己的消费逻辑处理完。

通过日志来理解rebalance

下面的日志是我本地的一次rebalance期间的日志,可以对照上述步骤加深理解

2023-11-08T02:23:04.180-0500 kafka-coordinator-heartbeat-thread | low-priority-email-group org.apache.kafka.clients.consumer.internals.ConsumerCoordinator WARN: [Consumer instanceId=2, clientId=consumer-low-priority-email-group-2, groupId=low-priority-email-group] consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
2023-11-08T02:23:04.180-0500 kafka-coordinator-heartbeat-thread | low-priority-email-group org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=2, clientId=consumer-low-priority-email-group-2, groupId=low-priority-email-group] Resetting generation and member id due to: consumer pro-actively leaving the group
2023-11-08T02:23:04.180-0500 kafka-coordinator-heartbeat-thread | low-priority-email-group org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=2, clientId=consumer-low-priority-email-group-2, groupId=low-priority-email-group] Request joining group due to: consumer pro-actively leaving the group

2023-11-08T02:32:21.607-0500 consumer-low-priority-email-pool-3 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=4, clientId=consumer-low-priority-email-group-4, groupId=low-priority-email-group] Request joining group due to: group is already rebalancing
2023-11-08T02:32:21.620-0500 consumer-low-priority-email-pool-3 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=4, clientId=consumer-low-priority-email-group-4, groupId=low-priority-email-group] Revoke previously assigned partitions low-priority-email-4
2023-11-08T02:32:21.621-0500 consumer-low-priority-email-pool-3 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=4, clientId=consumer-low-priority-email-group-4, groupId=low-priority-email-group] (Re-)joining group
2023-11-08T02:32:21.712-0500 consumer-low-priority-email-pool-8 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=9, clientId=consumer-low-priority-email-group-9, groupId=low-priority-email-group] Request joining group due to: group is already rebalancing
2023-11-08T02:32:21.712-0500 consumer-low-priority-email-pool-8 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=9, clientId=consumer-low-priority-email-group-9, groupId=low-priority-email-group] Revoke previously assigned partitions low-priority-email-9
2023-11-08T02:32:21.712-0500 consumer-low-priority-email-pool-8 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=9, clientId=consumer-low-priority-email-group-9, groupId=low-priority-email-group] (Re-)joining group
2023-11-08T02:32:21.723-0500 consumer-low-priority-email-pool-7 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=8, clientId=consumer-low-priority-email-group-8, groupId=low-priority-email-group] Request joining group due to: group is already rebalancing
2023-11-08T02:32:21.723-0500 consumer-low-priority-email-pool-7 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=8, clientId=consumer-low-priority-email-group-8, groupId=low-priority-email-group] Revoke previously assigned partitions low-priority-email-8
2023-11-08T02:32:21.723-0500 consumer-low-priority-email-pool-7 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=8, clientId=consumer-low-priority-email-group-8, groupId=low-priority-email-group] (Re-)joining group
2023-11-08T02:32:21.739-0500 consumer-low-priority-email-pool-9 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=10, clientId=consumer-low-priority-email-group-10, groupId=low-priority-email-group] Request joining group due to: group is already rebalancing
2023-11-08T02:32:21.739-0500 consumer-low-priority-email-pool-9 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=10, clientId=consumer-low-priority-email-group-10, groupId=low-priority-email-group] Revoke previously assigned partitions low-priority-email-1
2023-11-08T02:32:21.739-0500 consumer-low-priority-email-pool-9 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=10, clientId=consumer-low-priority-email-group-10, groupId=low-priority-email-group] (Re-)joining group
2023-11-08T02:32:21.741-0500 consumer-low-priority-email-pool-4 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=5, clientId=consumer-low-priority-email-group-5, groupId=low-priority-email-group] Request joining group due to: group is already rebalancing
2023-11-08T02:32:21.741-0500 consumer-low-priority-email-pool-4 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=5, clientId=consumer-low-priority-email-group-5, groupId=low-priority-email-group] Revoke previously assigned partitions low-priority-email-5
2023-11-08T02:32:21.741-0500 consumer-low-priority-email-pool-4 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=5, clientId=consumer-low-priority-email-group-5, groupId=low-priority-email-group] (Re-)joining group
2023-11-08T02:32:21.745-0500 consumer-low-priority-email-pool-5 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=6, clientId=consumer-low-priority-email-group-6, groupId=low-priority-email-group] Request joining group due to: group is already rebalancing
2023-11-08T02:32:21.745-0500 consumer-low-priority-email-pool-5 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=6, clientId=consumer-low-priority-email-group-6, groupId=low-priority-email-group] Revoke previously assigned partitions low-priority-email-6
2023-11-08T02:32:21.745-0500 consumer-low-priority-email-pool-5 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=6, clientId=consumer-low-priority-email-group-6, groupId=low-priority-email-group] (Re-)joining group
2023-11-08T02:32:21.752-0500 consumer-low-priority-email-pool-2 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=3, clientId=consumer-low-priority-email-group-3, groupId=low-priority-email-group] Request joining group due to: group is already rebalancing
2023-11-08T02:32:21.753-0500 consumer-low-priority-email-pool-2 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=3, clientId=consumer-low-priority-email-group-3, groupId=low-priority-email-group] Revoke previously assigned partitions low-priority-email-3
2023-11-08T02:32:21.753-0500 consumer-low-priority-email-pool-2 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=3, clientId=consumer-low-priority-email-group-3, groupId=low-priority-email-group] (Re-)joining group
2023-11-08T02:32:21.903-0500 consumer-low-priority-email-pool-0 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=1, clientId=consumer-low-priority-email-group-1, groupId=low-priority-email-group] Request joining group due to: group is already rebalancing
2023-11-08T02:32:21.903-0500 consumer-low-priority-email-pool-0 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=1, clientId=consumer-low-priority-email-group-1, groupId=low-priority-email-group] Revoke previously assigned partitions low-priority-email-0
2023-11-08T02:32:21.903-0500 consumer-low-priority-email-pool-0 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=1, clientId=consumer-low-priority-email-group-1, groupId=low-priority-email-group] (Re-)joining group
2023-11-08T02:32:21.919-0500 consumer-low-priority-email-pool-6 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=7, clientId=consumer-low-priority-email-group-7, groupId=low-priority-email-group] Request joining group due to: group is already rebalancing
2023-11-08T02:32:21.919-0500 consumer-low-priority-email-pool-6 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=7, clientId=consumer-low-priority-email-group-7, groupId=low-priority-email-group] Revoke previously assigned partitions low-priority-email-7
2023-11-08T02:32:21.919-0500 consumer-low-priority-email-pool-6 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=7, clientId=consumer-low-priority-email-group-7, groupId=low-priority-email-group] (Re-)joining group
2023-11-08T02:32:21.920-0500 consumer-low-priority-email-pool-0 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=1, clientId=consumer-low-priority-email-group-1, groupId=low-priority-email-group] Successfully joined group with generation Generation{generationId=12, memberId='1-f1f4f621-3bde-4e02-9621-a706320300ae', protocol='range'}
2023-11-08T02:32:21.920-0500 consumer-low-priority-email-pool-9 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=10, clientId=consumer-low-priority-email-group-10, groupId=low-priority-email-group] Successfully joined group with generation Generation{generationId=12, memberId='10-cd494fa6-1bf4-4dd8-ae28-a494b21ad247', protocol='range'}
2023-11-08T02:32:21.920-0500 consumer-low-priority-email-pool-6 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=7, clientId=consumer-low-priority-email-group-7, groupId=low-priority-email-group] Successfully joined group with generation Generation{generationId=12, memberId='7-e77860ca-7059-4a38-bfc9-7db3cc862a38', protocol='range'}
2023-11-08T02:32:21.921-0500 consumer-low-priority-email-pool-2 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=3, clientId=consumer-low-priority-email-group-3, groupId=low-priority-email-group] Successfully joined group with generation Generation{generationId=12, memberId='3-ec05c279-8059-4f03-b804-3da299f93b88', protocol='range'}
2023-11-08T02:32:21.921-0500 consumer-low-priority-email-pool-4 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=5, clientId=consumer-low-priority-email-group-5, groupId=low-priority-email-group] Successfully joined group with generation Generation{generationId=12, memberId='5-d08406a6-2bd1-4bed-aed9-5f65d7f75260', protocol='range'}
2023-11-08T02:32:21.921-0500 consumer-low-priority-email-pool-8 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=9, clientId=consumer-low-priority-email-group-9, groupId=low-priority-email-group] Successfully joined group with generation Generation{generationId=12, memberId='9-bab69ccd-a0b1-4f94-99d5-869fce905f3e', protocol='range'}
2023-11-08T02:32:21.921-0500 consumer-low-priority-email-pool-5 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=6, clientId=consumer-low-priority-email-group-6, groupId=low-priority-email-group] Successfully joined group with generation Generation{generationId=12, memberId='6-67b38de0-90e0-4f53-aaae-49c10a91a463', protocol='range'}
2023-11-08T02:32:21.921-0500 consumer-low-priority-email-pool-3 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=4, clientId=consumer-low-priority-email-group-4, groupId=low-priority-email-group] Successfully joined group with generation Generation{generationId=12, memberId='4-b24fc609-920a-4cc3-b507-2a4a1f1a568b', protocol='range'}
2023-11-08T02:32:21.921-0500 consumer-low-priority-email-pool-7 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=8, clientId=consumer-low-priority-email-group-8, groupId=low-priority-email-group] Successfully joined group with generation Generation{generationId=12, memberId='8-70cd2b5d-e17f-4346-bfcb-b170e766db39', protocol='range'}
2023-11-08T02:32:21.921-0500 consumer-low-priority-email-pool-3 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=4, clientId=consumer-low-priority-email-group-4, groupId=low-priority-email-group] Finished assignment for group at generation 12: {10-cd494fa6-1bf4-4dd8-ae28-a494b21ad247=Assignment(partitions=[low-priority-email-2]), 7-e77860ca-7059-4a38-bfc9-7db3cc862a38=Assignment(partitions=[low-priority-email-7]), 3-ec05c279-8059-4f03-b804-3da299f93b88=Assignment(partitions=[low-priority-email-3]), 6-67b38de0-90e0-4f53-aaae-49c10a91a463=Assignment(partitions=[low-priority-email-6]), 8-70cd2b5d-e17f-4346-bfcb-b170e766db39=Assignment(partitions=[low-priority-email-8]), 1-f1f4f621-3bde-4e02-9621-a706320300ae=Assignment(partitions=[low-priority-email-0, low-priority-email-1]), 4-b24fc609-920a-4cc3-b507-2a4a1f1a568b=Assignment(partitions=[low-priority-email-4]), 5-d08406a6-2bd1-4bed-aed9-5f65d7f75260=Assignment(partitions=[low-priority-email-5]), 9-bab69ccd-a0b1-4f94-99d5-869fce905f3e=Assignment(partitions=[low-priority-email-9])}
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-9 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=10, clientId=consumer-low-priority-email-group-10, groupId=low-priority-email-group] Successfully synced group in generation Generation{generationId=12, memberId='10-cd494fa6-1bf4-4dd8-ae28-a494b21ad247', protocol='range'}
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-0 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=1, clientId=consumer-low-priority-email-group-1, groupId=low-priority-email-group] Successfully synced group in generation Generation{generationId=12, memberId='1-f1f4f621-3bde-4e02-9621-a706320300ae', protocol='range'}
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-0 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=1, clientId=consumer-low-priority-email-group-1, groupId=low-priority-email-group] Notifying assignor about the new Assignment(partitions=[low-priority-email-0, low-priority-email-1])
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-9 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=10, clientId=consumer-low-priority-email-group-10, groupId=low-priority-email-group] Notifying assignor about the new Assignment(partitions=[low-priority-email-2])
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-9 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=10, clientId=consumer-low-priority-email-group-10, groupId=low-priority-email-group] Adding newly assigned partitions: low-priority-email-2
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-0 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=1, clientId=consumer-low-priority-email-group-1, groupId=low-priority-email-group] Adding newly assigned partitions: low-priority-email-0, low-priority-email-1
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-2 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=3, clientId=consumer-low-priority-email-group-3, groupId=low-priority-email-group] Successfully synced group in generation Generation{generationId=12, memberId='3-ec05c279-8059-4f03-b804-3da299f93b88', protocol='range'}
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-4 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=5, clientId=consumer-low-priority-email-group-5, groupId=low-priority-email-group] Successfully synced group in generation Generation{generationId=12, memberId='5-d08406a6-2bd1-4bed-aed9-5f65d7f75260', protocol='range'}
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-2 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=3, clientId=consumer-low-priority-email-group-3, groupId=low-priority-email-group] Notifying assignor about the new Assignment(partitions=[low-priority-email-3])
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-2 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=3, clientId=consumer-low-priority-email-group-3, groupId=low-priority-email-group] Adding newly assigned partitions: low-priority-email-3
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-6 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=7, clientId=consumer-low-priority-email-group-7, groupId=low-priority-email-group] Successfully synced group in generation Generation{generationId=12, memberId='7-e77860ca-7059-4a38-bfc9-7db3cc862a38', protocol='range'}
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-4 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=5, clientId=consumer-low-priority-email-group-5, groupId=low-priority-email-group] Notifying assignor about the new Assignment(partitions=[low-priority-email-5])
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-7 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=8, clientId=consumer-low-priority-email-group-8, groupId=low-priority-email-group] Successfully synced group in generation Generation{generationId=12, memberId='8-70cd2b5d-e17f-4346-bfcb-b170e766db39', protocol='range'}
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-6 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=7, clientId=consumer-low-priority-email-group-7, groupId=low-priority-email-group] Notifying assignor about the new Assignment(partitions=[low-priority-email-7])
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-3 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=4, clientId=consumer-low-priority-email-group-4, groupId=low-priority-email-group] Successfully synced group in generation Generation{generationId=12, memberId='4-b24fc609-920a-4cc3-b507-2a4a1f1a568b', protocol='range'}
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-6 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=7, clientId=consumer-low-priority-email-group-7, groupId=low-priority-email-group] Adding newly assigned partitions: low-priority-email-7
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-4 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=5, clientId=consumer-low-priority-email-group-5, groupId=low-priority-email-group] Adding newly assigned partitions: low-priority-email-5
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-7 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=8, clientId=consumer-low-priority-email-group-8, groupId=low-priority-email-group] Notifying assignor about the new Assignment(partitions=[low-priority-email-8])
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-8 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=9, clientId=consumer-low-priority-email-group-9, groupId=low-priority-email-group] Successfully synced group in generation Generation{generationId=12, memberId='9-bab69ccd-a0b1-4f94-99d5-869fce905f3e', protocol='range'}
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-5 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=6, clientId=consumer-low-priority-email-group-6, groupId=low-priority-email-group] Successfully synced group in generation Generation{generationId=12, memberId='6-67b38de0-90e0-4f53-aaae-49c10a91a463', protocol='range'}
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-3 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=4, clientId=consumer-low-priority-email-group-4, groupId=low-priority-email-group] Notifying assignor about the new Assignment(partitions=[low-priority-email-4])
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-7 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=8, clientId=consumer-low-priority-email-group-8, groupId=low-priority-email-group] Adding newly assigned partitions: low-priority-email-8
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-8 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=9, clientId=consumer-low-priority-email-group-9, groupId=low-priority-email-group] Notifying assignor about the new Assignment(partitions=[low-priority-email-9])
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-3 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=4, clientId=consumer-low-priority-email-group-4, groupId=low-priority-email-group] Adding newly assigned partitions: low-priority-email-4
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-5 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=6, clientId=consumer-low-priority-email-group-6, groupId=low-priority-email-group] Notifying assignor about the new Assignment(partitions=[low-priority-email-6])
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-8 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=9, clientId=consumer-low-priority-email-group-9, groupId=low-priority-email-group] Adding newly assigned partitions: low-priority-email-9
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-5 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=6, clientId=consumer-low-priority-email-group-6, groupId=low-priority-email-group] Adding newly assigned partitions: low-priority-email-6
2023-11-08T02:32:21.923-0500 consumer-low-priority-email-pool-9 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=10, clientId=consumer-low-priority-email-group-10, groupId=low-priority-email-group] Setting offset for partition low-priority-email-2 to the committed offset FetchPosition{offset=13436, offsetEpoch=Optional[8], currentLeader=LeaderAndEpoch{leader=Optional[192.168.100.60:9093 (id: 1 rack: null)], epoch=8}}
2023-11-08T02:32:21.923-0500 consumer-low-priority-email-pool-0 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=1, clientId=consumer-low-priority-email-group-1, groupId=low-priority-email-group] Setting offset for partition low-priority-email-1 to the committed offset FetchPosition{offset=13301, offsetEpoch=Optional[8], currentLeader=LeaderAndEpoch{leader=Optional[192.168.100.60:9093 (id: 1 rack: null)], epoch=8}}
2023-11-08T02:32:21.923-0500 consumer-low-priority-email-pool-0 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=1, clientId=consumer-low-priority-email-group-1, groupId=low-priority-email-group] Setting offset for partition low-priority-email-0 to the committed offset FetchPosition{offset=24087, offsetEpoch=Optional[8], currentLeader=LeaderAndEpoch{leader=Optional[192.168.100.60:9093 (id: 1 rack: null)], epoch=8}}
2023-11-08T02:32:21.923-0500 consumer-low-priority-email-pool-2 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=3, clientId=consumer-low-priority-email-group-3, groupId=low-priority-email-group] Setting offset for partition low-priority-email-3 to the committed offset FetchPosition{offset=13299, offsetEpoch=Optional[2], currentLeader=LeaderAndEpoch{leader=Optional[192.168.100.60:9093 (id: 1 rack: null)], epoch=2}}
2023-11-08T02:32:21.923-0500 consumer-low-priority-email-pool-3 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=4, clientId=consumer-low-priority-email-group-4, groupId=low-priority-email-group] Setting offset for partition low-priority-email-4 to the committed offset FetchPosition{offset=13352, offsetEpoch=Optional[2], currentLeader=LeaderAndEpoch{leader=Optional[192.168.100.60:9093 (id: 1 rack: null)], epoch=2}}
2023-11-08T02:32:21.923-0500 consumer-low-priority-email-pool-7 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=8, clientId=consumer-low-priority-email-group-8, groupId=low-priority-email-group] Setting offset for partition low-priority-email-8 to the committed offset FetchPosition{offset=13190, offsetEpoch=Optional[2], currentLeader=LeaderAndEpoch{leader=Optional[192.168.100.60:9093 (id: 1 rack: null)], epoch=2}}
2023-11-08T02:32:21.923-0500 consumer-low-priority-email-pool-8 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=9, clientId=consumer-low-priority-email-group-9, groupId=low-priority-email-group] Setting offset for partition low-priority-email-9 to the committed offset FetchPosition{offset=13151, offsetEpoch=Optional[2], currentLeader=LeaderAndEpoch{leader=Optional[192.168.100.60:9093 (id: 1 rack: null)], epoch=2}}
2023-11-08T02:32:21.923-0500 consumer-low-priority-email-pool-5 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=6, clientId=consumer-low-priority-email-group-6, groupId=low-priority-email-group] Setting offset for partition low-priority-email-6 to the committed offset FetchPosition{offset=13211, offsetEpoch=Optional[2], currentLeader=LeaderAndEpoch{leader=Optional[192.168.100.60:9093 (id: 1 rack: null)], epoch=2}}
2023-11-08T02:32:21.923-0500 consumer-low-priority-email-pool-6 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=7, clientId=consumer-low-priority-email-group-7, groupId=low-priority-email-group] Setting offset for partition low-priority-email-7 to the committed offset FetchPosition{offset=13303, offsetEpoch=Optional[2], currentLeader=LeaderAndEpoch{leader=Optional[192.168.100.60:9093 (id: 1 rack: null)], epoch=2}}
2023-11-08T02:32:21.923-0500 consumer-low-priority-email-pool-4 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=5, clientId=consumer-low-priority-email-group-5, groupId=low-priority-email-group] Setting offset for partition low-priority-email-5 to the committed offset FetchPosition{offset=13338, offsetEpoch=Optional[2], currentLeader=LeaderAndEpoch{leader=Optional[192.168.100.60:9093 (id: 1 rack: null)], epoch=2}}

完整日志可在这里免积分下载

参考资料

结语

至此为止,利用Kafka实现一个消息系统就基本完成了,所有关键的代码都在不同的博文中并进行了详细说明,如果想要体会完整的设计、实现思路,请移步源码仓库获取完整代码。

下一篇关于Kafak的博文打算分享一下如何利用Kafka Connect将Oracle数据库的数据同步到Postgre SQL中。

示例源码仓库

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐