Kafka ships with ZooKeeper, so there is no need to download ZooKeeper separately.

1. Download

wget http://mirrors.shu.edu.cn/apache/kafka/2.0.0/kafka_2.12-2.0.0.tgz

2. Install

tar -zxvf kafka_2.12-2.0.0.tgz

[root@log-system opt]# tar -zxvf kafka_2.12-2.0.0.tgz
[root@log-system opt]# cd kafka_2.12-2.0.0/
[root@log-system kafka_2.12-2.0.0]# ll
total 56
drwxr-xr-x. 3 root root  4096 Jul 24 22:20 bin
drwxr-xr-x. 2 root root  4096 Oct  8 17:01 config
drwxr-xr-x. 2 root root  4096 Oct  8 16:51 libs
-rw-r--r--. 1 root root 28824 Jul 24 22:17 LICENSE
drwxr-xr-x. 2 root root  4096 Oct  9 16:03 logs
-rw-r--r--. 1 root root   336 Jul 24 22:17 NOTICE
drwxr-xr-x. 2 root root    43 Jul 24 22:20 site-docs

3. Configure

Edit server.properties and point it at ZooKeeper:

vim config/server.properties

zookeeper.connect=localhost:2181
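
For a single-node setup the default `zookeeper.connect` already points at localhost. A few other settings in server.properties are commonly reviewed at this stage; the values below are the stock defaults shipped with the 2.0.0 distribution, shown here for orientation only:

```properties
# Unique ID of this broker within the cluster
broker.id=0

# Where Kafka stores partition data (log segments)
log.dirs=/tmp/kafka-logs

# ZooKeeper connection string (host:port)
zookeeper.connect=localhost:2181
```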

 

4. Start

[root@log-system kafka_2.12-2.0.0]# cd bin
[root@log-system bin]# ll
total 132
-rwxr-xr-x. 1 root root 1421 Jul 24 22:17 connect-distributed.sh
-rwxr-xr-x. 1 root root 1418 Jul 24 22:17 connect-standalone.sh
-rwxr-xr-x. 1 root root  861 Jul 24 22:17 kafka-acls.sh
-rwxr-xr-x. 1 root root  873 Jul 24 22:17 kafka-broker-api-versions.sh
-rwxr-xr-x. 1 root root  864 Jul 24 22:17 kafka-configs.sh
-rwxr-xr-x. 1 root root  945 Jul 24 22:17 kafka-console-consumer.sh
-rwxr-xr-x. 1 root root  944 Jul 24 22:17 kafka-console-producer.sh
-rwxr-xr-x. 1 root root  871 Jul 24 22:17 kafka-consumer-groups.sh
-rwxr-xr-x. 1 root root  948 Jul 24 22:17 kafka-consumer-perf-test.sh
-rwxr-xr-x. 1 root root  871 Jul 24 22:17 kafka-delegation-tokens.sh
-rwxr-xr-x. 1 root root  869 Jul 24 22:17 kafka-delete-records.sh
-rwxr-xr-x. 1 root root  866 Jul 24 22:17 kafka-dump-log.sh
-rwxr-xr-x. 1 root root  863 Jul 24 22:17 kafka-log-dirs.sh
-rwxr-xr-x. 1 root root  862 Jul 24 22:17 kafka-mirror-maker.sh
-rwxr-xr-x. 1 root root  886 Jul 24 22:17 kafka-preferred-replica-election.sh
-rwxr-xr-x. 1 root root  959 Jul 24 22:17 kafka-producer-perf-test.sh
-rwxr-xr-x. 1 root root  874 Jul 24 22:17 kafka-reassign-partitions.sh
-rwxr-xr-x. 1 root root  874 Jul 24 22:17 kafka-replica-verification.sh
-rwxr-xr-x. 1 root root 9290 Jul 24 22:17 kafka-run-class.sh
-rwxr-xr-x. 1 root root 1376 Jul 24 22:17 kafka-server-start.sh
-rwxr-xr-x. 1 root root  997 Jul 24 22:17 kafka-server-stop.sh
-rwxr-xr-x. 1 root root  945 Jul 24 22:17 kafka-streams-application-reset.sh
-rwxr-xr-x. 1 root root  863 Jul 24 22:17 kafka-topics.sh
-rwxr-xr-x. 1 root root  958 Jul 24 22:17 kafka-verifiable-consumer.sh
-rwxr-xr-x. 1 root root  958 Jul 24 22:17 kafka-verifiable-producer.sh
-rwxr-xr-x. 1 root root 1722 Jul 24 22:17 trogdor.sh
drwxr-xr-x. 2 root root 4096 Jul 24 22:17 windows
-rwxr-xr-x. 1 root root  867 Jul 24 22:17 zookeeper-security-migration.sh
-rwxr-xr-x. 1 root root 1393 Jul 24 22:17 zookeeper-server-start.sh
-rwxr-xr-x. 1 root root 1001 Jul 24 22:17 zookeeper-server-stop.sh
-rwxr-xr-x. 1 root root  968 Jul 24 22:17 zookeeper-shell.sh

Start ZooKeeper first:

[root@log-system bin]# ./zookeeper-server-start.sh -daemon ../config/zookeeper.properties

Then start Kafka:

[root@log-system bin]# ./kafka-server-start.sh -daemon ../config/server.properties

Check that both services are listening:

[root@log-system bin]# netstat -tunlp|egrep "(2181|9092)"
tcp6       0      0 :::9092                 :::*                    LISTEN      15122/java          
tcp6       0      0 :::2181                 :::*                    LISTEN      14705/java

Port 9092 is Kafka's listener; port 2181 is ZooKeeper's.
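
This check can be wrapped in a small helper that fails loudly when either service is down. A minimal sketch (the function name `check_ports` is invented here; it filters netstat-style output from stdin, so it works with `ss -tunlp` output as well):

```shell
# check_ports PORT... : read netstat/ss output on stdin and verify that
# every given port appears as a listening socket.
check_ports() {
    input=$(cat)
    for port in "$@"; do
        # A listening line contains ":PORT" followed by whitespace
        if ! printf '%s\n' "$input" | grep -Eq ":$port[[:space:]]"; then
            echo "port $port is NOT listening"
            return 1
        fi
    done
    echo "all ports listening"
}

# Usage against the live system:
#   netstat -tunlp | check_ports 2181 9092
```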

 

Kafka has no web UI by default; a monitoring UI can be added separately.

For one approach, see this blog post: https://blog.csdn.net/lsshlsw/article/details/47342821

 

Using Kafka

1. Create a topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic-pear

--zookeeper: ZooKeeper connection string (host:port)

--replication-factor: number of replicas

--partitions: number of partitions

--topic: name of the topic
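
Once created, the topic can be inspected with the same script (run from the Kafka root against the same ZooKeeper; this requires the cluster started above to be running, so no output is shown here):

```shell
# List all topics registered in ZooKeeper
bin/kafka-topics.sh --list --zookeeper localhost:2181

# Show leader, replicas, and in-sync replicas (ISR) for each partition
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic topic-pear
```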

 

2. Publish messages

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-pear

Type messages at the prompt:

[root@log-system kafka_2.12-2.0.0]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-pear
>hello
>hello everyone
>
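
The console producer reads from stdin, so messages can also be published non-interactively, e.g. from a pipe or a file (same broker and topic as above; `messages.txt` is a placeholder for any file of one message per line):

```shell
# Publish two messages without the interactive prompt
printf 'hello\nhello everyone\n' \
  | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-pear

# Or replay a whole file into the topic
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-pear < messages.txt
```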

 

3. Consume messages

[root@log-system bin]# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic-pear --from-beginning
hello
hello everyone
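
The console consumer joins an auto-generated consumer group; the groups known to the broker can be listed with `kafka-consumer-groups.sh`, which also ships in `bin/` (needs the broker running, so no output is shown here):

```shell
# List all consumer groups registered with the broker
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
```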

If a monitoring UI was installed, the newly created topic also shows up there.

 

 


 
