1.Windows环境下安装zookeeper和kafka

Windows环境下安装zookeeper和kafka

2.运行zookeeper

3.运行kaka

4.生产者

import (
	"encoding/json"
	"github.com/Shopify/sarama"
	"strconv"
)

type Product struct {
	Id    int
	Name  string
	Title string
}

func NewProduct() error {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Retry.Max = 10
	config.Producer.Return.Successes = true
	brokers := []string{"localhost:9092"}
	producer, err := sarama.NewAsyncProducer(brokers, config)
	if err != nil {
		return err
	}
	p := &Product{
		Id: 1,
		Name: "钻戒",
		Title: "那戒指的质地似乎是钻石制成的吧,闪闪发光又不失内敛,清雅又不失高贵,阳光洒下来,发出淡淡的光,和淡淡的清香,有着像是通了灵般的仙气",
	}

	key := sarama.StringEncoder(strconv.Itoa(p.Id))
	value, err := json.Marshal(p)
	if err != nil {
		return err
	}
	msg := &sarama.ProducerMessage{
		Topic: "new-products",
		Key:   key,
		Value: sarama.ByteEncoder(value),
	}
	producer.Input() <- msg
	return nil
}

5.消费者

import (
	"encoding/json"
	"fmt"
	"github.com/Shopify/sarama"
	"log"
)

func Consume() error {
	// 初始化 Kafka 消费者
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Retry.Max = 10
	config.Producer.Return.Successes = true
	brokers := []string{"localhost:9092"}
	consumer, err := sarama.NewConsumer(brokers, config)
	partitionConsumer, err := consumer.ConsumePartition("newProduct", 0, sarama.OffsetNewest)
	if err != nil {
		log.Printf("Error consuming partition: %v", err)
		return err
	}
	for {
		select {
		case msg := <-partitionConsumer.Messages():
			var product Product
			err = json.Unmarshal(msg.Value, &product)
			if err != nil {
				log.Printf("Error unmarshaling product: %v", err)
				return err
			} else {
				fmt.Printf("New product: %+v\n", product)
			}
		case err = <-partitionConsumer.Errors():
			log.Printf("Error consuming message: %v", err)
			return err
		}
	}
}

6.main函数

import (
	"fmt"
	"golang_test/kafka_test/kafka"
	"log"
	"sync"
)

var wg sync.WaitGroup

func main() {
	wg.Add(2)
	go func() {
		defer wg.Done()
		if err := kafka.NewProduct(); err != nil {
			log.Println("kafka生产者运行失败")
			return
		}
	}()
	go func() {
		defer wg.Done()
		if err := kafka.Consume(); err != nil {
			log.Println("kafka生产者运行失败")
			return
		}
	}()
	wg.Wait()
	fmt.Println("运行结束")
}
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐