Mac 系统Kafka + Zookeeper SASL 配置
背景最近听说Spark很火,于是开始学习Spark的相关知识,在学到Spark Streaming的时候无意中发现Kafka这个消费系统挺不错的,特别是关于它的日志压缩(log compaction) 和安全认证两个特性可以很好的用在公司的项目里。SASL的配置网上有很多,遇到的坑也不少,也要感谢提供正确指导的博主们,现整理一下自己的经验。大伙也可以根据官方文档来折腾h......
背景
最近听说Spark很火,于是开始学习Spark的相关知识,在学到Spark Streaming的时候无意中发现Kafka这个流处理平台挺不错的,特别是关于它的日志压缩(log compaction) 和安全认证两个特性可以很好的用在公司的项目里。
SASL的配置网上有很多,遇到的坑也不少,也要感谢提供正确指导的博主们,现整理一下自己的经验。大伙也可以根据官方文档来折腾Kafka 中文文档 - ApacheCN
环境
- 系统 MacOs High Sierra 10.13.6
- Kafka版本 kafka_2.12-2.0.0
- Zookeeper版本 zookeeper-3.4.13
步骤
- 配置Zookeeper
- 配置Kafka 服务端
- 权限操作
- 配置Kafka 的Consumer 和 Producer 客户端
- 编写Java Consumer客户端用例
配置Zookeeper
1. 复制示例配置文件为zoo.cfg
cp $ZOOKEEPER_HOME/conf/zoo_sample.cfg $ZOOKEEPER_HOMEconf/zoo.cfg
2. 为zoo.cfg 添加SASL支持
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
3. 新建$ZOOKEEPER_HOME/conf/zk_server_jaas.conf文件,为Zookeeper添加账号认证信息
Server {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="kafka"
password="kafka#321"
user_kafka="kafka#321";
};
注:
- org.apache.kafka.common.security.plain.PlainLoginModule 是认证的类
- username 是Kafka服务做为客户端连接到zookeeper的用户名
- password 是Kafka服务做为客户端连接到zookeeper的密码
- user_kafka 中的user_是固定的,kafka是对应username里的值,这个必须设置,要不然会认证失败
4. 因为认证方式使用的是Kafka的认证类 org.apache.kafka.common.security.plain.PlainLoginModule ,所以要用到Kafka相关的jar。
新建一个$ZOOKEEPER_HOME/sasl_jars目录,从$KAFKA_HOME/libs将相关jar拷到sasl_jars目录。以下是相关的jar包,这些版本会因为你的Kafka版本不同而不同
kafka-clients-2.0.0.jar
lz4-java-1.4.1.jar
slf4j-api-1.7.25.jar
slf4j-log4j12-1.7.25.jar
snappy-java-1.1.7.1.jar
5. 修改$ZOOKEEPER_HOME/bin/zkEnv.sh 脚本,让Zookeeper启动的时候可以加载相关jar包和认证信息
for i in "$ZOOBINDIR"/../sasl_jars/*.jar;
do
CLASSPATH="$i:$CLASSPATH"
done
SERVER_JVMFLAGS=" -Djava.security.auth.login.config=$ZOOCFGDIR/zk_server_jaas.conf "
配置Kafka 服务端
1. 新建$KAFKA_HOME/kafka_server_jaas.conf 文件,为Kafka 添加客户端账号认证信息和自己认证到Zookeeper的客户端账号信息
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="kafka"
password="kafka#321"
user_kafka="kafka#321"
user_producer="pro#321"
user_consumer="con#321"
user_both="both#321";
};
Client{
org.apache.kafka.common.security.plain.PlainLoginModule required
username="kafka"
password="kafka#321";
};
注:
- KafkaServer 是Kafka服务端配置哪些账号可以认证连接进来
- Client 是Kafka服务端配置要什么哪些方式及账号进行登录
2. 复制示例配置文件为kafka.properties
cp $KAFKA_HOME/config/server.properties $KAFKA_HOME/config/kafka.properties
3. 在kafka.properties中添加SASL认证支持
#开启SASL 客户端认证
listeners=SASL_PLAINTEXT://localhost:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
zookeeper.set.acl=true
super.users=User:kafka
4. 添加启动环境变量,在$KAFKA_HOME/bin/kafka-run-class.sh 添加以下内容
KAFKA_SASL_OPTS='-Djava.security.auth.login.config=$KAFKA_HOME/kafka_server_jaas.conf'
# Launch mode
if [ "x$DAEMON_MODE" = "xtrue" ]; then
nohup $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS $KAFKA_SASL_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
else
exec $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS $KAFKA_SASL_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@"
fi
5. 指定配置文件启动服务
nohup bin/kafka-server-start.sh config/kafka.properties
权限操作
- 为用户consumer 添加消费者权限,并只允许192.168.1.30这个ip访问,只限于消费wuye1主题和yc-test这个组
kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:consumer --allow-host 192.168.1.30 --consumer --topic wuye1 --group yc-test
- 为用户producer 添加生产者权限,只能对wuye1这个主题生产信息
kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:producer --producer --topic wuye1
- 查看wuye1这个主题配置的权限。查看全部权限可以把--topic wuye1 去掉
kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic wuye1
- 删除consumer用户针对wuye1主题的读权限,所有的权限删除后这个用户就会在权限列表中消失
kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --remove --allow-principal User:consumer --operation Read --topic wuye1
配置Kafka 的Consumer 客户端
1. 新建$KAFKA_HOME/kafka_consumer_jaas.conf填入以下内容
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="consumer"
password="con#321";
};
2. 分别修改$KAFKA_HOME/config/consumer.properties 新增以下内容
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
3. 修改$KAFKA_HOME/bin/kafka-console-consumer.sh 新增以下内容
if [ "x$KAFKA_OPTS" ]; then
export KAFKA_OPTS="-Djava.security.auth.login.config=$KAFKA_HOME/kafka_consumer_jaas.conf"
fi
#下面这行是原始内容主要是为了体现上面的内容要加在哪里
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"
4. 启动消费者客户端,一定要注意--consumer.config config/consumer.properties 这个参数,之前因为没加这个折腾了挺久的
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic wuye1 --group my-group-1 --partition 0 --consumer.config config/consumer.properties
配置Kafka 的 Producer 客户端
1. 新建$KAFKA_HOME/kafka_producer_jaas.conf填入以下内容
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="producer"
password="pro#321";
};
2. 分别修改$KAFKA_HOME/config/producer.properties 新增以下内容
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
3. 修改$KAFKA_HOME/bin/kafka-console-producer.sh 新增以下内容
if [ "x$KAFKA_OPTS" ]; then
export KAFKA_OPTS="-Djava.security.auth.login.config=$KAFKA_HOME/kafka_producer_jaas.conf"
fi
#下面这行是原始内容主要是为了体现上面的内容要加在哪里
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"
4. 启动生产者客户端
kafka-console-producer.sh --broker-list localhost:9092 --topic wuye1 --producer.config config/producer.properties
编写Java Consumer客户端用例
- 要导入的包
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.0.0</version>
</dependency>
- java代码
package com.steely.kafka;
import java.time.Duration;
import java.util.Arrays;
import java.util.Date;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.errors.WakeupException;
/**
* SASL Kafka消费者客户端
* @ate 2018年10月9日
* @author chenyongchao
*/
public class SASLConsumer {
public static void main(String[] args) throws InterruptedException {
ConsumerThread consumerThread = new ConsumerThread(getConsumer("localhost:9092"), "wuye1");
new Thread(consumerThread).start();
Thread.sleep(1000 * 60 * 60); //60分钟后把消费者停掉
consumerThread.shutdown();
}
private static class ConsumerThread implements Runnable {
private final KafkaConsumer<String, String> consumer;
private final AtomicBoolean closed = new AtomicBoolean(false);
public ConsumerThread(KafkaConsumer<String, String> consumer, String... topics) {
this.consumer = consumer;
consumer.subscribe(Arrays.asList(topics));
}
@Override
public void run() {
try {
int count = 0;
while (!closed.get()) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
count +=records.count();
records.forEach(record -> System.out.println("收到数据 partion : " + record.partition() + " key : " + record.key() + " value : " + record.value()+" offset ;"+record.offset()));
System.out.println(count + " 获取了一次数据!!!" + new Date(System.currentTimeMillis()));
}
} catch (WakeupException e) {
if (!closed.get()) throw e;
} finally {
consumer.close();
}
}
public void shutdown() {
closed.set(true);
consumer.wakeup();
}
}
/**
* 获取消费者实例
* @date 2018年10月9日
* @author chenyongchao
* @param servers
* @return
*/
private static KafkaConsumer<String, String> getConsumer(String servers) {
System.setProperty("java.security.auth.login.config","/opt/kafka_2.12-2.0.0/kafka_consumer_jaas.conf");
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group-1");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
return new KafkaConsumer<>(props);
}
}
更多推荐
所有评论(0)