var highWaterMarksBool = false
//在Kafka consumer rebalance成功以后进行调用,highWaterMarksBool主要保证成功进行了设置offset
func highWaterMarks() {
	if highWaterMarksBool {
		return
	}
	HighWaterMarks := consumer.HighWaterMarks() //这个要rebalance完毕以后才能生效
	for topic, partitions := range HighWaterMarks {
		for partition, _ := range partitions {
			offset, _ := client.GetOffset(topic, partition, sarama.OffsetNewest)
			consumer.MarkPartitionOffset(topic, partition, offset, "")
			app.Logger.Info("MarkPartitionOffset", topic, partition, offset, "")
		}
		highWaterMarksBool = true
	}
}


func consume() {
	defer consumer.Close()
	for {
		select {
		case msg, more := <-consumer.Messages():
			if more {
				requestFileBeat := &adcontext.FileBeat{}
				err := json.Unmarshal(msg.Value, requestFileBeat)
				if err == nil {
					adcontext.ContextMsg.KafkaMsgNew = append(adcontext.ContextMsg.KafkaMsgNew, requestFileBeat)
				} else {
					app.Logger.Errorf("Unmarshal ERR :%s", msg.Value)
				}
				consumer.MarkOffset(msg, "")
			} else {
				app.Logger.Infof("Kafka No More")
			}
		case err, more := <-consumer.Errors():
			if more {
				app.Logger.Infof("Kafka consumer error: %v", err.Error())
			}
		case ntf, more := <-consumer.Notifications():
			if more {
				app.Logger.Infof("Kafka consumer rebalance: %v", ntf)
				highWaterMarks()
			}
		}
	}

}

原理就是在rebalance以后获得当前offiset数据,然后根据获得的数据对MarkPartitionOffset进行设置,设置为最新的offiset

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐