Go语言中使用kafka
【代码】Go语言中使用kafka。
·
1.Windows环境下安装zookeeper和kafka
2.运行zookeeper
3.运行kafka
4.生产者
import (
"encoding/json"
"github.com/Shopify/sarama"
"strconv"
)
// Product describes a single product record. It is serialized with
// encoding/json using the default (exported field name) keys.
type Product struct {
	// Id is the numeric identifier of the product.
	Id int
	// Name is the product's short name; Title is its longer description.
	Name, Title string
}
// NewProduct publishes one hard-coded sample Product, JSON-encoded, to the
// "new-products" topic on the broker at localhost:9092. The message key is
// the product Id rendered as a decimal string.
//
// The message goes through an async producer, so the function blocks until
// the producer reports the delivery outcome and then closes the producer.
// It returns the first error hit while creating the producer, encoding the
// product, delivering the message, or closing the producer.
func NewProduct() (err error) {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Retry.Max = 10
	// With Return.Successes enabled, the Successes channel must be read;
	// that happens in the select after the send below.
	config.Producer.Return.Successes = true

	brokers := []string{"localhost:9092"}
	producer, err := sarama.NewAsyncProducer(brokers, config)
	if err != nil {
		return err
	}
	// Always close the producer so its connections and goroutines are
	// released; surface the close error only if nothing failed earlier.
	defer func() {
		if closeErr := producer.Close(); closeErr != nil && err == nil {
			err = closeErr
		}
	}()

	p := &Product{
		Id:    1,
		Name:  "钻戒",
		Title: "那戒指的质地似乎是钻石制成的吧,闪闪发光又不失内敛,清雅又不失高贵,阳光洒下来,发出淡淡的光,和淡淡的清香,有着像是通了灵般的仙气",
	}
	key := sarama.StringEncoder(strconv.Itoa(p.Id))
	value, err := json.Marshal(p)
	if err != nil {
		return err
	}

	msg := &sarama.ProducerMessage{
		Topic: "new-products",
		Key:   key,
		Value: sarama.ByteEncoder(value),
	}
	producer.Input() <- msg

	// Wait for the delivery result; returning right after the send would
	// neither confirm delivery nor report a failure.
	select {
	case <-producer.Successes():
		return nil
	case sendErr := <-producer.Errors():
		return sendErr.Err
	}
}
5.消费者
import (
"encoding/json"
"fmt"
"github.com/Shopify/sarama"
"log"
)
// Consume reads JSON-encoded Product messages from partition 0 of the
// "new-products" topic on the broker at localhost:9092 and prints each one.
//
// Consumption starts at sarama.OffsetNewest, so only messages produced after
// the partition consumer is created are seen.
// NOTE(review): if the producer runs concurrently, a message sent before this
// subscription is established will be missed — confirm that is acceptable.
//
// The function loops until something fails. It logs and returns the error
// from creating the consumer, subscribing to the partition, decoding a
// message, or one reported on the partition consumer's error channel.
func Consume() error {
	config := sarama.NewConfig()
	// Deliver consumer errors on the Errors() channel; without this the
	// error branch of the select below never receives anything.
	config.Consumer.Return.Errors = true

	brokers := []string{"localhost:9092"}
	consumer, err := sarama.NewConsumer(brokers, config)
	if err != nil {
		log.Printf("Error creating consumer: %v", err)
		return err
	}
	defer func() {
		if closeErr := consumer.Close(); closeErr != nil {
			log.Printf("Error closing consumer: %v", closeErr)
		}
	}()

	// The topic must match the one the producer writes to.
	partitionConsumer, err := consumer.ConsumePartition("new-products", 0, sarama.OffsetNewest)
	if err != nil {
		log.Printf("Error consuming partition: %v", err)
		return err
	}
	// Deferred calls run in reverse order, so the partition consumer is
	// closed before its parent consumer.
	defer func() {
		if closeErr := partitionConsumer.Close(); closeErr != nil {
			log.Printf("Error closing partition consumer: %v", closeErr)
		}
	}()

	for {
		select {
		case msg := <-partitionConsumer.Messages():
			var product Product
			if err := json.Unmarshal(msg.Value, &product); err != nil {
				log.Printf("Error unmarshaling product: %v", err)
				return err
			}
			fmt.Printf("New product: %+v\n", product)
		case consumeErr := <-partitionConsumer.Errors():
			log.Printf("Error consuming message: %v", consumeErr)
			return consumeErr
		}
	}
}
6.main函数
import (
"fmt"
"golang_test/kafka_test/kafka"
"log"
"sync"
)
// wg tracks the producer and consumer goroutines started by main.
var wg sync.WaitGroup

// main runs the Kafka producer and consumer concurrently, waits for both
// goroutines to return, and then prints a completion message. Each goroutine
// logs the error returned by its kafka call, if any.
func main() {
	wg.Add(2)

	go func() {
		defer wg.Done()
		if err := kafka.NewProduct(); err != nil {
			log.Printf("kafka生产者运行失败: %v", err)
		}
	}()

	go func() {
		defer wg.Done()
		if err := kafka.Consume(); err != nil {
			// This goroutine runs the consumer, so report it as such.
			log.Printf("kafka消费者运行失败: %v", err)
		}
	}()

	wg.Wait()
	fmt.Println("运行结束")
}
更多推荐
已为社区贡献2条内容
所有评论(0)