saram是一个使用纯go语言编写的kafka库。

sarama安装

go get github.com/Shopify/sarama

生产消息(异步)

package main

import (
    "fmt"
    "github.com/Shopify/sarama"
    "log"
    "time"
)

var address = []string{"192.168.68.142:9092"}

func main() {
    // 配置
    config := sarama.NewConfig()
    // 等待服务器所有副本都保存成功后的响应
    config.Producer.RequiredAcks = sarama.WaitForAll
    // 随机向partition发送消息
    config.Producer.Partitioner = sarama.NewRandomPartitioner
    // 是否等待成功和失败后的响应,只有上面的RequireAcks设置不是NoReponse这里才有用
    config.Producer.Return.Successes = true
    config.Producer.Return.Errors = true
    // 版本
    config.Version = sarama.V0_10_2_1

    fmt.Println("start make producer")
    //使用配置,新建一个异步生产者
    producer, err := sarama.NewAsyncProducer(address, config)
    if err != nil {
        log.Printf("new async producer error: %s \n", err.Error())
        return
    }
    defer producer.AsyncClose()

    // 循环判断哪个通道发送过来数据
    fmt.Println("start goroutine")
    go func(p sarama.AsyncProducer) {
        for {
            select {
            case suc := <-p.Successes():
                fmt.Println("offset: ", suc.Offset, "timestamp: ", suc.Timestamp.String(), "partitions: ", suc.Partition)
            case fail := <-p.Errors():
                fmt.Println("error: ", fail.Error())
            }
        }
    }(producer)

    var value string
    for i := 0; ; i++ {
        // 每隔两秒发送一条消息
        time.Sleep(2 * time.Second)

        // 创建消息
        value = fmt.Sprintf("async message, index = %d", i)
        // 注意:这里的msg必须得是新构建的变量,不然你会发现发送过去的消息内容都是一样的,因为批次发送消息的关系
        msg := &sarama.ProducerMessage{
            Topic: "web_log",
            Value: sarama.ByteEncoder(value),
        }

        // 使用通道发送
        producer.Input() <- msg
    }
}

消费消息

package main

import (
	"context"
	"fmt"

	"github.com/Shopify/sarama"
)

type AAAConsumerGroupHandler struct{}

func (AAAConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error {
	return nil
}
func (AAAConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error {
	return nil
}

// 这个方法用来消费消息的
func (h AAAConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	// 获取消息
	for msg := range claim.Messages() {
		fmt.Printf("topic:%q partition:%d offset:%d value:%s\n", msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
		// 将消息标记为已使用
		sess.MarkMessage(msg, "")
	}
	return nil
}

// 接收数据
func main() {
	// 先初始化 kafka
	config := sarama.NewConfig()
	// Version 必须大于等于  V0_10_2_0
	config.Version = sarama.V0_10_2_1
	config.Consumer.Return.Errors = true
	fmt.Println("start connect kafka")
	// 开始连接kafka服务器
	group, err := sarama.NewConsumerGroup([]string{"192.168.68.142:9092"}, "AAA-group", config)

	if err != nil {
		fmt.Println("连接kafka失败:", err)
		return
	}
	// 检查错误
	go func() {
		for err := range group.Errors() {
			fmt.Println("分组错误 : ", err)
		}
	}()

	ctx := context.Background()
	fmt.Println("开始获取消息")
	// for 是应对 consumer rebalance
	for {
		// 需要监听的主题
		topics := []string{"web_log"}
		handler := AAAConsumerGroupHandler{}
		// 启动kafka消费组模式,消费的逻辑在上面的 ConsumeClaim 这个方法里
		err := group.Consume(ctx, topics, handler)

		if err != nil {
			fmt.Println("消费失败; err : ", err)
			return
		}
	}

}
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐