How a Kafka consumer handles offsets
Below we trace through the code of https://github.com/Shopify/sarama. Handling a consumed message roughly breaks down into three steps (using consumer_group consumption as the example):
1. Fetch the msg and advance the local offset by 1
2. Consume the msg
3. Commit the offset
The code below carries some annotations so the overall flow is visible. Several of the intermediate steps are quite involved, and there is plenty here worth learning from and referring back to: for example, how messages are fetched, how a msg is handed off to your own ConsumeClaim implementation for consumption, how manual ack/confirmation is done, how a group achieves in-order consumption, and so on... those will be covered later. For orientation, a minimal consumer-group setup is sketched right below.
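Here is a minimal sketch of the consumer-group wiring those three steps run inside. This is an illustrative skeleton, not sarama's own example; the broker address, group ID, and topic name are placeholders:

package main

import (
    "context"
    "log"

    "github.com/Shopify/sarama"
)

// handler implements sarama.ConsumerGroupHandler.
type handler struct{}

func (handler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (handler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

func (handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for msg := range claim.Messages() { // step 1 delivers msgs into this chan
        log.Printf("consumed partition=%d offset=%d", msg.Partition, msg.Offset) // step 2: consume
        sess.MarkMessage(msg, "") // step 3 (first half): record offset+1 locally; the offset manager commits it later
    }
    return nil
}

func main() {
    cfg := sarama.NewConfig()
    cfg.Version = sarama.V2_0_0_0 // consumer groups need a broker version >= 0.10.2

    group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "example-group", cfg)
    if err != nil {
        log.Fatal(err)
    }
    defer group.Close()

    for {
        // Consume blocks for one session and returns on rebalance; loop to rejoin.
        if err := group.Consume(context.Background(), []string{"example-topic"}, handler{}); err != nil {
            log.Fatal(err)
        }
    }
}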
consumer.go
func (child *partitionConsumer) responseFeeder() {
    var msgs []*ConsumerMessage
    expiryTicker := time.NewTicker(child.conf.Consumer.MaxProcessingTime)
    firstAttempt := true

feederLoop:
    for response := range child.feeder {
        msgs, child.responseResult = child.parseResponse(response) // fetch the messages; see parseResponse below
        if child.responseResult == nil {
            atomic.StoreInt32(&child.retries, 0)
        }

        for i, msg := range msgs {
        messageSelect:
            select {
            case <-child.dying:
                child.broker.acks.Done()
                continue feederLoop
            case child.messages <- msg: // write the message into the messages chan of the partitionConsumer struct below
                firstAttempt = true
            case <-expiryTicker.C:
                if !firstAttempt {
                    child.responseResult = errTimedOut
                    child.broker.acks.Done()
                remainingLoop:
                    for _, msg = range msgs[i:] {
                        select {
                        case child.messages <- msg:
                        case <-child.dying:
                            break remainingLoop
                        }
                    }
                    child.broker.input <- child
                    continue feederLoop
                } else {
                    // current message has not been sent, return to select
                    // statement
                    firstAttempt = false
                    goto messageSelect
                }
            }
        }

        child.broker.acks.Done()
    }

    expiryTicker.Stop()
    close(child.messages)
    close(child.errors)
}
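The expiryTicker in responseFeeder fires every Config.Consumer.MaxProcessingTime: if the application does not drain a message from the chan within that window, the feeder reports errTimedOut, flushes the remaining messages, and re-registers with the broker. If your ConsumeClaim does slow per-message work, this is the knob to raise. A hedged config sketch (the value is illustrative; the sarama default is 100ms):

cfg := sarama.NewConfig()
// Allow up to 500ms between reads from Messages() before the feeder
// times out the partition and re-subscribes it, as seen above.
cfg.Consumer.MaxProcessingTime = 500 * time.Millisecond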
func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*ConsumerMessage, error) {
    var (
        metricRegistry          = child.conf.MetricRegistry
        consumerBatchSizeMetric metrics.Histogram
    )

    if metricRegistry != nil {
        consumerBatchSizeMetric = getOrRegisterHistogram("consumer-batch-size", metricRegistry)
    }

    // If request was throttled and empty we log and return without error
    if response.ThrottleTime != time.Duration(0) && len(response.Blocks) == 0 {
        Logger.Printf(
            "consumer/broker/%d FetchResponse throttled %v\n",
            child.broker.broker.ID(), response.ThrottleTime)
        return nil, nil
    }

    block := response.GetBlock(child.topic, child.partition)
    if block == nil {
        return nil, ErrIncompleteResponse
    }

    if block.Err != ErrNoError {
        return nil, block.Err
    }

    nRecs, err := block.numRecords()
    if err != nil {
        return nil, err
    }

    consumerBatchSizeMetric.Update(int64(nRecs))

    if nRecs == 0 {
        partialTrailingMessage, err := block.isPartial()
        if err != nil {
            return nil, err
        }
        // We got no messages. If we got a trailing one then we need to ask for more data.
        // Otherwise we just poll again and wait for one to be produced...
        if partialTrailingMessage {
            if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize == child.conf.Consumer.Fetch.Max {
                // we can't ask for more data, we've hit the configured limit
                child.sendError(ErrMessageTooLarge)
                child.offset++ // skip this one so we can keep processing future messages
            } else {
                child.fetchSize *= 2
                // check int32 overflow
                if child.fetchSize < 0 {
                    child.fetchSize = math.MaxInt32
                }
                if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize > child.conf.Consumer.Fetch.Max {
                    child.fetchSize = child.conf.Consumer.Fetch.Max
                }
            }
        }

        return nil, nil
    }

    // we got messages, reset our fetch size in case it was increased for a previous request
    child.fetchSize = child.conf.Consumer.Fetch.Default
    atomic.StoreInt64(&child.highWaterMarkOffset, block.HighWaterMarkOffset)

    // abortedProducerIDs contains producerID which message should be ignored as uncommitted
    // - producerID are added when the partitionConsumer iterate over the offset at which an aborted transaction begins (abortedTransaction.FirstOffset)
    // - producerID are removed when partitionConsumer iterate over an aborted controlRecord, meaning the aborted transaction for this producer is over
    abortedProducerIDs := make(map[int64]struct{}, len(block.AbortedTransactions))
    abortedTransactions := block.getAbortedTransactions()

    messages := []*ConsumerMessage{}
    for _, records := range block.RecordsSet {
        switch records.recordsType {
        case legacyRecords:
            messageSetMessages, err := child.parseMessages(records.MsgSet)
            if err != nil {
                return nil, err
            }

            messages = append(messages, messageSetMessages...)
        case defaultRecords:
            // Consume remaining abortedTransaction up to last offset of current batch
            for _, txn := range abortedTransactions {
                if txn.FirstOffset > records.RecordBatch.LastOffset() {
                    break
                }
                abortedProducerIDs[txn.ProducerID] = struct{}{}
                // Pop abortedTransactions so that we never add it again
                abortedTransactions = abortedTransactions[1:]
            }

            recordBatchMessages, err := child.parseRecords(records.RecordBatch) // wrap the records into ConsumerMessages; see parseRecords below
            if err != nil {
                return nil, err
            }

            // Parse and commit offset but do not expose messages that are:
            // - control records
            // - part of an aborted transaction when set to `ReadCommitted`

            // control record
            isControl, err := records.isControl()
            if err != nil {
                // I don't know why there is this continue in case of error to begin with
                // Safe bet is to ignore control messages if ReadUncommitted
                // and block on them in case of error and ReadCommitted
                if child.conf.Consumer.IsolationLevel == ReadCommitted {
                    return nil, err
                }
                continue
            }
            if isControl {
                controlRecord, err := records.getControlRecord()
                if err != nil {
                    return nil, err
                }

                if controlRecord.Type == ControlRecordAbort {
                    delete(abortedProducerIDs, records.RecordBatch.ProducerID)
                }
                continue
            }

            // filter aborted transactions
            if child.conf.Consumer.IsolationLevel == ReadCommitted {
                _, isAborted := abortedProducerIDs[records.RecordBatch.ProducerID]
                if records.RecordBatch.IsTransactional && isAborted {
                    continue
                }
            }

            messages = append(messages, recordBatchMessages...)
        default:
            return nil, fmt.Errorf("unknown records type: %v", records.recordsType)
        }
    }

    return messages, nil
}
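Two configuration knobs surface in parseResponse: the fetch size, which resets to Fetch.Default whenever messages arrive and doubles toward Fetch.Max when a partial trailing message signals the buffer was too small, and IsolationLevel, which drives the aborted-transaction filtering. A sketch with illustrative values (ReadCommitted additionally requires a broker/config Version of at least V0_11_0_0):

cfg := sarama.NewConfig()
cfg.Version = sarama.V0_11_0_0
cfg.Consumer.Fetch.Default = 1 << 20 // 1 MiB starting fetch size per request
cfg.Consumer.Fetch.Max = 8 << 20     // hard cap; beyond it, ErrMessageTooLarge is raised and the record skipped
cfg.Consumer.IsolationLevel = sarama.ReadCommitted // hide records from aborted transactions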
Wrapping the records into msgs:
func (child *partitionConsumer) parseRecords(batch *RecordBatch) ([]*ConsumerMessage, error) {
    messages := make([]*ConsumerMessage, 0, len(batch.Records))

    for _, rec := range batch.Records {
        offset := batch.FirstOffset + rec.OffsetDelta
        fmt.Printf("offset: %v ,child.offset: %v\n", offset, child.offset)
        if offset < child.offset {
            continue
        }
        timestamp := batch.FirstTimestamp.Add(rec.TimestampDelta)
        if batch.LogAppendTime {
            timestamp = batch.MaxTimestamp
        }
        messages = append(messages, &ConsumerMessage{
            Topic:     child.topic,
            Partition: child.partition,
            Key:       rec.Key,   // message key
            Value:     rec.Value, // message value
            Offset:    offset,    // the message's offset
            Timestamp: timestamp,
            Headers:   rec.Headers,
        })
        child.offset = offset + 1 // advance the local offset by 1 right here, before the message is ever consumed
        fmt.Printf("offset: %v ,child.offset: %v\n", offset, child.offset)
    }
    if len(messages) == 0 {
        child.offset++
    }
    return messages, nil
}
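Note the offset arithmetic: a record batch stores a base FirstOffset plus a small per-record OffsetDelta, so the absolute offset is their sum, and child.offset is bumped to offset+1 before the message is consumed, i.e. it always points at the next offset to fetch. A toy illustration with made-up numbers:

package main

import "fmt"

func main() {
    // Hypothetical batch: FirstOffset = 100, three records with deltas 0, 1, 2
    // => absolute offsets 100, 101, 102.
    firstOffset := int64(100)
    for _, delta := range []int64{0, 1, 2} {
        fmt.Println("record offset:", firstOffset+delta)
    }
    // After parseRecords, child.offset == 103: the next offset to fetch, and
    // the value an eventual commit reports for this partition.
    fmt.Println("child.offset after the batch:", firstOffset+3)
}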
The actual consumption happens on the other end of that chan: a for{} loop is notified through it and picks up the msgs produced above for processing. That loop handles more than consumer traffic (other notifications and broker events flow through it as well); here we only follow the consumer path. The chan lives in the partitionConsumer struct:
type partitionConsumer struct {
    highWaterMarkOffset int64 // must be at the top of the struct because https://golang.org/pkg/sync/atomic/#pkg-note-BUG

    consumer *consumer
    conf     *Config
    broker   *brokerConsumer
    messages chan *ConsumerMessage // responseFeeder writes into this chan; claim.Messages() reads from it
    ....
}
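For comparison, the standalone (non-group) consumer API reads from this same messages chan directly. A hedged sketch with placeholder broker, topic, and partition:

package main

import (
    "fmt"
    "log"

    "github.com/Shopify/sarama"
)

func main() {
    consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, sarama.NewConfig())
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close()

    // ConsumePartition spins up a partitionConsumer like the struct above;
    // Messages() exposes its messages chan.
    pc, err := consumer.ConsumePartition("example-topic", 0, sarama.OffsetNewest)
    if err != nil {
        log.Fatal(err)
    }
    defer pc.Close()

    for msg := range pc.Messages() {
        fmt.Printf("offset=%d value=%s\n", msg.Offset, msg.Value)
    }
}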
Consumption as implemented via the official example: claim.Messages() is a chan; the msg data from above passes through the intermediate interfaces and is finally consumed here.
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    // NOTE:
    // Do not move the code below to a goroutine.
    // The `ConsumeClaim` itself is called within a goroutine, see:
    // https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
    for message := range claim.Messages() {
        // log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)
        fmt.Printf("Message claimed: value = %s\n", string(message.Value))
        session.MarkMessage(message, "")
    }
    return nil
}
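MarkMessage is the manual ack: internally it records message.Offset+1 (the next offset to read) in the in-memory offset manager; nothing is sent to the broker yet. It is equivalent to calling MarkOffset yourself:

// What MarkMessage(message, "") does under the hood:
session.MarkOffset(message.Topic, message.Partition, message.Offset+1, "")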
After the message has been processed, the offset+1 recorded earlier must be committed to the broker to mark it as successfully consumed; otherwise the consumer would keep pulling this message again (e.g. after a restart or rebalance).
offset_manager.go
func (om *offsetManager) mainLoop() {
    defer om.ticker.Stop()
    defer close(om.closed)

    for {
        select {
        case <-om.ticker.C:
            om.flushToBroker() // commit on every tick; implemented below
            om.releasePOMs(false)
        case <-om.closing:
            return
        }
    }
}

func (om *offsetManager) flushToBroker() {
    req := om.constructRequest()
    if req == nil {
        return
    }

    broker, err := om.coordinator()
    if err != nil {
        om.handleError(err)
        return
    }

    fmt.Println("currentId: ", om.memberID)
    resp, err := broker.CommitOffset(req) // send the OffsetCommit request to the group coordinator
    if err != nil {
        om.handleError(err)
        om.releaseCoordinator(broker)
        _ = broker.Close()
        return
    }

    om.handleResponse(broker, req, resp) // apply the result of the commit
}
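The ticker that drives mainLoop is the auto-commit interval. In recent sarama versions (roughly 1.23 and later; field names differ in older releases) it is configured like this, with illustrative values:

cfg := sarama.NewConfig()
// How often mainLoop ticks and flushToBroker commits the offsets
// recorded by MarkMessage. Disable it to commit only when you call
// session.Commit() explicitly (available in newer sarama versions).
cfg.Consumer.Offsets.AutoCommit.Enable = true
cfg.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second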