Below we trace through the code of https://github.com/Shopify/sarama. Consuming a message roughly breaks down into three steps, using consumer_group consumption as the example:
1. Fetch the msg and advance the offset by 1
2. Consume the msg
3. Commit the offset

The code below is annotated with comments to give a rough picture of the flow. Some of the intermediate processing is fairly complex, and there is a lot worth studying and borrowing from: for example, how messages are fetched, how a msg is delivered to the ConsumeClaim function you implement yourself, how to do a manual ack, how a group achieves in-order consumption, and so on. Those will be covered later.
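For orientation, here is a minimal sketch of how a consumer group is typically started with sarama, so the code below has an entry point to hang off. The broker address, group name and topic are placeholders chosen for illustration; Consumer is the handler type whose ConsumeClaim is shown further down (its Setup/Cleanup methods are omitted here).

package main

import (
	"context"
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Version = sarama.V0_10_2_0                     // consumer groups need at least Kafka 0.10.2
	config.Consumer.Offsets.Initial = sarama.OffsetOldest // start from the oldest offset if the group has no commit yet

	// "localhost:9092", "example-group" and "example-topic" are placeholders.
	group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "example-group", config)
	if err != nil {
		log.Fatal(err)
	}
	defer group.Close()

	for {
		// Consume blocks for the lifetime of one session and returns on rebalance,
		// after which the loop simply rejoins the group.
		if err := group.Consume(context.Background(), []string{"example-topic"}, &Consumer{}); err != nil {
			log.Fatal(err)
		}
	}
}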

consumer.go

func (child *partitionConsumer) responseFeeder() {
	var msgs []*ConsumerMessage
	expiryTicker := time.NewTicker(child.conf.Consumer.MaxProcessingTime)
	firstAttempt := true

feederLoop:
	for response := range child.feeder {
		msgs, child.responseResult = child.parseResponse(response) // fetch the messages; parseResponse is shown below

		if child.responseResult == nil {
			atomic.StoreInt32(&child.retries, 0)
		}

		for i, msg := range msgs {
		messageSelect:
			select {
			case <-child.dying:
				child.broker.acks.Done()
				continue feederLoop
			case child.messages <- msg: // deliver the msg over the messages chan of the partitionConsumer struct shown below
				firstAttempt = true
			case <-expiryTicker.C:
				if !firstAttempt {
					child.responseResult = errTimedOut
					child.broker.acks.Done()
				remainingLoop:
					for _, msg = range msgs[i:] {
						select {
						case child.messages <- msg:
						case <-child.dying:
							break remainingLoop
						}
					}
					child.broker.input <- child
					continue feederLoop
				} else {
					// current message has not been sent, return to select
					// statement
					firstAttempt = false
					goto messageSelect
				}
			}
		}

		child.broker.acks.Done()
	}

	expiryTicker.Stop()
	close(child.messages)
	close(child.errors)
}
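The expiryTicker above is driven by Consumer.MaxProcessingTime: if a message sits undelivered on child.messages for longer than that, responseFeeder times the batch out and hands the partition back to the broker loop (the child.broker.input <- child branch) so other partitions keep flowing. A hedged config sketch, the value is for illustration only:

config := sarama.NewConfig()
// If writing to the messages channel stalls for longer than this,
// the feeder marks the batch as timed out and re-registers the partition.
config.Consumer.MaxProcessingTime = 500 * time.Millisecond // sarama's default is 100ms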

func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*ConsumerMessage, error) {
	var (
		metricRegistry          = child.conf.MetricRegistry
		consumerBatchSizeMetric metrics.Histogram
	)

	if metricRegistry != nil {
		consumerBatchSizeMetric = getOrRegisterHistogram("consumer-batch-size", metricRegistry)
	}

	// If request was throttled and empty we log and return without error
	if response.ThrottleTime != time.Duration(0) && len(response.Blocks) == 0 {
		Logger.Printf(
			"consumer/broker/%d FetchResponse throttled %v\n",
			child.broker.broker.ID(), response.ThrottleTime)
		return nil, nil
	}

	block := response.GetBlock(child.topic, child.partition)
	if block == nil {
		return nil, ErrIncompleteResponse
	}

	if block.Err != ErrNoError {
		return nil, block.Err
	}

	nRecs, err := block.numRecords()
	if err != nil {
		return nil, err
	}

	consumerBatchSizeMetric.Update(int64(nRecs))

	if nRecs == 0 {
		partialTrailingMessage, err := block.isPartial()
		if err != nil {
			return nil, err
		}
		// We got no messages. If we got a trailing one then we need to ask for more data.
		// Otherwise we just poll again and wait for one to be produced...
		if partialTrailingMessage {
			if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize == child.conf.Consumer.Fetch.Max {
				// we can't ask for more data, we've hit the configured limit
				child.sendError(ErrMessageTooLarge)
				child.offset++ // skip this one so we can keep processing future messages
			} else {
				child.fetchSize *= 2
				// check int32 overflow
				if child.fetchSize < 0 {
					child.fetchSize = math.MaxInt32
				}
				if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize > child.conf.Consumer.Fetch.Max {
					child.fetchSize = child.conf.Consumer.Fetch.Max
				}
			}
		}

		return nil, nil
	}

	// we got messages, reset our fetch size in case it was increased for a previous request
	child.fetchSize = child.conf.Consumer.Fetch.Default
	atomic.StoreInt64(&child.highWaterMarkOffset, block.HighWaterMarkOffset)

	// abortedProducerIDs contains producerID which message should be ignored as uncommitted
	// - producerID are added when the partitionConsumer iterate over the offset at which an aborted transaction begins (abortedTransaction.FirstOffset)
	// - producerID are removed when partitionConsumer iterate over an aborted controlRecord, meaning the aborted transaction for this producer is over
	abortedProducerIDs := make(map[int64]struct{}, len(block.AbortedTransactions))
	abortedTransactions := block.getAbortedTransactions()

	messages := []*ConsumerMessage{}
	for _, records := range block.RecordsSet {
		switch records.recordsType {
		case legacyRecords:
			messageSetMessages, err := child.parseMessages(records.MsgSet)
			if err != nil {
				return nil, err
			}

			messages = append(messages, messageSetMessages...)
		case defaultRecords:
			// Consume remaining abortedTransaction up to last offset of current batch
			for _, txn := range abortedTransactions {
				if txn.FirstOffset > records.RecordBatch.LastOffset() {
					break
				}
				abortedProducerIDs[txn.ProducerID] = struct{}{}
				// Pop abortedTransactions so that we never add it again
				abortedTransactions = abortedTransactions[1:]
			}

			recordBatchMessages, err := child.parseRecords(records.RecordBatch) // wrap the records into ConsumerMessages; parseRecords is shown below
			if err != nil {
				return nil, err
			}

			// Parse and commit offset but do not expose messages that are:
			// - control records
			// - part of an aborted transaction when set to `ReadCommitted`

			// control record
			isControl, err := records.isControl()
			if err != nil {
				// I don't know why there is this continue in case of error to begin with
				// Safe bet is to ignore control messages if ReadUncommitted
				// and block on them in case of error and ReadCommitted
				if child.conf.Consumer.IsolationLevel == ReadCommitted {
					return nil, err
				}
				continue
			}
			if isControl {
				controlRecord, err := records.getControlRecord()
				if err != nil {
					return nil, err
				}

				if controlRecord.Type == ControlRecordAbort {
					delete(abortedProducerIDs, records.RecordBatch.ProducerID)
				}
				continue
			}

			// filter aborted transactions
			if child.conf.Consumer.IsolationLevel == ReadCommitted {
				_, isAborted := abortedProducerIDs[records.RecordBatch.ProducerID]
				if records.RecordBatch.IsTransactional && isAborted {
					continue
				}
			}

			messages = append(messages, recordBatchMessages...)
		default:
			return nil, fmt.Errorf("unknown records type: %v", records.recordsType)
		}
	}

	return messages, nil
}
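The aborted-transaction filtering above only kicks in when the consumer is configured for read-committed isolation; by default sarama reads uncommitted records. A hedged sketch of the relevant setting (a broker/protocol version that supports transactions is assumed):

config := sarama.NewConfig()
config.Version = sarama.V0_11_0_0 // transactions and control records need at least Kafka 0.11
// Only expose messages from committed transactions; aborted batches are
// filtered out in parseResponse via abortedProducerIDs.
config.Consumer.IsolationLevel = sarama.ReadCommitted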

Wrapping the msgs into ConsumerMessage structs (and advancing the local offset):

func (child *partitionConsumer) parseRecords(batch *RecordBatch) ([]*ConsumerMessage, error) {
	messages := make([]*ConsumerMessage, 0, len(batch.Records))

	for _, rec := range batch.Records {
		offset := batch.FirstOffset + rec.OffsetDelta
		fmt.Printf("offset: %v ,child.offset: %v\n", offset, child.offset)
		if offset < child.offset {
			continue
		}
		timestamp := batch.FirstTimestamp.Add(rec.TimestampDelta)
		if batch.LogAppendTime {
			timestamp = batch.MaxTimestamp
		}
		messages = append(messages, &ConsumerMessage{
			Topic:     child.topic,
			Partition: child.partition,
			Key:       rec.Key,   // message key
			Value:     rec.Value, // message value
			Offset:    offset,    // message offset
			Timestamp: timestamp,
			Headers:   rec.Headers,
		})
		child.offset = offset + 1 // advance the expected offset to offset+1 as soon as the message is wrapped, before it has actually been consumed
		fmt.Printf("offset: %v ,child.offset: %v\n", offset, child.offset)
	}
	if len(messages) == 0 {
		child.offset++
	}
	return messages, nil
}
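As a concrete example of the arithmetic above: if batch.FirstOffset is 100 and a record's OffsetDelta is 2, the message is exposed with Offset 102 and child.offset immediately becomes 103, the next offset the consumer expects, even though ConsumeClaim has not handled the message yet. Actually committing 103 to the broker is a separate step, covered below.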

The actual consumption side: a for{} loop receives notifications over channels and picks up the msgs prepared above for processing. That loop does not only receive consumer messages; it also handles other notifications such as broker events, but here we only follow the consumer message path. The messages chan below is where the msgs from responseFeeder end up:

type partitionConsumer struct {
	highWaterMarkOffset int64 // must be at the top of the struct because https://golang.org/pkg/sync/atomic/#pkg-note-BUG

	consumer *consumer
	conf     *Config
	broker   *brokerConsumer
	messages chan *ConsumerMessage // responseFeeder writes the fetched messages into this channel
	....
}
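The intermediate layer between this channel and your handler is thin: in consumer_group.go the claim passed to ConsumeClaim embeds the PartitionConsumer, so claim.Messages() is the same channel as the messages field above. A paraphrased sketch of that claim type (field layout from memory, not verbatim source):

type consumerGroupClaim struct {
	topic     string
	partition int32
	offset    int64
	PartitionConsumer // claim.Messages() simply returns this partition consumer's messages channel
}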

Consumption implemented following the official example: claim.Messages() is a chan, and the msgs prepared earlier reach it through the claim wrapper described above; this is where they are finally consumed.

func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {

	// NOTE:
	// Do not move the code below to a goroutine.
	// The `ConsumeClaim` itself is called within a goroutine, see:
	// https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
	for message := range claim.Messages() {
		// log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)
		fmt.Printf("Message claimed: value = %s\n", string(message.Value))
		session.MarkMessage(message, "")
	}
	return nil
}
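MarkMessage above only records message.Offset+1 in the in-memory offset manager; the actual network commit happens asynchronously in offset_manager.go (next section). If you want to control the commit yourself, newer sarama releases let you disable auto-commit and flush explicitly; a hedged sketch, API names as found in recent Shopify/sarama versions:

// In the setup code, before creating the group (field exists in newer sarama releases):
//     config.Consumer.Offsets.AutoCommit.Enable = false

func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for message := range claim.Messages() {
		// ... handle the message here ...
		session.MarkMessage(message, "") // records message.Offset+1 in memory only
		session.Commit()                 // flush the marked offsets to the broker right away
	}
	return nil
}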

After a message has been processed, the offset that was already advanced by 1 must be committed to the broker; that marks the message as successfully consumed, otherwise the same message will keep being fetched.
offset_manager.go


func (om *offsetManager) mainLoop() {
	defer om.ticker.Stop()
	defer close(om.closed)

	for {
		select {
		case <-om.ticker.C:
			om.flushToBroker() // fires on each tick; implemented below
			om.releasePOMs(false)
		case <-om.closing:
			return
		}
	}
}

func (om *offsetManager) flushToBroker() {
	req := om.constructRequest()
	if req == nil {
		return
	}

	broker, err := om.coordinator()
	if err != nil {
		om.handleError(err)
		return
	}
	fmt.Println("currentId: ", om.memberID)
	resp, err := broker.CommitOffset(req) // send the offset commit request to the broker
	if err != nil {
		om.handleError(err)
		om.releaseCoordinator(broker)
		_ = broker.Close()
		return
	}

	om.handleResponse(broker, req, resp) // process the result of the commit
}
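The ticker that drives mainLoop above comes from the consumer offsets configuration: how often marked offsets get flushed is controlled by the auto-commit interval (named AutoCommit.Interval in newer Shopify/sarama releases, CommitInterval in older ones). A hedged sketch:

config := sarama.NewConfig()
// Flush marked offsets to the broker every 5 seconds instead of the 1 second default.
config.Consumer.Offsets.AutoCommit.Interval = 5 * time.Second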