go kafka入门教程,docker kafka本地环境搭建

在本文中,您将学习:

  • 使用docker启动本地环境kafka
  • 使用go-kafka链接本地kafka进行消息生产和消费

获取本文代码

https://github.com/sl40/go-babysit

git clone https://github.com/sl40/go-babysit.git

启动本地环境kafka

要在开发设置中使用 Kafka,请创建以下docker-compose.yml文件

docker-compose.yml

version: "3"
services:
  zookeeper:
    image: 'bitnami/zookeeper:latest'
    ports:
      - '2181:2181'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: 'bitnami/kafka:latest'
    ports:
      - '9092:9092'
      - '9093:9093'
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9093
      - KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
    depends_on:
      - zookeeper

要部署kafka,请在docker-compose.yml文件所在的目录中运行以下命令:

docker-compose up -d

使用脚本kafka脚本测试本地环境

你可以使用kafka客户端进行测试,也可以跳过这个环境直接使用go进行测试

kafka客户端下可以查看官方文档 https://kafka.apache.org/downloads

打开2个命令行窗口,分别运行下面两个命令:

启动生产者

kafka-console-producer.sh --broker-list kafka:9093 --topic test

启动消费者

kafka-console-consumer.sh --bootstrap-server kafka:9093 --topic test

现在可以在生产者窗口输入

one
twe
three

查看消费者窗口,会显示生产者发送的消息。恭喜你,本地环境搭建完成!

下面将演示go如何链接kafka并且生产和消费消息

Go生产者代码

package main
import (
	"context"
	"fmt"
	kafka "github.com/segmentio/kafka-go"
	"log"
)

func main() {
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:   []string{"localhost:9093"},
		GroupID:   "consumer-group-id",
		Topic:     "test",
		MinBytes:  10e3, // 10KB
		MaxBytes:  10e6, // 10MB
	})

	for {
		m, err := r.ReadMessage(context.Background())
		if err != nil {
			break
		}
		fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
	}

	if err := r.Close(); err != nil {
		log.Fatal("failed to close reader:", err)
	}

}

Go消费者代码

package main
import (
	"context"
	kafka "github.com/segmentio/kafka-go"
	"log"
)

func main() {
	// make a writer that produces to test, using the least-bytes distribution
	w := &kafka.Writer{
		Addr:     kafka.TCP("localhost:9093"),
		Topic:   "test",
		Balancer: &kafka.LeastBytes{},
	}

	err := w.WriteMessages(context.Background(),
		kafka.Message{
			Key:   []byte("Key-A"),
			Value: []byte("Hello World!"),
		},
		kafka.Message{
			Key:   []byte("Key-B"),
			Value: []byte("One!"),
		},
		kafka.Message{
			Key:   []byte("Key-C"),
			Value: []byte("Two!"),
		},
	)
	if err != nil {
		log.Fatal("failed to write messages:", err)
	}

	if err := w.Close(); err != nil {
		log.Fatal("failed to close writer:", err)
	}

}

运行Go代码

在kafka目录下使用go mod下载依赖

go mod download

在consumer目录下运行消费者

go run mian.go
message at topic/partition/offset test/0/18: Key-A = Hello World!
message at topic/partition/offset test/0/19: Key-B = One!
message at topic/partition/offset test/0/20: Key-C = Two!

在producer目录下运行生产者

go run mian.go

消费者窗口会打印结果,恭喜你使用go完成了kafka最基础的操作!
alt 属性文本

what’s more

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐