go对接kafka
本文介绍如何使用 github.com/Shopify/sarama 包让 Go 程序对接 Kafka:先用同步生产者向 web_log 主题发送消息,再用消费者按分区消费这些消息。完整代码见下文。
·
1.生产者,生产消息
使用 github.com/Shopify/sarama 包对接 Kafka。
package main
import (
"fmt"
"github.com/Shopify/sarama"
"strconv"
"time"
)
// main connects a synchronous sarama producer to the Kafka broker at
// 192.168.0.113:9092 and publishes five test messages to the "web_log" topic,
// printing the partition and offset returned for each message and pausing two
// seconds between sends. It stops at the first send failure.
func main() {
	// Build the producer configuration.
	config := sarama.NewConfig()
	// Wait until all in-sync replicas have acknowledged the message.
	config.Producer.RequiredAcks = sarama.WaitForAll
	// Choose a random partition for every message.
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	// Successes must be reported back for the synchronous producer to work.
	config.Producer.Return.Successes = true

	// Create the synchronous producer; this is where the broker connection is made.
	client, err := sarama.NewSyncProducer([]string{"192.168.0.113:9092"}, config)
	if err != nil {
		// The producer was never created, so report a creation failure rather
		// than "producer closed".
		fmt.Println("create producer failed, err:", err)
		return
	}
	defer func() {
		// Closing flushes and releases the connection; surface any failure
		// instead of discarding it.
		if err := client.Close(); err != nil {
			fmt.Println("close producer failed, err:", err)
		}
	}()

	for i := 0; i < 5; i++ {
		// Build one message for the target topic.
		msg := &sarama.ProducerMessage{
			Topic: "web_log",
			Value: sarama.StringEncoder("this is a test log" + strconv.Itoa(i)),
		}

		// Send it and wait for the broker's answer.
		pid, offset, err := client.SendMessage(msg)
		if err != nil {
			fmt.Println("send msg failed, err:", err)
			return
		}
		// pid is the partition ID, offset is the message offset in that partition.
		fmt.Printf("pid:%v offset:%v\n", pid, offset)
		time.Sleep(2 * time.Second)
	}
}
2.消费者消费消息
package main
import (
"fmt"
"github.com/Shopify/sarama"
"sync"
)
// wg tracks the per-partition consumer goroutines so main can wait for them.
var wg sync.WaitGroup

// main connects a sarama consumer to the Kafka broker at 192.168.0.113:9092,
// looks up every partition of the "web_log" topic, and starts one goroutine per
// partition that prints each newly arriving message.
//
// NOTE(review): nothing closes the partition consumers on the normal path, so
// wg.Wait is expected to block until the process is terminated; the deferred
// close calls only run when main returns early because of an error.
func main() {
	// Create the consumer (nil config means sarama's defaults).
	consumer, err := sarama.NewConsumer([]string{"192.168.0.113:9092"}, nil)
	if err != nil {
		fmt.Printf("fail to start consumer, err:%v\n", err)
		return
	}
	defer func() {
		// Registered first so it runs last, after the partition consumers'
		// deferred AsyncClose calls.
		if err := consumer.Close(); err != nil {
			fmt.Printf("fail to close consumer, err:%v\n", err)
		}
	}()

	// Fetch the IDs of all partitions of the topic.
	partitionList, err := consumer.Partitions("web_log")
	if err != nil {
		fmt.Printf("fail to get list of partition:err%v\n", err)
		return
	}
	fmt.Println(partitionList)

	// Iterate over the partition IDs themselves, not the slice indices: the IDs
	// returned by the broker are not guaranteed to equal 0..len-1.
	for _, partition := range partitionList {
		// Create a partition consumer starting at the newest offset. To start
		// from a fixed offset instead, pass it in place of sarama.OffsetNewest
		// (for example 90).
		pc, err := consumer.ConsumePartition("web_log", partition, sarama.OffsetNewest)
		if err != nil {
			fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err)
			return
		}
		defer pc.AsyncClose()

		// Consume this partition asynchronously.
		wg.Add(1)
		go func(pc sarama.PartitionConsumer) {
			defer wg.Done()
			// The loop ends when the Messages channel is closed.
			for msg := range pc.Messages() {
				fmt.Printf("Partition:%d Offset:%d Key:%v Value:%s\n", msg.Partition, msg.Offset, msg.Key, msg.Value)
			}
		}(pc)
	}
	wg.Wait()
}
更多推荐
已为社区贡献2条内容
所有评论(0)