sarama-cluster消费kafka从最新offset开始
下面的 highWaterMarks 函数在 Kafka consumer rebalance 成功以后调用,highWaterMarksBool 标志用来保证 offset 只被成功设置一次:
var highWaterMarksBool = false

// highWaterMarks is called after a successful Kafka consumer rebalance.
// It marks every assigned topic/partition at its newest offset so that
// consumption starts from the latest messages. highWaterMarksBool guards
// against repeating the work once all offsets have been set.
func highWaterMarks() {
	if highWaterMarksBool {
		return
	}
	// HighWaterMarks only returns meaningful data after the rebalance completes.
	marks := consumer.HighWaterMarks()
	for topic, partitions := range marks {
		for partition := range partitions {
			offset, err := client.GetOffset(topic, partition, sarama.OffsetNewest)
			if err != nil {
				// Bail out without setting the guard flag so the whole pass is
				// retried on the next rebalance notification; the original
				// ignored this error and could mark a bogus offset.
				app.Logger.Errorf("GetOffset %s/%d: %v", topic, partition, err)
				return
			}
			consumer.MarkPartitionOffset(topic, partition, offset, "")
			app.Logger.Info("MarkPartitionOffset", topic, partition, offset, "")
		}
	}
	// Set the guard only after every topic has been processed; the original
	// set it inside the topic loop, i.e. after the first topic only.
	highWaterMarksBool = true
}
// consume runs the main consumer loop: it decodes incoming FileBeat
// messages into adcontext.ContextMsg.KafkaMsgNew, marks their offsets,
// logs consumer errors, and triggers highWaterMarks on each rebalance
// notification. It returns when the messages channel is closed.
func consume() {
	defer consumer.Close()
	for {
		select {
		case msg, more := <-consumer.Messages():
			if !more {
				// Channel closed: stop instead of busy-spinning — a receive
				// on a closed channel returns immediately, so the original
				// loop ran hot forever, logging "Kafka No More" repeatedly.
				app.Logger.Infof("Kafka No More")
				return
			}
			requestFileBeat := &adcontext.FileBeat{}
			if err := json.Unmarshal(msg.Value, requestFileBeat); err != nil {
				app.Logger.Errorf("Unmarshal ERR :%s", msg.Value)
			} else {
				adcontext.ContextMsg.KafkaMsgNew = append(adcontext.ContextMsg.KafkaMsgNew, requestFileBeat)
			}
			// Mark the offset even on decode failure so a poison message
			// is not re-consumed (matches the original behavior).
			consumer.MarkOffset(msg, "")
		case err, more := <-consumer.Errors():
			if more {
				// Consumer errors belong at the error level, not info.
				app.Logger.Errorf("Kafka consumer error: %v", err.Error())
			}
		case ntf, more := <-consumer.Notifications():
			if more {
				app.Logger.Infof("Kafka consumer rebalance: %v", ntf)
				highWaterMarks()
			}
		}
	}
}
原理就是在rebalance以后获得当前offset数据,然后根据获得的数据调用MarkPartitionOffset进行设置,设置为最新的offset。
更多推荐
已为社区贡献2条内容
所有评论(0)