go之kafka库sarama的使用
saram是一个使用纯go语言编写的kafka库。
·
saram
是一个使用纯go语言编写的kafka库。
sarama安装
go get github.com/Shopify/sarama
生产消息(异步)
package main
import (
"fmt"
"github.com/Shopify/sarama"
"log"
"time"
)
var address = []string{"192.168.68.142:9092"}
func main() {
// 配置
config := sarama.NewConfig()
// 等待服务器所有副本都保存成功后的响应
config.Producer.RequiredAcks = sarama.WaitForAll
// 随机向partition发送消息
config.Producer.Partitioner = sarama.NewRandomPartitioner
// 是否等待成功和失败后的响应,只有上面的RequireAcks设置不是NoReponse这里才有用
config.Producer.Return.Successes = true
config.Producer.Return.Errors = true
// 版本
config.Version = sarama.V0_10_2_1
fmt.Println("start make producer")
//使用配置,新建一个异步生产者
producer, err := sarama.NewAsyncProducer(address, config)
if err != nil {
log.Printf("new async producer error: %s \n", err.Error())
return
}
defer producer.AsyncClose()
// 循环判断哪个通道发送过来数据
fmt.Println("start goroutine")
go func(p sarama.AsyncProducer) {
for {
select {
case suc := <-p.Successes():
fmt.Println("offset: ", suc.Offset, "timestamp: ", suc.Timestamp.String(), "partitions: ", suc.Partition)
case fail := <-p.Errors():
fmt.Println("error: ", fail.Error())
}
}
}(producer)
var value string
for i := 0; ; i++ {
// 每隔两秒发送一条消息
time.Sleep(2 * time.Second)
// 创建消息
value = fmt.Sprintf("async message, index = %d", i)
// 注意:这里的msg必须得是新构建的变量,不然你会发现发送过去的消息内容都是一样的,因为批次发送消息的关系
msg := &sarama.ProducerMessage{
Topic: "web_log",
Value: sarama.ByteEncoder(value),
}
// 使用通道发送
producer.Input() <- msg
}
}
消费消息
package main
import (
"context"
"fmt"
"github.com/Shopify/sarama"
)
type AAAConsumerGroupHandler struct{}
func (AAAConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error {
return nil
}
func (AAAConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error {
return nil
}
// 这个方法用来消费消息的
func (h AAAConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
// 获取消息
for msg := range claim.Messages() {
fmt.Printf("topic:%q partition:%d offset:%d value:%s\n", msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
// 将消息标记为已使用
sess.MarkMessage(msg, "")
}
return nil
}
// 接收数据
func main() {
// 先初始化 kafka
config := sarama.NewConfig()
// Version 必须大于等于 V0_10_2_0
config.Version = sarama.V0_10_2_1
config.Consumer.Return.Errors = true
fmt.Println("start connect kafka")
// 开始连接kafka服务器
group, err := sarama.NewConsumerGroup([]string{"192.168.68.142:9092"}, "AAA-group", config)
if err != nil {
fmt.Println("连接kafka失败:", err)
return
}
// 检查错误
go func() {
for err := range group.Errors() {
fmt.Println("分组错误 : ", err)
}
}()
ctx := context.Background()
fmt.Println("开始获取消息")
// for 是应对 consumer rebalance
for {
// 需要监听的主题
topics := []string{"web_log"}
handler := AAAConsumerGroupHandler{}
// 启动kafka消费组模式,消费的逻辑在上面的 ConsumeClaim 这个方法里
err := group.Consume(ctx, topics, handler)
if err != nil {
fmt.Println("消费失败; err : ", err)
return
}
}
}
更多推荐
已为社区贡献4条内容
所有评论(0)