一:核心概念

kafka是消息中间件的一种,是一种分布式流平台,是用于构建实时数据管道和流应用程序。具有横向扩展,容错,wicked fast(变态快)等优点。

kafka中涉及的名词:

  • 消息记录(record): 由一个key,一个value和一个时间戳构成,消息最终存储在主题下的分区中, 记录在生产者中称为生产者记录(ProducerRecord), 在消费者中称为消费者记录(ConsumerRecord),Kafka集群保持所有的消息,直到它们过期, 无论消息是否被消费了,在一个可配置的时间段内,Kafka集群保留所有发布的消息,不管这些消息有没有被消费。比如,如果消息的保存策略被设置为2天,那么在一个消息被发布的两天时间内,它都是可以被消费的。之后它将被丢弃以释放空间。Kafka的性能是和数据量无关的常量级的,所以保留太多的数据并不是问题。
  • 生产者(producer): 生产者用于发布(send)消息
  • 消费者(consumer): 消费者用于订阅(subscribe)消息
  • 消费者组(consumer group): 相同的group.id的消费者将视为同一个消费者组, 每个消费者都需要设置一个组id, 每条消息只能被 consumer group 中的一个 Consumer 消费,但可以被多个 consumer group 消费
  • 主题(topic): 消息的一种逻辑分组,用于对消息分门别类,每一类消息称之为一个主题,相同主题的消息放在一个队列中
  • 分区(partition): 消息的一种物理分组, 一个主题被拆成多个分区,每一个分区就是一个顺序的、不可变的消息队列,并且可以持续添加,分区中的每个消息都被分配了一个唯一的id,称之为偏移量(offset),在每个分区中偏移量都是唯一的。每个分区对应一个逻辑log,有多个segment组成。
  • 偏移量(offset): 分区中的每个消息都一个一个唯一id,称之为偏移量,它代表已经消费的位置。可以自动或者手动提交偏移量(即自动或者手动控制一条消息是否已经被成功消费)
  • 代理(broker): 一台kafka服务器称之为一个broker
  • 副本(replica):副本只是一个分区(partition)的备份。 副本从不读取或写入数据。 它们用于防止数据丢失。
  • 领导者(leader):Leader 是负责给定分区的所有读取和写入的节点。 每个分区都有一个服务器充当Leader, producer 和 consumer 只跟 leader 交互
  • 追随者(follower):跟随领导者指令的节点被称为Follower。 如果领导失败,一个追随者将自动成为新的领导者。 跟随者作为正常消费者,拉取消息并更新其自己的数据存储。replica 中的一个角色,从 leader 中复制数据。
  • zookeeper:Kafka代理是无状态的,所以他们使用ZooKeeper来维护它们的集群状态。ZooKeeper用于管理和协调Kafka代理

kafka功能

  • 发布订阅:生产者(producer)生产消息(数据流), 将消息发送到到kafka指定的主题队列(topic)中,也可以发送到topic中的指定分区(partition)中,消费者(consumer)从kafka的指定队列中获取消息,然后来处理消息。
  • 流处理(Stream Process): 将输入topic转换数据流到输出topic
  • 连接器(Connector) : 将数据从应用程序(源系统)中导入到kafka,或者从kafka导出数据到应用程序(宿主系统sink system), 例如:将文件中的数据导入到kafka,从kafka中将数据导出到文件中

kafka中的消息模型

  • 队列:同名的消费者组员瓜分消息
  • 发布订阅:广播消息给多个消费者组(不同名)

 

生产者(producer)将消息记录(record)发送到kafka中的主题中(topic), 一个主题可以有多个分区(partition), 消息最终存储在分区中,消费者(consumer)最终从主题的分区中获取消息。 
这里写图片描述

这里写图片描述

这里写图片描述

这里写图片描述

二:安装与启动

一: Mac版安装

brew install kafka

安装kafka是需要依赖于zookeeper的,所以安装kafka的时候也会包含zookeeper 
kafka的安装目录:/usr/local/Cellar/kafka 
kafka的配置文件目录:/usr/local/etc/kafka 
kafka服务的配置文件:/usr/local/etc/kafka/server.properties 
zookeeper配置文件: /usr/local/etc/kafka/zookeeper.properties

# server.properties中的重要配置

broker.id=0
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://127.0.0.1:9092
log.dirs=/usr/local/var/lib/kafka-logs
# zookeeper.properties

dataDir=/usr/local/var/lib/zookeeper
clientPort=2181
maxClientCnxns=0 

二: 启动zookeeper

# 新起一个终端启动zookeeper
cd /usr/local/Cellar/kafka/1.0.0
./bin/zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties

三: 启动kafka

# 新起一个终端启动zookeeper,注意启动kafka之前先启动zookeeper
cd /usr/local/Cellar/kafka/1.0.0
./bin/kafka-server-start /usr/local/etc/kafka/server.properties

四:创建topic

# 新起一个终端来创建主题
cd /usr/local/Cellar/kafka/1.0.0

## 创建一个名为“test”的主题,该主题有1个分区
./bin/kafka-topics --create 
    --zookeeper localhost:2181 
    --partitions 1 
    --topic test

五:查看topic

// 创建成功可以通过 list 列举所有的主题
./bin/kafka-topics --list --zookeeper localhost:2181

// 查看某个主题的信息
./bin/kafka-topics --describe --zookeeper localhost:2181 --topic <name>

六:发送消息

# 新起一个终端,作为生产者,用于发送消息,每一行算一条消息,将消息发送到kafka服务器
  > ./bin/kafka-console-producer --broker-list localhost:9092 --topic test 
  This is a message
  This is another message

七:消费消息(接收消息)

# 新起一个终端作为消费者,接收消息
cd /usr/local/Cellar/kafka/1.0.0
> ./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message

八:在生产者发送消息 
在步骤六中新起的终端属于一条消息(任意字符),输入完回车就算一条消息,可以看到在步骤7中的消费者端就会显示刚才输入的消息

三:Go实现消息接收,发送

1). 准备

  1. 安装依赖库sarama
    go get github.com/Shopify/sarama
    该库要求kafka版本在0.8及以上,支持kafka定义的high-level API和low-level API,但不支持常用的consumer自动rebalance和offset追踪,所以一般得结合cluster版本使用。
  2. sarama-cluster依赖库
    go get github.com/bsm/sarama-cluster
    需要kafka 0.9及以上版本
  3. 代码示例来自官网(本地已测试),可到官网查看更多信息。

2). 生产者

1. 同步消息模式

import (
    "github.com/Shopify/sarama"
    "time"
    "log"
    "fmt"
    "os"
    "os/signal"
    "sync"
)

var Address = []string{"10.130.138.164:9092","10.130.138.164:9093","10.130.138.164:9094"}

func main()  {
    syncProducer(Address)
    //asyncProducer1(Address)
}

//同步消息模式
func syncProducer(address []string)  {
    config := sarama.NewConfig()
    config.Producer.Return.Successes = true
    config.Producer.Timeout = 5 * time.Second
    p, err := sarama.NewSyncProducer(address, config)
    if err != nil {
        log.Printf("sarama.NewSyncProducer err, message=%s \n", err)
        return
    }
    defer p.Close()
    topic := "test"
    srcValue := "sync: this is a message. index=%d"
    for i:=0; i<10; i++ {
        value := fmt.Sprintf(srcValue, i)
        msg := &sarama.ProducerMessage{
            Topic:topic,
            Value:sarama.ByteEncoder(value),
        }
        part, offset, err := p.SendMessage(msg)
        if err != nil {
            log.Printf("send message(%s) err=%s \n", value, err)
        }else {
            fmt.Fprintf(os.Stdout, value + "发送成功,partition=%d, offset=%d \n", part, offset)
        }
        time.Sleep(2*time.Second)
    }
}

2.异步消息

func SaramaProducer()  {

    config := sarama.NewConfig()
    //等待服务器所有副本都保存成功后的响应
    config.Producer.RequiredAcks = sarama.WaitForAll
    //随机向partition发送消息
    config.Producer.Partitioner = sarama.NewRandomPartitioner
    //是否等待成功和失败后的响应,只有上面的RequireAcks设置不是NoReponse这里才有用.
    config.Producer.Return.Successes = true
    config.Producer.Return.Errors = true
    //设置使用的kafka版本,如果低于V0_10_0_0版本,消息中的timestrap没有作用.需要消费和生产同时配置
    //注意,版本设置不对的话,kafka会返回很奇怪的错误,并且无法成功发送消息
    config.Version = sarama.V0_10_0_1

    fmt.Println("start make producer")
    //使用配置,新建一个异步生产者
    producer, e := sarama.NewAsyncProducer([]string{"182.61.9.153:6667","182.61.9.154:6667","182.61.9.155:6667"}, config)
    if e != nil {
        fmt.Println(e)
        return
    }
    defer producer.AsyncClose()

    //循环判断哪个通道发送过来数据.
    fmt.Println("start goroutine")
    go func(p sarama.AsyncProducer) {
        for{
            select {
            case  <-p.Successes():
                //fmt.Println("offset: ", suc.Offset, "timestamp: ", suc.Timestamp.String(), "partitions: ", suc.Partition)
            case fail := <-p.Errors():
                fmt.Println("err: ", fail.Err)
            }
        }
    }(producer)

    var value string
    for i:=0;;i++ {
        time.Sleep(500*time.Millisecond)
        time11:=time.Now()
        value = "this is a message 0606 "+time11.Format("15:04:05")

        // 发送的消息,主题。 
        // 注意:这里的msg必须得是新构建的变量,不然你会发现发送过去的消息内容都是一样的,因为批次发送消息的关系。
        msg := &sarama.ProducerMessage{
            Topic: "0606_test",
        }

        //将字符串转化为字节数组
        msg.Value = sarama.ByteEncoder(value)
        //fmt.Println(value)

        //使用通道发送
        producer.Input() <- msg
    }
}

 

3)消费者

    集群模实现

func main()  {
    topic := []string{"test"}
    var wg = &sync.WaitGroup{}
    wg.Add(2)
    //广播式消费:消费者1
    go clusterConsumer(wg, Address, topic, "group-1")
    //广播式消费:消费者2
    go clusterConsumer(wg, Address, topic, "group-2")

    wg.Wait()
}

// 支持brokers cluster的消费者
func clusterConsumer(wg *sync.WaitGroup,brokers, topics []string, groupId string)  {
    defer wg.Done()
    config := cluster.NewConfig()
    config.Consumer.Return.Errors = true
    config.Group.Return.Notifications = true
    config.Consumer.Offsets.Initial = sarama.OffsetNewest

    // init consumer
    consumer, err := cluster.NewConsumer(brokers, groupId, topics, config)
    if err != nil {
        log.Printf("%s: sarama.NewSyncProducer err, message=%s \n", groupId, err)
        return
    }
    defer consumer.Close()

    // trap SIGINT to trigger a shutdown
    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt)

    // consume errors
    go func() {
        for err := range consumer.Errors() {
            log.Printf("%s:Error: %s\n", groupId, err.Error())
        }
    }()

    // consume notifications
    go func() {
        for ntf := range consumer.Notifications() {
            log.Printf("%s:Rebalanced: %+v \n", groupId, ntf)
        }
    }()

    // consume messages, watch signals
    var successes int
    Loop:
    for {
        select {
        case msg, ok := <-consumer.Messages():
            if ok {
                fmt.Fprintf(os.Stdout, "%s:%s/%d/%d\t%s\t%s\n", groupId, msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
                consumer.MarkOffset(msg, "")  // mark message as processed
                successes++
            }
        case <-signals:
            break Loop
        }
    }
    fmt.Fprintf(os.Stdout, "%s consume %d messages \n", groupId, successes)
}

四. 参考资料

  1. Go连接Kafka
  2. kafka的go版本api使用
  3. Godoc - sarama

 

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐