kafka权威指南中文版之二
上图所示,consumer订阅kafka集群中(一个broker中的一个topic中)的消息,然后对broker发起一个获取消息的请求,请求中携带了topic、partition、offset等信息,接着用pull的方式获取kafka log中所有可用消息,并对消息中的数据进行处理,比如使用spark进行计算,将结果存入DB中。consumer订阅消息时,会连接上任一个可用的broker,并获
·
上图所示,consumer订阅kafka集群中(一个broker中的一个topic中)的消息,然后对broker发起一个获取消息的请求,请求中携带了topic、partition、offset等信息,接着用pull的方式获取kafka log中所有可用消息,并对消息中的数据进行处理,比如使用spark进行计算,将结果存入DB中。
consumer订阅消息时,会连接上任一个可用的broker,并获取topic中leader partition的元数据metadata信息,这样consumer就可以直接与leader partition通信,获取消息。
消费者Consumer客户端可以用多种语言实现,如Java语言、C语言或者Python语言。这里采用Java语言实现。
Consumer通过与brokers的TCP连接来获取数据。未能及时关闭Consumer,将导致TCP连接泄露。
Consumer是非线程安全的;
offset
在kafka中的每一个partition中 ,kafka会为每一条消息记录分配一个数值型的offset。offset值唯一标识了partition中的一条消息,也表示消费者在分区的消费位置。也就是说,一个消费者Consumer的消费position为5,说明已经消费了offset为0,1,2,3,4的消息,下一个要消费的消息的offset为5。
position对于Consumer来说,有两层含义:
position: Consumer的position等于将要消费记录的offset;大于Consumer已消费paitition中的消息的offset的最大值。在Consumer每次调用poll(long)方法获取消息时,position的值自动增加。
committed position:是最后一个成功保存的offset,重启或者错误处理时,需要恢复到的offset的值等于committed position。Comsumer可以周期性的自动提交offset,也可以手工调用commitSync来或者commitAsync方法提交。commitSync方法将阻塞,直到提交成功或者发生错误。commitAsync方法不阻塞,无论成功或者失败,都将调用回调函数OffsetCommitCallback。
消费者分组和再平衡
kafka使用consumer groups来划分消息处理和消费的进程(池)。这些进程可以运行在同一台机器上,也可以运行在多台机器上(有利于扩展和容错)。
一个消费者Comsumer只能属于一个consumer group ,通过subscribe API 可以动态设置topic列表。kafka会将topic中的每一条消息发送给consumer group中的一条进程。为了使topic partition分区与consumer group中的进程达到平衡,每一个partition只会有consumer group中的一个消费者来消费。例如:如果一个topic有4个分区,一个consumer group有2个进程(消费者),那么,每一个进程会消费2个分区中的消息。
如果consumer group分组中的一个consumer失败了,其消费的topic分区将会分配给相同分组中的其他消费者;如果consumer group分组中新增了一个consumer,topic分区将会从已有消费者上移动到新消费者上。上述过程称为
rebalancing再平衡。当topic中增加一个新分区时,会采用相同的过程进行再平衡。
一个consumer group可以看做是一个独立的逻辑订阅者,此订阅者可以包含多个进程。
当一个分组进行再平衡操作时,会通过
ConsumerRebalanceListener
类来通知消费者,消费者可以借此进行应用程序级别的逻辑处理:如状态清零、手工提交offset等。
一个partition只会被一个消费者消费。
消费者consumer失败检测
消费者订阅一系列topic主题之后,在调用poll(long)方法时,将自动归属于一个group。poll API来保证consumer的存活状态。只要持续调用poll方法,消费者将一直存在于分组中,持续收到所属partition中的消息。底层实现内幕是:poll API会周期性的向server发送心跳,当停止调用poll方法时,将会停止发送心跳。如果在超过了session失效时间,那么此消费者会被从当前组中移除,其消费的partition将会被重新分配。这样做,是为了避免这样的场景:一个消费者失败了,仍然持有其partition。
这样的设计意味着:poll循环中,消息的处理时间要小于心跳的超时时间。如果大于心跳超时时间,消费者将无法提交offset(commitSync()方法会抛出CommitFailedException 异常)。
消费者提供了两种配置设置来控制这种行为:
1、session.timeout.ms:通过增加session失效时间,consumer可以有更多的时间来处理从poll(long)获取的一批记录。缺点是:延长了server发现consumer失败的时间,进而导致延迟再平衡时间。但是不包括consumer调用close方法的情况,因为此时consumer会发送一个显式的消息到server,此时会触发一个及时的再平衡操作。
2、max.poll.records: poll循环中的处理时间与处理的记录数量成正比,所以要限制一次处理记录的数量。可以通过此参数设置,默认情况下是没有限制。
在一些场景下,消息处理时间是很难预测的,上述两种配置都不可行。推荐的方式为:将消息处理逻辑放到一个独立的线程中,这样consumer可以继续发送心跳。需要注意的是,提交的offset不应该在实际位置之前。也就是说,你应该禁用自动提交offset,在消息处理线程中手工提交offset。通常情况下,你需要使用 pause(Collection)方法来停止从partition中获取新的消息。消费者api提供了灵活性,覆盖了各种消费的用例。
更多推荐
已为社区贡献3条内容
所有评论(0)