Preparation

You need at least three instances; they can all run on one machine, but there must be at least three. I prepared three machines here: 192.168.5.1, 192.168.5.2, and 192.168.5.3. You also need the Kafka and ZooKeeper packages, and the two versions must be compatible; I used kafka_2.12-2.4.0.tgz and apache-zookeeper-3.5.8-bin.tar.gz.

Setting up the ZooKeeper cluster

Kafka depends on ZooKeeper at runtime, and for high availability we need a ZooKeeper cluster rather than a single node.
Upload the ZooKeeper package to each of the three machines, extract it, copy conf/zoo_sample.cfg to conf/zoo.cfg under the ZooKeeper directory, and edit zoo.cfg:

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
# Be sure to change this to a suitable path; different instances on the same machine must use separate directories
dataDir=/home/wjx/zkData
# the port at which the clients will connect
# Change this if you need a client port other than ZooKeeper's default 2181
clientPort=2181
# Add these entries: 2888 is the port the ZooKeeper servers use to communicate and synchronize data with each other; 3888 is the port used for leader election.
server.1=192.168.5.1:2888:3888
server.2=192.168.5.2:2888:3888
server.3=192.168.5.3:2888:3888
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
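One step the walkthrough skips: each node must also have a myid file inside its dataDir, containing the number from that node's server.N line in zoo.cfg, otherwise the ensemble cannot form. A minimal sketch, using the dataDir from the config above:

```shell
# ZooKeeper reads this node's id from dataDir/myid and matches it
# against the server.N entries in zoo.cfg.
DATA_DIR=/home/wjx/zkData          # must match dataDir in zoo.cfg
mkdir -p "$DATA_DIR"
echo 1 > "$DATA_DIR/myid"          # write 1 on 192.168.5.1, 2 on .5.2, 3 on .5.3
cat "$DATA_DIR/myid"
```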

After saving and exiting, start ZooKeeper; the startup script is in ZooKeeper's bin directory:

bin/zkServer.sh start conf/zoo.cfg &

After running the command, ZooKeeper runs in the background. Repeat the configuration and startup on the remaining two machines, and the ZooKeeper cluster is complete. You can check each node's role with bin/zkServer.sh status; one node should report leader and the other two follower.

Setting up the Kafka cluster

Upload the Kafka package to each of the three machines, extract it, and edit config/server.properties under the Kafka directory:

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
# The Kafka broker id; every machine in the cluster must use a different value
broker.id=0

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from 
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092

# Hostname and port the broker will advertise to producers and consumers. If not set, 
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# A comma separated list of directories under which to store log files
# Be sure to change this; it sets where Kafka stores its log (message) data. Different instances on the same machine must also use different directories, otherwise startup fails with an error that the log directory is not empty
log.dirs=/home/wjx/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
# Configure the ZooKeeper ensemble in IP:PORT form; separate multiple servers with commas
zookeeper.connect=192.168.5.1:2181,192.168.5.2:2181,192.168.5.3:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000


############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0

# To change Kafka's port you can add a setting like the one below; it is shown only for illustration, and this walkthrough keeps the default 9092. (port is a legacy property; a configured listeners value takes precedence over it.)
port=19092

After saving and exiting, start Kafka; the startup script is in Kafka's bin directory:

bin/kafka-server-start.sh -daemon config/server.properties 

Likewise, once the remaining two machines have their configuration edited and Kafka started, the Kafka cluster is complete. If you do not need user authentication or topic-level read/write permissions, Kafka is already usable at this point.


To change Kafka's default heap size (to avoid frequent GC under heavy data volume), edit the startup script bin/kafka-server-start.sh:

#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

if [ $# -lt 1 ];
then
	echo "USAGE: $0 [-daemon] server.properties [--override property=value]*"
	exit 1
fi
base_dir=$(dirname $0)

if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
    export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/log4j.properties"
fi
# Change the 1G here to the heap size you actually need
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi

EXTRA_ARGS=${EXTRA_ARGS-'-name kafkaServer -loggc'}

COMMAND=$1
case $COMMAND in
  -daemon)
    EXTRA_ARGS="-daemon "$EXTRA_ARGS
    shift
    ;;
  *)
    ;;
esac

exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"

User setup and access control

Still using the cluster we just built, we now add users and read/write permissions. We will create three users: an admin user admin, a producer writer, and a consumer reader. Log in to any one of the Kafka machines, enter the Kafka directory, and create the three users.

Create the admin user admin with password admin:

bin/kafka-configs.sh --zookeeper 192.168.5.1:2181,192.168.5.2:2181,192.168.5.3:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=admin],SCRAM-SHA-512=[password=admin]' --entity-type users --entity-name admin

The command output confirms the user was created.

Create the producer writer with password writerpwd:

bin/kafka-configs.sh --zookeeper 192.168.5.1:2181,192.168.5.2:2181,192.168.5.3:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=writerpwd],SCRAM-SHA-512=[password=writerpwd]' --entity-type users --entity-name writer

The command output confirms the user was created.

Create the consumer reader with password readerpwd:

bin/kafka-configs.sh --zookeeper 192.168.5.1:2181,192.168.5.2:2181,192.168.5.3:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=readerpwd],SCRAM-SHA-512=[password=readerpwd]' --entity-type users --entity-name reader

The command output confirms the user was created.

You can verify a user was created with the following command; for example, to inspect the writer user:

bin/kafka-configs.sh --zookeeper 192.168.5.1:2181,192.168.5.2:2181,192.168.5.3:2181 --describe --entity-type users --entity-name writer

Next we configure the brokers. Create a file named kafka-broker-jaas.conf in Kafka's config directory with the following content:

KafkaServer {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="admin"
    password="admin";
};

The username and password here are the admin user we configured above; we will need this user again shortly. Note: do not leave a trailing space at the end of any line, or you will get errors! (I learned this the hard way.)

Next, edit config/server.properties again and append the following at the end:

# Enable the ACL authorizer
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
# Make admin a super user
super.users=User:admin
# Enable the SCRAM mechanism, using the SCRAM-SHA-512 algorithm
sasl.enabled.mechanisms=SCRAM-SHA-512
# Use SCRAM-SHA-512 for inter-broker communication as well
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
# Inter-broker traffic uses SASL_PLAINTEXT; SSL configuration is not covered in this walkthrough
security.inter.broker.protocol=SASL_PLAINTEXT
# Configure listeners with SASL_PLAINTEXT; replace the IP with this machine's actual IP
listeners=SASL_PLAINTEXT://192.168.5.1:9092
# Configure advertised.listeners; replace the IP with this machine's actual IP
advertised.listeners=SASL_PLAINTEXT://192.168.5.1:9092
# Disable automatic topic creation
auto.create.topics.enable=false
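These values are for 192.168.5.1. On the other two machines only the node-specific entries change; for example, on 192.168.5.2 they could look like the following (broker.id=1 is an assumed value here, the ids just have to be unique across the cluster):

```
broker.id=1
listeners=SASL_PLAINTEXT://192.168.5.2:9092
advertised.listeners=SASL_PLAINTEXT://192.168.5.2:9092
```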

Then restart Kafka; this time the startup must load the new kafka-broker-jaas.conf file:

KAFKA_OPTS=-Djava.security.auth.login.config=xxx/config/kafka-broker-jaas.conf bin/kafka-server-start.sh config/server.properties &

Repeat all of the steps above except user creation on the remaining two machines, then restart them. Kafka now enforces users and permissions. Next, create a single-partition, single-replica topic for testing:

bin/kafka-topics.sh --create --zookeeper 192.168.5.1:2181,192.168.5.2:2181,192.168.5.3:2181 --topic test --partitions 1 --replication-factor 1

Now we use the console scripts to demonstrate producing and consuming data while configuring the users' read/write permissions.

First, try sending data to the test topic:

bin/kafka-console-producer.sh --broker-list 192.168.5.1:9092,192.168.5.2:9092,192.168.5.3:9092 --topic test

Typing anything produces an error logged through ErrorLoggingCallback, indicating that authentication failed.

Next, create a producer configuration file and send data as the writer user. The file producer.conf contains:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="writer" password="writerpwd";

Run the producer script:

bin/kafka-console-producer.sh --broker-list 192.168.5.1:9092,192.168.5.2:9092,192.168.5.3:9092 --topic test --producer.config xx/producer.conf

Typing anything now produces a different error, this time due to authorization: the writer user has no write permission on the test topic. Grant it:

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.168.5.1:2181,192.168.5.2:2181,192.168.5.3:2181 --add --allow-principal User:writer --operation Write --topic test

The command prints the result of the operation, along with the topic's current user permissions.

Run the producer script again:

bin/kafka-console-producer.sh --broker-list 192.168.5.1:9092,192.168.5.2:9092,192.168.5.3:9092 --topic test --producer.config xx/producer.conf

This time typing characters produces no error: the send succeeded!

In the same way, we grant the reader user read permission on the topic. First create a consumer.conf file:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="reader" password="readerpwd";

Grant read permission on test:

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.168.5.1:2181,192.168.5.2:2181,192.168.5.3:2181 --add --allow-principal User:reader --operation Read --topic test

Run the consumer script to try reading the data in test:

bin/kafka-console-consumer.sh --bootstrap-server 192.168.5.1:9092,192.168.5.2:9092,192.168.5.3:9092 --topic test --from-beginning --consumer.config xxx/consumer.conf --group test-group

This fails with an error saying we have no permission on the consumer group, so grant that too:

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.168.5.1:2181,192.168.5.2:2181,192.168.5.3:2181  --add --allow-principal User:reader --operation Read --group test-group

Run the consumer script again to read the data in test:

bin/kafka-console-consumer.sh --bootstrap-server 192.168.5.1:9092,192.168.5.2:9092,192.168.5.3:9092 --topic test --from-beginning --consumer.config xxx/consumer.conf --group test-group

The data is read successfully.

With that, Kafka permissions for producing and consuming data are fully configured.

ZooKeeper ACL authentication

Basics

ZooKeeper's bin directory contains zkCli.sh, which connects directly to ZooKeeper and can perform any operation on its znodes. That is extremely dangerous in production, so ZooKeeper's paths need permission restrictions.

Use zkCli.sh to connect to a specified ZooKeeper server:

./bin/zkCli.sh -server 192.168.5.1:2181

Check a node's ACL with the following command:

[zk: localhost:2181(CONNECTED) 0] getAcl /brokers
'world,'anyone
: cdrwa

The /brokers node is scoped to world:anyone, meaning everyone can operate on it. The cdrwa that follows lists the permitted operations: c = create, d = delete, r = read, w = write, a = admin (set ACLs).

Fetching the ACL of a node you have no permission on looks like this:

[zk: localhost:2181(CONNECTED) 4] getAcl /
Authentication is not valid : /

Setting ACLs

To set ACLs, proceed as follows:

  1. Authenticate as a user, account usera, password userpwd
addauth digest usera:userpwd

Strictly speaking this is not a login but the creation of auth info for the session; the user does not have to exist beforehand, and you can call it multiple times to authenticate several users at once.

  2. Set an ACL
setAcl / auth:usera:userpwd:cdrwa

This command sets the account, password, and permissions on the root path /.

  3. View an ACL
getAcl /

This command shows a path's ACL.

  4. Clear a path's ACL
setAcl / world:anyone:crdwa

The command to clear an ACL resembles the one to set it, but without the auth: prefix in front of the user.

If you forget the password

The methods above set permissions, but there is a catch: if you forget the account and password after securing a node, no other user can operate on it anymore. We need a way to reset permissions: enable a super user, which bypasses all ACL checks. Remember to disable it again once you are done.

  1. Create the super user useradmin with password userpwd
    Enter the following command on the machine:
echo -n useradmin:userpwd | openssl dgst -binary -sha1 | openssl base64

The output is the password digest 4pOzGFD5H++TMb0bB7hp2AD5+6U=; note it down.

  2. Edit zkServer.sh, ZooKeeper's startup script: find the start section and add the following to its JVM arguments:
"-Dzookeeper.DigestAuthenticationProvider.superDigest=useradmin:4pOzGFD5H++TMb0bB7hp2AD5+6U=" 
  3. Restart ZooKeeper
  4. Connect with zkCli.sh again, then authenticate with the login command:
addauth digest useradmin:userpwd

You can now operate on any node: use the ACL-clearing command to reset a node's permissions, then set them up again.

5. Another approach

If you merely forgot which user owns a node, but still remember the account/password pairs themselves, you can temporarily disable ACL checks to look up the owner. For safety, delete or comment out the setting when you are done, then restart ZooKeeper.

First edit the zoo.cfg configuration file and append at the very bottom:

skipACL=yes

Then restart ZooKeeper and connect with zkCli.sh; you can now view the ACL information of any node, though you still cannot modify ACLs.

Using the Kafka scripts

  1. Check a topic's consumer group status

Note: if authentication is not enabled, the trailing --command-config admin.conf is not needed.

bin/kafka-consumer-groups.sh --bootstrap-server 192.168.5.1:9092,192.168.5.2:9092,192.168.5.3:9092 --describe --group test-group --command-config admin.conf

admin.conf contains:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin";

PS: because we configured admin as the super user in Kafka, you must use admin here; other users will fail due to insufficient permissions.

Command summary:

Start ZooKeeper

bin/zkServer.sh start conf/zoo.cfg &

Start Kafka

KAFKA_OPTS=-Djava.security.auth.login.config=xxx/config/kafka-broker-jaas.conf bin/kafka-server-start.sh config/server.properties &

Create a topic

bin/kafka-topics.sh --create --zookeeper 192.168.5.1:2181,192.168.5.2:2181,192.168.5.3:2181 --topic test --partitions 1 --replication-factor 1

Add a producer user:

bin/kafka-configs.sh --zookeeper 192.168.5.1:2181,192.168.5.2:2181,192.168.5.3:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=writerpwd],SCRAM-SHA-512=[password=writerpwd]' --entity-type users --entity-name writer

Grant write permission:

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.168.5.1:2181,192.168.5.2:2181,192.168.5.3:2181 --add --allow-principal User:writer --operation Write --topic test

Add a consumer user:

bin/kafka-configs.sh --zookeeper 192.168.5.1:2181,192.168.5.2:2181,192.168.5.3:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=readerpwd],SCRAM-SHA-512=[password=readerpwd]' --entity-type users --entity-name reader

Grant read permission:

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.168.5.1:2181,192.168.5.2:2181,192.168.5.3:2181 --add --allow-principal User:reader --operation Read --topic test

Grant consumer group permission:

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.168.5.1:2181,192.168.5.2:2181,192.168.5.3:2181  --add --allow-principal User:reader --operation Read --group test-group

Delete a user (the command below removes only the SCRAM-SHA-256 credential; since both mechanisms were registered at creation, repeat it with SCRAM-SHA-512 to remove the user fully):

bin/kafka-configs.sh --zookeeper 192.168.5.1:2181,192.168.5.2:2181,192.168.5.3:2181 --alter --delete-config 'SCRAM-SHA-256' --entity-type users --entity-name writer

Java demo code:

Producer

package com.wangjx.message.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * @program: swallow
 * @description:
 * @author: wangjiaxing
 * @created: 2020/10/30 11:43
 */
public class KafkaSender {

    public static void main(String[] args) throws InterruptedException {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.5.1:9092,192.168.5.2:9092,192.168.5.3:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// key serializer
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// value serializer
        // set the Kafka client JAAS config file here
        System.setProperty("java.security.auth.login.config", "./kafka/kafka-client-jaas.conf");
        // if that system property is already in use elsewhere, set the JAAS config directly instead:
        //properties.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"writer\" password=\"writerpwd\";");
        // security protocol
        properties.put("security.protocol", "SASL_PLAINTEXT");
        // SASL mechanism
        properties.put("sasl.mechanism", "SCRAM-SHA-512");
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "10");
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
        properties.put(ProducerConfig.LINGER_MS_CONFIG, "50");
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        properties.put(ProducerConfig.RETRIES_CONFIG, 30);
        properties.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 20000);
        properties.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, 20000);
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        while (true) {
            String topic = "test";
            String k = UUID.randomUUID().toString();
            Long v = System.currentTimeMillis();
            producer.send(new ProducerRecord<>(topic, k, Long.toString(v)));
            System.out.println("send to topic > " + topic + " : " + k + " | " + v);
            TimeUnit.SECONDS.sleep(1);
        }
    }
}

Consumer

package com.wangjx.message.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/**
 * @program: swallow
 * @description:
 * @author: wangjiaxing
 * @created: 2020/10/30 11:43
 */
public class KafkaReceiver {

    public static void main(String[] args) throws InterruptedException {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.5.1:9092,192.168.5.2:9092,192.168.5.3:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");// key deserializer
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");// value deserializer
        // set the Kafka client JAAS config file here
        System.setProperty("java.security.auth.login.config", "./kafka/kafka-client-jaas.conf");
        // if that system property is already in use elsewhere, set the JAAS config directly instead:
        //properties.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"reader\" password=\"readerpwd\";");
        properties.put("security.protocol", "SASL_PLAINTEXT");
        properties.put("sasl.mechanism", "SCRAM-SHA-512");
        properties.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, 20000);
        properties.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, 20000);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        List<String> topics = new ArrayList<>();
        String topic = "test";
        topics.add(topic);
        consumer.subscribe(topics);
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
            // process messages
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("receive message from topic "+topic+", key = "+record.key()+", offset = "+record.offset()+", message = "+record.value());
            }
        }
    }
}
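Both demos point java.security.auth.login.config at ./kafka/kafka-client-jaas.conf, a file whose content is not shown above. Assuming the SCRAM users created earlier, a plausible reconstruction would look like the following (use writer for the producer and reader for the consumer; KafkaClient is the JAAS context name Kafka clients look up):

```
KafkaClient {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="writer"
    password="writerpwd";
};
```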