Our company recently moved to cloud servers, so we need to secure Kafka. The good news is that Kafka 0.9 added security support. Chinese-language material on this is scarce, so this post documents Kafka access control based on Kafka 0.9. (Note: Flume needs version 1.7 or later to support Kafka SSL authentication.)

Now let's start the Kafka authentication journey together!

Overview:
Kafka access control falls into three categories:
1. SSL-based (not supported by CDH 5.8)
2. Kerberos-based (usually used with CDH; not covered here)
3. ACL-based (not supported by the Kafka 2.x parcel in CDH 5.8)

This post targets the Apache distribution and covers options 1 and 3, which are also the most widely used.

Conventions:
In this post, a leading & marks a comment line.

1. Preparation
Component layout:
kafka: centos11, centos12, centos13
zookeeper: centos11, centos12, centos13

2. On any one machine in the Kafka cluster (SSL-based setup first)

All passwords below are 123456.

&Step 1 Generate SSL key and certificate for each Kafka broker
keytool -keystore server.keystore.jks -alias centos11 -validity 365 -genkey

&Step 2 Creating your own CA
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365
keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert
keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert

&Step 3 Signing the certificate
keytool -keystore server.keystore.jks -alias centos11 -certreq -file cert-file
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:123456
keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore server.keystore.jks -alias centos11 -import -file cert-signed
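After signing, it is worth sanity-checking the result; both the CA root and the broker's signed certificate should now be in the keystore (file names and the 123456 password follow the conventions above):

```shell
# List the keystore entries; expect two: "caroot" (trustedCertEntry)
# and "centos11" (PrivateKeyEntry).
keytool -list -v -keystore server.keystore.jks -storepass 123456

# Independently verify the signed certificate against the CA with openssl.
openssl verify -CAfile ca-cert cert-signed
```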

3. The remaining Kafka brokers

&On centos13 and centos12
keytool -keystore kafka.client.keystore.jks -alias centos13 -validity 365 -genkey
keytool -keystore kafka.client.keystore.jks -alias centos13 -certreq -file cert-file
cp cert-file cert-file-centos13

&On centos11
scp ./ca* ce* server* root@centos13:/opt/kafka_2.10/

openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file-centos13 -out cert-signed-centos13 -days 365 -CAcreateserial -passin pass:123456
keytool -keystore kafkacentos13.client.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafkacentos13.client.keystore.jks -alias centos13 -import -file cert-signed-centos13
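The per-broker dance above (generate a key pair, export a CSR, sign it with the CA, import the CA cert and the signed cert) can be consolidated into one sketch. This assumes, for simplicity, that everything runs on the machine holding ca-cert/ca-key and the keystores are copied out afterwards; the keystore names are illustrative:

```shell
# Hypothetical consolidation of the per-broker steps; password 123456 throughout.
for host in centos12 centos13; do
  # 1. Generate the broker key pair.
  keytool -keystore kafka.$host.keystore.jks -alias $host -validity 365 -genkey \
    -keyalg RSA -storepass 123456 -keypass 123456 -dname "CN=$host"
  # 2. Export a certificate signing request.
  keytool -keystore kafka.$host.keystore.jks -alias $host -certreq \
    -file cert-file-$host -storepass 123456
  # 3. Sign the CSR with the CA.
  openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file-$host \
    -out cert-signed-$host -days 365 -CAcreateserial -passin pass:123456
  # 4. Import the CA cert and the signed cert back into the keystore.
  keytool -keystore kafka.$host.keystore.jks -alias CARoot -import \
    -file ca-cert -storepass 123456 -noprompt
  keytool -keystore kafka.$host.keystore.jks -alias $host -import \
    -file cert-signed-$host -storepass 123456 -noprompt
done
```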

rm -rf producer.properties
echo "bootstrap.servers=centos13:9093" >> producer.properties
echo "security.protocol=SSL" >> producer.properties
echo "ssl.truststore.location=/opt/kafka_2.10/kafkacentos13.client.keystore.jks" >> producer.properties
echo "ssl.truststore.password=123456" >> producer.properties
echo "ssl.keystore.location=/opt/kafka_2.10/server.keystore.jks" >> producer.properties
echo "ssl.keystore.password=123456" >> producer.properties
echo "ssl.key.password=123456" >> producer.properties

4. Verification:

openssl s_client -debug -connect localhost:9093 -tls1

output:
-----BEGIN CERTIFICATE-----
{variable sized random bytes}
-----END CERTIFICATE-----
subject=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=Sriharsha Chintalapani
issuer=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=kafka/emailAddress=test@test.com

5. Usage:

bin/kafka-console-consumer.sh --bootstrap-server kafka2:9093 --topic test --new-consumer --consumer.config config/producer.properties

bin/kafka-console-producer.sh --broker-list centos11:9093 --topic test --producer.config ./config/producer.properties

bin/kafka-console-consumer.sh --bootstrap-server centos11:9093 --topic test --new-consumer --consumer.config ./config/producer.properties

bin/kafka-console-consumer.sh --bootstrap-server centos13:9093 --topic test --new-consumer --consumer.config ./config/producer.properties --from-beginning

6. ACL-based authorization

Add the following to server.properties:

allow.everyone.if.no.acl.found=true
super.users=User:root
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
principal.builder.class=org.apache.kafka.common.security.auth.DefaultPrincipalBuilder

7. Basic ACL usage:

bin/kafka-acls.sh --authorizer-properties zookeeper.connect=centos11:2181 --add --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic test

bin/kafka-acls.sh --authorizer-properties zookeeper.connect=centos11:2181 --remove --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic test
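To check what actually took effect, the same tool can list the ACLs attached to a resource (same zookeeper.connect as above):

```shell
# List all ACLs currently set on topic "test".
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=centos11:2181 --list --topic test
```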

8. Java Demo
Copy server.keystore.jks and client.truststore.jks down from any one of the machines.

SSL is supported only for the new Kafka Producer and Consumer; the older API is not supported.

ConsumerDemo

package xmhd.examples;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SslConfigs;
import java.util.Arrays;
import java.util.Properties;

/**
 * Created by shengjk1.
 * blog address: http://blog.csdn.net/jsjsjs1789
 *
 * The producer is authenticated the same way.
 * SSL is supported only for the new Kafka Producer and Consumer; the older API is not supported.
 */
public class ConsumerZbdba {
    public static void main(String[] args) {
        // new ConsumerZbdba("test").start(); // uses the topic "test" already created on the cluster
        Properties props = new Properties();
        /* Kafka broker addresses; there is no need to list every broker */
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos11:9093,centos12:9093,centos13:9093");
        /* consumer group */
        props.put("group.id", "test");
        props.put("auto.offset.reset", "earliest");
        /* whether to auto-commit offsets */
        // props.put("enable.auto.commit", "true");
        // props.put(ProducerConfig.CLIENT_ID_CONFIG, "myApiKey");
        props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "F:\\server.keystore.jks");
        props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "123456");
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "F:\\client.truststore.jks");
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "123456");
        props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "JKS");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");

        /* auto-commit interval */
        // props.put("auto.commit.interval.ms", "1000");
        // props.put("session.timeout.ms", "30000");
        /* key deserializer */
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        /* value deserializer */
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        /* create the consumer, subscribe, and poll */
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("test"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
            }
        }
    }
}

9. server.properties (example for centos11)

####################### Socket Server Settings

&Note: this deserves special attention. Once the PLAINTEXT listener is commented out, the basic Kafka scripts stop working unless they are given SSL client configs.
&listeners=PLAINTEXT://centos11:9092,SSL://centos11:9093
listeners=SSL://centos11:9093
advertised.listeners=SSL://centos11:9093
ssl.keystore.location=/opt/kafka_2.10/server.keystore.jks
ssl.keystore.password=123456
ssl.key.password=123456
ssl.truststore.location=/opt/kafka_2.10/server.truststore.jks
ssl.truststore.password=123456
ssl.client.auth=required
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.keystore.type=JKS
ssl.truststore.type=JKS
security.inter.broker.protocol=SSL

&acl
allow.everyone.if.no.acl.found=true
super.users=User:root
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
principal.builder.class=org.apache.kafka.common.security.auth.DefaultPrincipalBuilder

host.name=centos11
advertised.host.name=centos11

num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

####################### Log Basics

log.dirs=/opt/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000

####################### Zookeeper

zookeeper.connect=centos11:2181,centos12:2181,centos13:2181
zookeeper.connection.timeout.ms=6000
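Once server.properties is updated on every broker, a restart plus the openssl probe from section 4 is a quick smoke test (paths are relative to the Kafka install directory):

```shell
# Restart the broker so the SSL listener config takes effect.
bin/kafka-server-stop.sh
bin/kafka-server-start.sh -daemon config/server.properties

# Probe the SSL port; the server certificate chain should be printed.
openssl s_client -connect centos11:9093 -tls1 </dev/null
```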

kafka producer.properties
(centos11 is a hostname; change it to match your environment)

bootstrap.servers=centos11:9093
security.protocol=SSL
ssl.truststore.location=/opt/kafka_2.10/client.truststore.jks
ssl.truststore.password=123456
ssl.keystore.location=/opt/kafka_2.10/server.keystore.jks
ssl.keystore.password=123456
ssl.key.password=123456

10. References:
See the official docs for details.

http://kafka.apache.org/090/documentation.html#security_authz
http://www.cloudera.com/documentation/kafka/latest/topics/kafka_security.html
http://blog.csdn.net/zbdba/article/details/52458654

11. Protocol support by component
[image: protocol support matrix from the original post]

12. Further reading:

On how SSL works:
http://www.linuxde.net/2012/03/8301.html
http://orchome.com/171

Reposted from: http://blog.csdn.net/jsjsjs1789
