package main

import (
	"encoding/json"
	"fmt"
	"github.com/Shopify/sarama"
	"strconv"
	"time"
)

//  go get github.com/Shopify/sarama

var (
	producer sarama.SyncProducer
)

type BookInfo struct {
	Title string  `json:"title"`
	Price float32 `json:"price"`
}

func main() {
	if err := NewProducer([]string{"localhost:9092"}); err != nil {
		fmt.Println("NewProducer:", err)
		return
	}
	defer producer.Close()
	for i := 0; i < 5000; i++ {
		data, err := json.Marshal(&BookInfo{
			Title: "t" + strconv.Itoa(i),
			Price: float32(i),
		})
		if err != nil {
			fmt.Println("Marshal", err)
			return
		}
		partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{
			//Key:       sarama.StringEncoder("audit"),
			Topic: "audit",
			Value: sarama.ByteEncoder(data),
			//Partition: 1,
		})
		if err != nil {
			fmt.Println("SendMessage:", err)
			return
		}
		fmt.Printf("partition=%d , offset=%d, i=%d\n", partition, offset, i)
		time.Sleep(2 * time.Second)
	}
}

func NewProducer(addrs []string) error {
	config := sarama.NewConfig()
	// For an async producer it is usually enough to enable Return.Errors;
	// a sync producer must enable both Return.Successes and Return.Errors,
	// because SendMessage reports success or failure synchronously.
	// (A sketch of the async variant follows this listing.)
	config.Producer.Return.Successes = true // delivered messages are reported on the Successes channel
	//config.Producer.Partitioner = sarama.NewHashPartitioner
	config.Producer.Partitioner = sarama.NewRandomPartitioner

	// NewSyncProducer owns its underlying client, so producer.Close() in main
	// releases everything.
	var err error
	producer, err = sarama.NewSyncProducer(addrs, config)
	return err
}
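
The comment in NewProducer notes that an async producer usually enables only Return.Errors. Below is a minimal sketch of that variant; the broker address and the "audit" topic are carried over from the sync example, while the payloads and message count are purely illustrative.

package main

import (
	"fmt"
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Producer.Return.Errors = true // report failed deliveries on the Errors() channel
	config.Producer.Partitioner = sarama.NewRandomPartitioner

	producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatal(err)
	}

	// Drain Errors() so the producer never blocks on a full error channel.
	done := make(chan struct{})
	go func() {
		defer close(done)
		for perr := range producer.Errors() {
			fmt.Println("delivery failed:", perr.Err)
		}
	}()

	for i := 0; i < 5; i++ {
		producer.Input() <- &sarama.ProducerMessage{
			Topic: "audit",
			Value: sarama.StringEncoder(fmt.Sprintf("async-%d", i)),
		}
	}

	// AsyncClose flushes buffered messages and then closes the Errors() channel,
	// which lets the draining goroutine (and the program) exit.
	producer.AsyncClose()
	<-done
}

The consumer-group counterpart of the sync producer follows.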

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"

	"github.com/Shopify/sarama"
)

func main() {
	k := NewKafka([]string{"localhost:9092"}, []string{"audit"})
	c := k.Connect()

	sigterm := make(chan os.Signal, 1)
	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
	<-sigterm
	fmt.Println("terminating: via signal")
	c()
}

type Kafka struct {
	brokers           []string
	topics            []string
	startOffset       int64
	version           string
	ready             chan struct{}
	group             string
	channelBufferSize int
	assignor          string
}

func NewKafka(brokers []string, topics []string) *Kafka {
	return &Kafka{
		brokers:           brokers,
		topics:            topics,
		group:             "grp1",
		channelBufferSize: 1000,
		ready:             make(chan struct{}),
		version:           "2.8.0",
		assignor:          "roundrobin", // "range", "roundrobin", or "sticky"
	}
}

func (k *Kafka) Connect() func() {
	version, err := sarama.ParseKafkaVersion(k.version)
	if err != nil {
		log.Fatalf("Error parsing Kafka version: %v", err)
	}

	config := sarama.NewConfig()
	config.Version = version
	// Partition assignment strategy, selected by k.assignor.
	switch k.assignor {
	case "sticky":
		config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategySticky
	case "roundrobin":
		config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
	case "range":
		config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
	default:
		log.Panicf("Unrecognized consumer group partition assignor: %s", k.assignor)
	}
	// Where to start when the group has no committed offset yet.
	config.Consumer.Offsets.Initial = sarama.OffsetNewest
	//config.Consumer.Offsets.Initial = sarama.OffsetOldest
	config.ChannelBufferSize = k.channelBufferSize // buffer size of the internal message channels

	// Create the shared client.
	newClient, err := sarama.NewClient(k.brokers, config)
	if err != nil {
		log.Fatal(err)
	}
	// List all topics known to the cluster.
	topics, err := newClient.Topics()
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("topics: ", topics)

	// Build the consumer group on top of the client.
	client, err := sarama.NewConsumerGroupFromClient(k.group, newClient)
	if err != nil {
		log.Fatalf("Error creating consumer group client: %v", err)
	}

	ctx, cancel := context.WithCancel(context.Background())
	wg := &sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			fmt.Println("ff1")
			if err := client.Consume(ctx, k.topics, k); err != nil {
				// If Setup fails, or the session errors out, the error surfaces here.
				fmt.Printf("Error from consumer: %v\n", err)
				return
			}
			// check if context was cancelled, signaling that the consumer should stop
			if ctx.Err() != nil {
				log.Println(ctx.Err(), "::ctx")
				return
			}

			fmt.Println("ff2")
		}
	}()

	fmt.Println("Sarama consumer up and running!...")
	// Ensure messages already in the channel are consumed before the process exits.
	return func() {
		fmt.Println("kafka close")
		cancel()
		close(k.ready)
		wg.Wait()
		if err := client.Close(); err != nil {
			fmt.Printf("Error closing client: %v\n", err)
		}
	}
}

// Setup is run at the beginning of a new session, before ConsumeClaim
func (k *Kafka) Setup(session sarama.ConsumerGroupSession) error {
	fmt.Println("setup")
	fmt.Println("session.Claims: ", session.Claims())
	return nil
}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (k *Kafka) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (k *Kafka) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {

	// NOTE:
	// Do not move the code below to a goroutine.
	// The `ConsumeClaim` itself is called within a goroutine, see:
	// https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
	// Consume messages from this claim's partition.
	for message := range claim.Messages() {
		select {
		case _, ok := <-k.ready:
			if !ok {
				fmt.Println("!ok")
				return nil
			}
		default:
		}
		fmt.Printf("[topic:%s] [partiton:%d] [offset:%d] [value:%s] [time:%v]\n",
			message.Topic, message.Partition, message.Offset, string(message.Value), message.Timestamp)
		// Simulate slow processing, then mark the offset as consumed
		// (the marked offset is committed in the background; see the sketch after this listing).
		time.Sleep(5 * time.Second)
		session.MarkMessage(message, "")
		fmt.Printf("-0-[topic:%s] [partiton:%d] [offset:%d] [value:%s] [time:%v]\n",
			message.Topic, message.Partition, message.Offset, string(message.Value), message.Timestamp)
	}
	return nil
}
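
MarkMessage in ConsumeClaim only records the offset inside the session; with the config above, Sarama's background auto-commit is what actually writes it to Kafka. The sketch below just shows the relevant knobs with what I understand to be the library defaults (auto-commit on, one-second interval); if auto-commit is turned off, the session's Commit method can be called after marking instead.

package main

import (
	"fmt"
	"time"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	// Marked offsets are flushed to Kafka by this background auto-commit.
	config.Consumer.Offsets.AutoCommit.Enable = true              // default
	config.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second // default interval
	fmt.Println("auto-commit:", config.Consumer.Offsets.AutoCommit.Enable,
		"interval:", config.Consumer.Offsets.AutoCommit.Interval)
}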
