kafka跟activemq,rocketmq类似,也是其中一种消息中间件。

Step1:下载kafka包

https://kafka.apache.org/downloads 下载 kafka_2.11-1.1.0.tgz
>tar -zxvf kafka_2.11-1.1.0.tgz
>cd kafka_2.11-1.1.0

Step2:启动kafka

kafka需要用到zookeeper,如果没有zookeeper,可以使用kafka自带的单节点zookeeper,
zookeeper.properties:
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
kafka自带的zookeeper默认是2181端口。
启动自带zookeeper:
>bin/zookeeper-server-start.sh config/zookeeper.propertie
INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)

启动kafka
先看看kafka的配置文件server.properties几个主要的配置项
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
broker.id设置kafka broker的id,以区分不同的broker
num.network.threads为处理broker接收请求和发送响应的线程数
num.io.threads为处理请求,包含需要磁盘I/O操作时的线程数
log.dirs指明日志文件的路径
zookeeper.connect配置zookeeper的ip:port
zookeeper.connection.timeout.ms为zookeeper超时时重连的时间(毫秒)


如果要在其他机器调用kafka,则需要配置listeners:

listeners = PLAINTEXT://[your.host.name]:9092
启动
>bin/kafka-server-start.sh config/server.properties

Step3:创建topic

创建只有一个分区()和只有一个副本的topic test
>bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
通过list命令查看topic
>bin/kafka-topics.sh --list --zookeeper localhost:2181
test

Step4:发送消息

kafka可以通过命令行客户端将消息发送给kafka集群,客户端可以通过一个文件或者标准输入作为消息输入来源,默认来说,一个换行就一条消息。
>bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
first message
second message

Step5:启动一个客户端

kafka提供一个命令行客户端查询出topic的消息
>bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
first message
second message
所有命令行工具都有其他的选项参数,运行命令,如果没带参数的情况下,则会有提供命令的详细使用选项参数。

Step6:建立多broker集群

通过前面几个Step,我们已经建立了kafka,不过kafka集群只有一个节点broker的,下面,我们扩展到三个节点。
之前已经建立了一个broker,我们再建立2个broker即可,先来编辑2个broker的配置文件
server-1-9093.properties
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1-9093
server-2-9094.properties
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/tmp/kafka-logs-2-9094
broker.id是每个节点在集群中的唯一标识id。
启动两个节点:
>bin/kafka-server-start.sh config/server-1-9093.properties &
>bin/kafka-server-start.sh config/server-2-9094.properties &
INFO Awaiting socket connections on 0.0.0.0:9094. (kafka.network.Acceptor)
...
INFO Creating /brokers/ids/2 (is it secure? false) (kafka.zk.KafkaZkClient)
...
INFO Kafka commitId : fdcf75ea326b8e07 (org.apache.kafka.common.utils.AppInfoParser)
INFO [KafkaServer id=2] started (kafka.server.KafkaServer)
上面是9094节点启动的日志,id=2的节点启动成功,并注册为zookeeper的目录/brokers/ids/2

所以集群的节点都注册到zookeeper的/brokers/ids目录下
[zk: 127.0.0.1:2181(CONNECTED) 13] ls /brokers/ids
[2, 1, 0]
比如brokers-id:0
[zk: 127.0.0.1:2181(CONNECTED) 15] get /brokers/ids/0
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://localhost:9092"],
"jmx_port":-1,"host":"localhost","timestamp":"1529319636438","port":9092,"version":4}
创建有3个副本,一个分区的topic:my-replicated-topic
>bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
我们已经创建了一个kafka的集群,不过怎么知道集群的各个节点各自负责什么职责?可以通过“describe topics”命令得知:
>bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: my-replicated-topic  Partition: 0    Leader: 1   Replicas: 1,2,0 Isr: 1,2,0
第一行是所有分区的总概况,此外额外第一行描述一个分区的信息,由于分区数为1,所以从第二行起只有一行。
"Partition" 第1个分区,这里是从0开始;
"Leader" 对指定的分区的所有读和写响应,都是该leader(id)处理,选哪个节点作为特定分区的Leader随机的;
"Replicas" 指定分区日志的副本的所有节点列表,这些节点并不区分它们是Leader还是存活的节点;
"Isr" 是“同步”副本的集合,"Isr"是副本的子集,它们当前是存活状态的并使其中1个作为Leader;

首先发布一些消息到topic:
>bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
...
my test message 1
my test message 2
^C
接着我们消费topic的这些消息
>bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C
我们现在测试kafka集群的容错,当前集群是broker 1是Leader
ps aux | grep server-1-9093.properties
7564 ttys002    0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...
kill -9 7564
我们通过手动停止Leader节点broker 1。

kafka会在存活状态的从节点中选一个节点将它切换到Leader,由于broker 1已经被停止,不再是存活状态,所以它不会在"Isr"的集合中。
>bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: my-replicated-topic  Partition: 0    Leader: 2   Replicas: 1,2,0 Isr: 2,0
我们看到broker 1停止后,"Replicas"的列表是不变的,但"Isr"的集合中不会再包含1只剩下2和0,所以新的Leader是从2,0的集合中随机选出来的。


https://kafka.apache.org/quickstart
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐