go-sarama 生产者与消费者
目录1、生产者2、消费者1、生产者/* producer */package mainimport ("fmt""github.com/Shopify/sarama""strconv""time")// go mod init// go get// 基于sarama第三方库开发的kafka clientfunc main() {//初始化配置config := sarama.NewConfig()
·
目录
1、生产者
/* producer */
package main
import (
"fmt"
"github.com/Shopify/sarama"
"strconv"
"time"
)
// go mod init
// go get
// 基于sarama第三方库开发的kafka client
func main() {
//初始化配置
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll // 发送完数据需要leader和follow都确认
config.Producer.Partitioner = sarama.NewRandomPartitioner // 随机分配分区 partition
config.Producer.Return.Successes = true // 成功交付的消息将在success channel返回
fmt.Println(111)
// 创建生产者,连接kafka
client, err := sarama.NewSyncProducer([]string{"192.168.0.113:9092"}, config)
if err != nil {
fmt.Println("producer closed, err:", err)
return
}
defer client.Close()
fmt.Println(222)
for i := 0; i < 5; i++ {
// 构造一个消息
msg := &sarama.ProducerMessage{}
msg.Topic = "web_log"
msg.Value = sarama.StringEncoder("this is a test log" + strconv.Itoa(i))
fmt.Println(333)
// 发送消息
pid, offset, err := client.SendMessage(msg)
if err != nil {
fmt.Println("send msg failed, err:", err)
return
}
fmt.Printf("pid:%v offset:%v\n", pid, offset) //分区id,偏移id
time.Sleep(2 * time.Second)
}
}
2、消费者
/* consumer */
package main
import (
"fmt"
"github.com/Shopify/sarama"
"sync"
)
var wg sync.WaitGroup
func main() {
//创建消费者
consumer, err := sarama.NewConsumer([]string{"192.168.0.113:9092"}, nil)
if err != nil {
fmt.Printf("fail to start consumer, err:%v\n", err)
return
}
//获取主题分区
partitionList, err := consumer.Partitions("web_log") // 根据topic取到所有的分区
if err != nil {
fmt.Printf("fail to get list of partition:err%v\n", err)
return
}
fmt.Println(partitionList)
//遍历分区
for partition := range partitionList {
// 针对每个分区创建一个对应的分区消费者
pc, err := consumer.ConsumePartition("web_log", int32(partition), sarama.OffsetNewest)
//pc, err := consumer.ConsumePartition("web_log", int32(partition), 90)
if err != nil {
fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err)
return
}
defer pc.AsyncClose()
// 异步从每个分区消费信息
wg.Add(1) //+1
go func(sarama.PartitionConsumer) {
defer wg.Done() //-1
for msg := range pc.Messages() {
fmt.Printf("Partition:%d Offset:%d Key:%v Value:%s\n", msg.Partition, msg.Offset, msg.Key, msg.Value)
}
}(pc)
}
wg.Wait() //
}
更多推荐
已为社区贡献2条内容
所有评论(0)