规划

192.168.56.11  k1
192.168.56.12  k2
192.168.56.13  k3
192.168.56.21  pg11n1
192.168.56.22  pg11n2

主机 k1、k2、k3 上安装 zookeeper、kafka server、kafka connector

创建 kafka 用户

所有节点创建用户,保持 id 一致

# groupadd -g 600 kafka;
useradd -u 600 -g kafka -G kafka kafka;

# passwd kafka

# su - kafka

$ mkdir -p /home/kafka/data/logs

安装更新 java 1.8

# yum install java-1.8.0-openjdk*
# /usr/sbin/update-alternatives --config java;

下载安装 kafka

所有节点执行

# cd /usr/local/
# wget https://archive.apache.org/dist/kafka/2.8.1/kafka_2.13-2.8.1.tgz
# tar -zxvf ./kafka_2.13-2.8.1.tgz
# chown -R kafka:kafka ./kafka_2.13-2.8.1
# ln -s ./kafka_2.13-2.8.1 kafka-2.8

配置 kafka

所有节点执行,注意 broker.id 的区别 (k1 节点对应1,k2 节点对应2,k3 节点对应3)

# su - kafka
$ cd /usr/local/kafka-2.8/config
$ cp server.properties server.properties.bak
$ > server.properties
$ vi server.properties

broker.id=1
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/home/kafka/data/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.56.11:2181,192.168.56.12:2181,192.168.56.13:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0

启动 kafka

# su - kafka
$ /usr/local/kafka-2.8/bin/kafka-server-start.sh -daemon /usr/local/kafka-2.8/config/server.properties
$ jps
12353 Kafka
12422 Jps

$ ps -ef|head -1;ps -ef|grep -i kafka
UID        PID  PPID  C STIME TTY          TIME CMD
root     11900 11257  0 18:32 pts/1    00:00:00 su - kafka
kafka    11901 11900  0 18:32 pts/1    00:00:00 -bash
kafka    12353     1 10 18:33 pts/1    00:00:08 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xlog:gc*:file=/usr/local/kafka-2.8/bin/../logs/kafkaServer-gc.log:time,tags:filecount=10,filesize=100M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/usr/local/kafka-2.8/bin/../logs -Dlog4j.configuration=file:/usr/local/kafk-2.8/bin/../config/log4j.properties -cp /usr/local/kafka-2.8/bin/../libs/activation-1.1.1.jar:/usr/local/kafka-2.8/bin/../libs/aopalliance-repackaged-2.6.1.jar:/usr/local/kafka-2.8/bin/../libs/argparse4j-0.7.0.jar:/usr/local/kafka-2.8/bin/../libs/audience-annotations-0.5.0.jar:/usr/local/kafka-2.8/bin/../libs/commons-cli-1.4.jar:/usr/local/kafka-2.8/bin/../libs/commons-lang3-3.8.1.jar:/usr/local/kafka-2.8/bin/../libs/connect-api-2.8.1.jar:/usr/local/kafka-2.8/bin/../libs/connect-basic-auth-extension-2.8.1.jar:/usr/local/kafka-2.8/bin/../libs/connect-file-2.8.1.jar:/usr/local/kafka-2.8/bin/../libs/connect-json-2.8.1.jar:/usr/local/kafka-2.8/bin/../libs/connect-mirror-2.8.1.jar:/usr/local/kafka-2.8/bin/../libs/connect-mirror-client-2.8.1.jar:/usr/local/kafka-2.8/bin/../libs/connect-runtime-2.8.1.jar:/usr/local/kafka-2.8/bin/../libs/connect-transforms-2.8.1.jar:/usr/local/kafka-2.8/bin/../libs/hk2-api-2.6.1.jar:/usr/local/kafka-2.8/bin/../libs/hk2-locator-2.6.1.jar:/usr/local/kafka-2.8/bin/../libs/hk2-utils-2.6.1.jar:/usr/local/kafka-2.8/bin/../libs/jackson-annotations-2.10.5.jar:/usr/local/kafka-2.8/bin/../libs/jackson-core-2.10.5.jar:/usr/local/kafka-2.8/bin/../libs/jackson-databind-2.10.5.1.jar:/usr/local/kafka-2.8/bin/../libs/jackson-dataformat-csv-2.10.5.jar:/usr/local/kafka-2.8/bin/../libs/jackson-datatype-jdk8-2.10.5.jar:/usr/local/kafka-2.8/bin/../libs/jackson-jaxrs-base-2.10.5.jar:/usr/local/kafka-2.8/bin/../libs/jackson-jaxrs-json-provider-2.10.5.jar:/usr/local/kafka-2.8/bin/../libs/jackson-module-jaxb-annotations-2.10.5.jar:/usr/local/kafka-2.8/bin/../libs/jackson-module-paranamer-2.10.5.jar:/usr/local/kafka-2.8/bin/../libs/jackson-module-scala_2.13-2.10.5.jar:/usr/local/kafka-2.8/bin/../libs/jakarta.activation-api-1.2.1.jar:/usr/local/kafka-2.8/bin/../libs/jakarta.annotation-api-1.3.5.jar:/usr/local/kafka-2.8/bin/../libs/jakarta.inject-2.6.1.jar:/usr/local/kafka-2.8/bin/../libs/jakarta.validation-api-2.0.2.jar:/usr/local/kafka-2.8/bin/../libs/jakarta.ws.rs-api-2.1.6.jar:/usr/local/kafka-2.8/bin/../libs/jakarta.xml.bind-api-2.3.2.jar:/usr/local/kafka-2.8/bin/../libs/javassist-3.27.0-GA.jar:/usr/local/kafka-2.8/bin/../libs/javax.servlet-api-3.1.0.jar:/usr/local/kafka-2.8/bin/../libs/javax.ws.rs-api-2.1.1.jar:/usr/local/kafka-2.8/bin/../libs/jaxb-api-2.3.0.jar:/usr/local/kafka-2.8/bin/../libs/jersey-client-2.34.jar:/usr/local/kafk-2.8/bin/../libs/jersey-common-2.34.jar:/usr/local/kafka-2.8/bin/../libs/jersey-container-servlet-2.34.jar:/usr/local/kafka-2.8/bin/../libs/jersey-container-servlet-core-2.34.jar:/usr/local/kafka-2.8/bin/../libs/jersey-hk2-2.34.jar:/usr/local/kafka-2.8/bin/../libs/jersey-server-2.34.jar:/usr/local/kafka-2.8/bin/../libs/jetty-client-9.4.43.v20210629.jar:/usr/localkafka-2.8/bin/../libs/jetty-continuation-9.4.43.v20210629.jar:/usr/local/kafka-2.8/bin/../libs/jetty-http-9.4.43.v20210629.jar:/usr/local/kafka-2.8/bin/../libs/jetty-io-9.4.43.v20210629.jar:/usr/local/kafka-2.8/bin/../libs/jetty-security-9.4.43.v20210629.jar:/usr/local/kafka-2.8/bin/../libs/jetty-server-9.4.43.v20210629.jar:/usr/local/kafka-2.8/bin/../libs/jetty-servlet-9.4.43.v20210629.jar:/usr/local/kafka-2.8/bin/../libs/jetty-servlets-9.4.43.v20210629.jar:/usr/local/kafka-2.8/bin/../libs/jetty-util-9.4.43.v20210629.jar:/usr/local/kafka-2.8/bin/../libs/jetty-util-ajax-9.4.43.v20210629.jar:/usr/local/kafka-2.8/bin/../libs/jline-3.12.1.jar:/usr/local/kafka-2.8/bin/../libs/jopt-simple-5.0.4.jar:/usr/local/kafka-2.8/bin/../libs/kafka_2.13-2.8.1.jar:/usr/local/kafka-2.8/bin/../libs/kafka_2.13-2.8.1-sources.jar:/usr/local/kafka-2.8/bin/../libs/kafka-clients-2.8.1.jar:/usr/local/kafka-2.8/bin/../libs/kafka-log4j-appender-2.8.1.jar:/usr/local/kafka-2.8/bin/../libs/kafka-metadata-2.8.1.jar:/usr/local/kafka-2.8/bin/../libs/kafka-raft-2.8.1.jar:/usr/local/kafka-2.8/bin/../libs/kafka-shell-2.8.1.jar:/usr/local/kafka-2.8/bin/../libs/kafka-streams-2.8.1.jar:/usr/local/kafka-2.8/bin/../libs/kafka-streams-examples-2.8.1.jar:/usr/local/kafka-2.8/bin/../libs/kafka-streams-scala_2.13-2.8.1.jar:/usr/local/kafka-2.8/bin/../libs/kafka-streams-test-utils-2.8.1.jar:/usr/local/kafka-2.8/bin/../libs/kafka-tools-2.8.1.jar:/usr/local/kafka-2.8/bin/../libs/log4j-1.2.17.jar:/usr/local/kafka-2.8/bin/../libs/lz4-java-1.7.1.jar:/usr/local/kafka-2.8/bin/../libs/maven-artifact-3.8.1.jar:/usr/local/kafka-2.8/bin/../libs/metrics-core-2.2.0.jar:/usr/local/kafka-2.8/bin/../libs/netty-buffer-4.1.62.Final.jar:/usr/local/kafka-2.8/bin/../libs/netty-codec-4.1.62.Final.jar:/usr/local/kafka-2.8/bin/../libs/netty-common-4.1.62.Final.jar:/usr/local/kafka-2.8/bin/../libs/netty-handler-4.1.62.Final.jar:/usr/local/kafka-2.8/bin/../libs/netty-resolver-4.1.62.Final.jar:/usr/local/kafka-2.8/bin/../libs/netty-transport-4.1.62.Final.jar:/usr/local/kafka-2.8/bin/../libs/netty-transport-native-epoll-4.1.62.Final.jar:/usr/local/kafka-2.8/bin/../libs/netty-transport-native-unix-common-4.1.62.Final.jar:/usr/local/kafka-2.8/bin/../libs/osgi-resource-locator-1.0.3.jar:/usr/local/kafka-2.8/bin/../libs/paranamer-2.8.jar:/usr/local/kafka-2.8/bin/../libs/plexus-utils-3.2.1.jar:/usr/local/kafka-2.8/bin/../libs/reflections-0.9.12.jar:/usr/local/kafka-2.8/bin/../libs/rocksdbjni-5.18.4.jar:/usr/local/kafka-2.8/bin/../libs/scala-collection-compat_2.13-2.3.0.jar:/usr/local/kafka-2.8/bin/../libs/scala-java8-compat_2.13-0.9.1.jar:/usr/local/kafka-2.8/bin/../libs/scala-library-2.13.5.jar:/usr/local/kafka-2.8/bin/../libs/scala-logging_2.13-3.9.2.jar:/usr/local/kafka-2.8/bin/../libs/scala-reflect-2.13.5.jar:/usr/local/kafka-2.8/bin/../libs/slf4j-api-1.7.30.jar:/usr/local/kafka-2.8/bin/../libs/slf4j-log4j12-1.7.30.jar:/usr/local/kafka-2.8/bin/../libs/snappy-java-1.1.8.1.jar:/usr/local/kafka-2.8/bin/../libs/zookeeper-3.5.9.jar:/usr/local/kafka-2.8/bin/../libs/zookeeper-jute-3.5.9.jar:/usr/local/kafka-2.8/bin/../libs/zstd-jni-1.4.9-1.jar kafka.Kafka /usr/local/kafka-2.8/config/server.properties
kafka    12467 11901  0 18:34 pts/1    00:00:00 ps -ef
kafka    12468 11901  0 18:34 pts/1    00:00:00 grep --color=auto -i kafka

创建 topic

$ /usr/local/kafka-2.8/bin/kafka-topics.sh --create --zookeeper 192.168.56.11:2181 --replication-factor 2 --partitions 1 --topic test1

查看 topic

$ /usr/local/kafka-2.8/bin/kafka-topics.sh --list --zookeeper 192.168.56.11:2181

$ /usr/local/kafka-2.8/bin/kafka-topics.sh --describe --zookeeper 192.168.56.11:2181 --topic test1

停止 kafka

# su - kafka
$ /usr/local/kafka-2.8/bin/kafka-server-stop.sh /usr/local/kafka-2.8/config/server.properties
$ 

参考:
https://blog.csdn.net/ctypyb2002/article/details/113748515
http://kafka.apache.org/downloads
https://kafka.apache.org/28/documentation.html
https://www.cnblogs.com/luotianshuai/p/5206662.html

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐