kafka的版本为2.12-2.8.0

(一)加密处理需求

  1. 只对kakfa外网端口进行加密处理。
  2. kafka的内部端口端口不需要加密处理。
  3. kafka的broker之间通讯不需要加密处理。
  4. zookeeper之间不需要加密处理。

(二)kafka配置

kafka中jaas.config配置

        新建kafka_server_jaas.conf文件,该文件为kafka服务使用。

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    serviceName="kafka"
    username="admin"
    password="admin123"
    user_admin="admin123"
    user_client="123456";
};

        serviceName:定义服务的名称

        username:定义内部连接使用的用户名

        password:定义内部连接使用的用户名对应的密码。

        user_admin:定义用户名和密码,根据上面配置来看,user_admin中admin为用户名,后面的admin123为密码。

        user_client:定义用户名和密码,根据上面配置来看,user_client中client为用户名,后面的123456为密码。

        新建kafka_client_jaas.conf文件,该文件为客户端使用:

KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="client"
    password="123456";
};

        username:为KafkaServer中的定义的用户名

        password:为KafkaServer定义用户名的密码

修改kafka的配置文件server.properties

        修改配置文件kafka/config/server.properties

#在文件中增加如下配置
# 配置listener名字和协议的映射(所以它是一个key-value的map),key是“listener名字”,value是“协议名称”
listener.security.protocol.map=LOCAL:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:SASL_PLAINTEXT
# 监听端口,根据上面定义的listener名称定义对应的端口
listeners=LOCAL://172.16.180.42:9091,INTERNAL://kafka1:19091,EXTERNAL://kafka1:29091
# 定义对外的端口信息。
advertised.listeners=INTERNAL://kafka1:19091,EXTERNAL://kafka1:29091
# 定义sasl的mechanisms
sasl.enabled.mechanisms=PLAIN
#定义内部broker使用的通讯协议
inter.broker.listener.name=INTERNAL
#完成身份验证的类
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

#当没有找到ACL配置时,允许所有的访问操作。
allow.everyone.if.no.acl.found=true

        其中最重要的就是 listeners 和 advertised.listeners :集群启动时监听listeners配置的地址,并将 advertised.listeners 配置的地址写到Zookeeper里面,作为集群元数据的一部分。我们可以将客户端(生产者/消费者)连接Kafka集群进行操作的过程分成2步:

  1. 通过 listeners 配置的连接信息(ip/host)连接到某个Broker(broker会定期获取并缓存zk中的元数据信息),获取元数据中 advertised.listeners 配置的地址信息。
  2. 通过第1步获取的 advertised.listeners 连接信息和Kafka集群通信(读/写)。

        根据上面配置我们要把EXTERNAL对应的端口代理出去,并且使用账户密码的方式进行验证连接。所以我们对EXTERNAL配置的时候ip尽量使用hostName的方式。因为你配置的ip不一定是客户端直接使用的ip,就是说你这里配置本机的内网ip为192.16.1.102。但是客户端使用的时候可能使用公网的ip:xxx.xx.xxx.xx。这样会造成连接失败的问题。所以我们使用hostName配置的方式。在客户端使用的时候也用hostName的方式配置。

        根据上面配置得到:内部端口使用19091免密连接。外部端口使用19092使用秘钥连接。

修改kafka的启动脚本kafka-server-start.sh

首先把kafka_server_jaas.conf放到kafka/config下。

编辑kafka/bin/kafka-server-start.sh

#在顶部增加
export KAFKA_OPTS="-Djava.security.auth.login.config=/kafka/config/kafka_server_jaas.conf"

(三)连接测试

        测试使用kafka自带脚本来测试。

        在kafka/bin下有两个脚本:kafka-console-consumer.sh和kafka-console-producer.sh分别对应消费端和生产端。

        首先把kafka_client_jaas.conf放到kafka/conf下,配置上面两个脚本增加如下:

#在上面两个脚本上部增加如下声明
export KAFKA_OPTS="-Djava.security.auth.login.config=kafka/conf/kafka_client_jaas.conf"

生产者启动如下:

#使用免密的方式连接19091
./kafka-console-producer.sh --bootstrap-server kafka1:19091 --topic test 
#使用加密的方式连接29091
./kafka-console-producer.sh --bootstrap-server kafka1:29091 --topic test --producer-property security.protocol=SASL_PLAINTEXT --producer-property sasl.mechanism=PLAIN

消费者启动如下:

#使用免密的方式连接19091
./kafka-console-consumer.sh --bootstrap-server kafka1:19091 --topic test 
#使用加密的方式连接29091
./kafka-console-consumer.sh --bootstrap-server kafka1:29091 --topic test --consumer-property security.protocol=SASL_PLAINTEXT --consumer-property sasl.mechanism=PLAIN

参考文章:

Kafka的监听地址配置_Java小海.的博客-CSDN博客_kafka监听

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐