golang用sarama连接kafka
不用group连接这种方式我自己有个疑问，分别在v2ex和GitHub上提了问题，如有知道的也请告知。package mainimport ("fmt""log""os""os/signal""time""github.com/Shopify/sarama""github.com/alecthomas/log4go")func main() {d...
·
不用group连接
package main
import (
"fmt"
"log"
"os"
"os/signal"
"time"
"github.com/Shopify/sarama"
"github.com/alecthomas/log4go"
)
// main consumes partition 0 of "topic.sarama" directly (without a consumer
// group) and logs every message until the process receives an interrupt.
func main() {
	// Registered first, so it runs last: after the Kafka resources below have
	// been released. The one-second pause presumably lets pending log output
	// flush before log4go is closed.
	defer func() {
		time.Sleep(time.Second)
		log4go.Warn("[main] consumer quit over!")
		log4go.Global.Close()
	}()

	// Route sarama's internal logging to stdout.
	sarama.Logger = log.New(os.Stdout, fmt.Sprintf("[%s]", "consumer"), log.LstdFlags)

	config := sarama.NewConfig()
	config.Version = sarama.V2_0_0_0
	config.Consumer.Offsets.Initial = sarama.OffsetNewest

	client, err := sarama.NewConsumer([]string{"192.168.198.145:9092"}, config)
	if err != nil {
		panic(err)
	}
	// Deferred right after creation so the client is also released when
	// ConsumePartition fails below. Errors are logged rather than passed to
	// log.Fatalln, because os.Exit would skip the remaining deferred cleanup.
	defer func() {
		if err := client.Close(); err != nil {
			log.Println(err)
		}
	}()

	pc, err := client.ConsumePartition("topic.sarama", 0, sarama.OffsetNewest)
	if err != nil {
		panic(err)
	}
	// Deferred after the client's Close, so it runs before it: the partition
	// consumer must be shut down before its parent consumer.
	defer func() {
		if err := pc.Close(); err != nil {
			log.Println(err)
		}
	}()

	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	consumed := 0
ConsumerLoop:
	for {
		select {
		case msg, ok := <-pc.Messages():
			if !ok {
				// Channel closed: a receive would yield a nil message.
				break ConsumerLoop
			}
			log.Printf("Consumed message offset %d, msg: %v \n", msg.Offset, string(msg.Value))
			consumed++
		case <-signals:
			break ConsumerLoop
		}
	}

	log.Printf("Consumed: %d\n", consumed)
}
用group连接
package main
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"
"github.com/Shopify/sarama"
"github.com/alecthomas/log4go"
)
// main joins the "group.sarama" consumer group, consumes "topic.sarama" with
// a Consumer handler, and shuts down cleanly on SIGINT or SIGTERM.
func main() {
	// Runs last. The one-second pause presumably lets pending log output
	// flush before log4go is closed.
	defer func() {
		time.Sleep(time.Second)
		log4go.Warn("[main] consumer quit over!")
		log4go.Global.Close()
	}()

	// Route sarama's internal logging to stdout.
	sarama.Logger = log.New(os.Stdout, fmt.Sprintf("[%s]", "consumer"), log.LstdFlags)

	config := sarama.NewConfig()
	config.Version = sarama.V2_0_0_0
	config.Consumer.Offsets.Initial = sarama.OffsetNewest

	// Kafka consumer-group client.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // also releases the context on the panic paths below

	client, err := sarama.NewConsumerGroup([]string{"192.168.198.145:9092"}, "group.sarama", config)
	if err != nil {
		panic(err)
	}

	consumer := Consumer{}

	// done is closed when the consume loop has exited, so main can wait for
	// it before closing the client.
	done := make(chan struct{})
	go func() {
		defer close(done)
		for {
			// Consume returns whenever a session ends (e.g. on rebalance),
			// so it has to be called again in a loop to rejoin the group.
			err := client.Consume(ctx, []string{"topic.sarama"}, &consumer)
			// Once shutdown has been requested, stop instead of rejoining.
			if ctx.Err() != nil {
				return
			}
			if err != nil {
				log4go.Error("[main] client.Consume error=[%s]", err.Error())
				// Retry after 5 seconds, unless shutdown is requested first.
				select {
				case <-ctx.Done():
					return
				case <-time.After(time.Second * 5):
				}
			}
		}
	}()

	// Block until an OS termination signal arrives.
	sigterm := make(chan os.Signal, 1)
	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
	<-sigterm

	cancel()
	<-done // wait for the consume loop to finish before closing the client

	err = client.Close()
	if err != nil {
		panic(err)
	}
	log4go.Info("[main] consumer is quiting")
}
// Consumer is the stateless handler passed to the consumer-group client; its
// methods receive session lifecycle callbacks and the claimed messages.
type Consumer struct{}
// Setup is the session-start hook of the handler. This consumer keeps no
// per-session state, so it does nothing and always returns nil.
func (consumer *Consumer) Setup(_ sarama.ConsumerGroupSession) error {
	return nil
}
// Cleanup is the session-end hook of the handler. There is nothing to
// release, so it does nothing and always returns nil.
func (consumer *Consumer) Cleanup(_ sarama.ConsumerGroupSession) error {
	return nil
}
// ConsumeClaim logs each message of the claim as "key-value" and marks it as
// consumed. It returns nil when the claim's message channel is closed or when
// the session context is cancelled, whichever happens first.
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for {
		select {
		case message, ok := <-claim.Messages():
			if !ok {
				// Channel closed: the claim is over.
				return nil
			}
			key := string(message.Key)
			val := string(message.Value)
			log4go.Info("%s-%s", key, val)
			session.MarkMessage(message, "")
		case <-session.Context().Done():
			// Session is ending; return promptly so the handler does not
			// keep waiting for messages after the session was cancelled.
			return nil
		}
	}
}
更多推荐
所有评论(0)