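A colleague had been developing with kafka-python. After we enabled ACLs, we found that kafka-python does not support the SASL_PLAINTEXT setup we needed: according to its documentation, only the PLAIN mechanism is available.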

https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html
sasl_mechanism (str) – string picking sasl mechanism when security_protocol is SASL_PLAINTEXT or SASL_SSL. Currently only PLAIN is supported. Default: None

confluent-kafka does support it, but librdkafka has to be recompiled.
Otherwise you get this error:

KafkaException: KafkaError{code=_INVALID_ARG,val=-186,str="Failed to create producer: No provider for SASL mechanism GSSAPI: recompile librdkafka with libsasl2 or openssl support. Current build options: PLAIN SASL_SCRAM"}
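The tail of that message lists the SASL providers the bundled librdkafka was built with, so a small helper can tell you whether a mechanism is available before you decide to rebuild. This is a minimal sketch using only the standard library. The name `parse_build_options` is hypothetical, and the parsing assumes the message format shown above.

```python
import re


def parse_build_options(error_text):
    """Return the providers listed after 'Current build options:'.

    Assumes the librdkafka error format quoted above; returns an empty
    list when the marker is not present.
    """
    match = re.search(r'Current build options:\s*([^"}]*)', error_text)
    if match is None:
        return []
    return match.group(1).split()


# The error text from above, split across lines for readability.
error_text = (
    'KafkaError{code=_INVALID_ARG,val=-186,str="Failed to create producer: '
    "No provider for SASL mechanism GSSAPI: recompile librdkafka with "
    'libsasl2 or openssl support. Current build options: PLAIN SASL_SCRAM"}'
)

print(parse_build_options(error_text))
```

Note that the mechanism named in the error is GSSAPI, which is librdkafka's default for `sasl.mechanism`, so the same error can also show up when no mechanism has been set explicitly.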

Install confluent-kafka

pip install confluent-kafka
Collecting confluent-kafka
  Downloading https://files.pythonhosted.org/packages/2a/ba/dccb27376453f91ad8fa57f75a7ba5dc188023700c1789273dec976477b2/confluent_kafka-0.11.6-cp37-cp37m-manylinux1_x86_64.whl (3.9MB)
    100% |████████████████████████████████| 3.9MB 984kB/s 

Install librdkafka

Clone the repository

[root@node004110 18:21:05 /tmp]
git clone https://github.com/edenhill/librdkafka.git
Cloning into 'librdkafka'...
remote: Enumerating objects: 213, done.
remote: Counting objects: 100% (213/213), done.
remote: Compressing objects: 100% (111/111), done.
remote: Total 18133 (delta 133), reused 149 (delta 98), pack-reused 17920
Receiving objects: 100% (18133/18133), 11.15 MiB | 946.00 KiB/s, done.
Resolving deltas: 100% (13758/13758), done.

Check dependencies

rpm -qa| grep openssl
openssl-1.0.2k-16.el7.x86_64
openssl-libs-1.0.2k-16.el7.x86_64
openssl-devel-1.0.2k-16.el7.x86_64

The GNU toolchain
GNU make
pthreads
zlib-dev (optional, for gzip compression support)
libssl-dev (optional, for SSL and SASL SCRAM support) (on CentOS the package is openssl-devel)
libsasl2-dev (optional, for SASL GSSAPI support)
libzstd-dev (optional, for ZStd compression support)
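The dependency check can also be scripted against the output of `rpm -qa`. The sketch below is only an illustration: `missing_packages` is a hypothetical helper, and `cyrus-sasl-devel` is my assumption for the CentOS counterpart of libsasl2-dev; only `openssl-devel` comes from the list above.

```python
import re


def missing_packages(rpm_output, required=("openssl-devel",)):
    """Return the required package names absent from `rpm -qa` output.

    A package counts as installed when a line starts with its name
    followed by '-<digit>', i.e. the beginning of the version string.
    """
    installed = rpm_output.split()
    missing = []
    for name in required:
        pattern = re.compile(re.escape(name) + r"-\d")
        if not any(pattern.match(pkg) for pkg in installed):
            missing.append(name)
    return missing


# Output of `rpm -qa | grep openssl` as shown above.
rpm_output = """\
openssl-1.0.2k-16.el7.x86_64
openssl-libs-1.0.2k-16.el7.x86_64
openssl-devel-1.0.2k-16.el7.x86_64
"""

print(missing_packages(rpm_output))
# cyrus-sasl-devel is an assumed package name, used here only as an example.
print(missing_packages(rpm_output, required=("openssl-devel", "cyrus-sasl-devel")))
```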

Build and install

./configure
make && make install

In the ./configure output, make sure the libssl checks report ok:

checking for libssl (by pkg-config)... ok
checking for libssl (by compile)... ok (cached)
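If you capture the configure output to a file, the same check can be automated. This is a rough sketch, assuming the `checking for <feature> (...)... <result>` line format shown above; `failed_checks` is a hypothetical helper name.

```python
import re


def failed_checks(configure_output, feature="libssl"):
    """Return the 'checking for <feature>' lines whose result is not ok."""
    pattern = re.compile(
        r"checking for %s \(.*\)\.\.\. (.*)" % re.escape(feature)
    )
    failures = []
    for line in configure_output.splitlines():
        line = line.strip()
        match = pattern.match(line)
        # Results such as "ok" and "ok (cached)" both count as success.
        if match and not match.group(1).startswith("ok"):
            failures.append(line)
    return failures


# The two lines from the output above.
configure_output = """\
checking for libssl (by pkg-config)... ok
checking for libssl (by compile)... ok (cached)
"""

print(failed_checks(configure_output))
```

An empty list means every libssl check passed, so the build should include SSL and SASL SCRAM support.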

Replace the library file

Locate the bundled library

find / -name "librdkafka*" 
/root/.pyenv/versions/3.7.2/lib/python3.7/site-packages/confluent_kafka/.libs/librdkafka.so.1
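Searching from `/` can be slow. An alternative is to walk only the installed package directory. The sketch below uses just the standard library, and `find_librdkafka` is a hypothetical helper name.

```python
import fnmatch
import os


def find_librdkafka(root):
    """Walk `root` and return the paths of files named like librdkafka*.

    os.walk also descends into hidden directories such as `.libs`.
    """
    matches = []
    for dirpath, _dirnames, filenames in os.walk(root):
        for filename in fnmatch.filter(filenames, "librdkafka*"):
            matches.append(os.path.join(dirpath, filename))
    return sorted(matches)


# Example call, using the package directory from the output above:
# find_librdkafka(
#     "/root/.pyenv/versions/3.7.2/lib/python3.7/site-packages/confluent_kafka"
# )
```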

Replace it

#cd /root/.pyenv/versions/3.7.2/lib/python3.7/site-packages/confluent_kafka/.libs/

[root@node004110 18:24:03 ~/.pyenv/versions/3.7.2/lib/python3.7/site-packages/confluent_kafka/.libs]
#ll
total 10316
-rwxr-xr-x 1 root root 2903144 Mar 8 18:20 libcrypto-4c524931.so.1.0.0
-rwxr-xr-x 1 root root 6944424 Mar 8 18:20 librdkafka.so.1
-rwxr-xr-x 1 root root 584072 Mar 8 18:20 libssl-01b7eff1.so.1.0.0
-rwxr-xr-x 1 root root 87848 Mar 8 18:20 libz-a147dcb0.so.1.2.3
-rw-r--r-- 1 root root 35336 Mar 8 18:20 monitoring-interceptor.so

[root@node004110 18:24:04 ~/.pyenv/versions/3.7.2/lib/python3.7/site-packages/confluent_kafka/.libs]
#mv librdkafka.so.1 librdkafka.so.1.bak

[root@node004110 18:24:11 ~/.pyenv/versions/3.7.2/lib/python3.7/site-packages/confluent_kafka/.libs]
#ln -s /usr/local/lib/librdkafka.so.1 librdkafka.so.1

[root@node004110 18:24:23 ~/.pyenv/versions/3.7.2/lib/python3.7/site-packages/confluent_kafka/.libs]
#ll
total 10316
-rwxr-xr-x 1 root root 2903144 Mar 8 18:20 libcrypto-4c524931.so.1.0.0
lrwxrwxrwx 1 root root 30 Mar 8 18:24 librdkafka.so.1 -> /usr/local/lib/librdkafka.so.1
-rwxr-xr-x 1 root root 6944424 Mar 8 18:20 librdkafka.so.1.bak
-rwxr-xr-x 1 root root 584072 Mar 8 18:20 libssl-01b7eff1.so.1.0.0
-rwxr-xr-x 1 root root 87848 Mar 8 18:20 libz-a147dcb0.so.1.2.3
-rw-r--r-- 1 root root 35336 Mar 8 18:20 monitoring-interceptor.so
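The `mv` and `ln -s` steps above can be wrapped in one function that refuses to run if the system build is missing or a backup already exists. This is a sketch under those assumptions, and `swap_in_system_library` is a hypothetical name.

```python
import os


def swap_in_system_library(bundled_path, system_path):
    """Back up the bundled library and symlink the system build in its place.

    Mirrors the manual steps above: rename to `<name>.bak`, then `ln -s`.
    Returns the path of the backup file.
    """
    if not os.path.exists(system_path):
        raise FileNotFoundError(system_path)
    backup_path = bundled_path + ".bak"
    if os.path.lexists(backup_path):
        # Do not overwrite an earlier backup of the original wheel library.
        raise FileExistsError(backup_path)
    os.rename(bundled_path, backup_path)
    os.symlink(system_path, bundled_path)
    return backup_path


# Example call, using the paths from the session above:
# swap_in_system_library(
#     "/root/.pyenv/versions/3.7.2/lib/python3.7/site-packages/"
#     "confluent_kafka/.libs/librdkafka.so.1",
#     "/usr/local/lib/librdkafka.so.1",
# )
```

To roll back, remove the symlink and rename the `.bak` file to its original name.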

Verify

#python
Python 3.7.2 (default, Mar 4 2019, 16:55:21) 
[GCC 4.8.5 20150623 (Red Hat 4.8.5-36)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from confluent_kafka import Producer     
>>> p = Producer({'bootstrap.servers': '192.168.4.114:9092', 'security.protocol': 'SASL_PLAINTEXT', 'sasl.mechanism':'SCRAM-SHA-256','sasl.username':'admin','sasl.password':'your-admin-pass'})        
>>> def delivery_report(err, msg):
...     if err is not None:
...         print('Message delivery failed: {}'.format(err))
...     else:
...         print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
... 
>>> for data in ['hello','word']:
...     p.produce('test_acl', data.encode('utf-8'), callback=delivery_report)
... 
>>> p.poll(10) 
Message delivered to test_acl [0]
Message delivered to test_acl [0]
1
>>> p.flush()
0
>>> quit()
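The same check can be kept as a script instead of an interactive session. The sketch below reuses the configuration keys and callback from the session above. The helper names `build_sasl_config` and `produce_all` are hypothetical, and the final call is commented out because it needs a reachable broker and valid credentials.

```python
def build_sasl_config(bootstrap_servers, username, password,
                      mechanism="SCRAM-SHA-256"):
    """Build the producer configuration used in the session above."""
    return {
        "bootstrap.servers": bootstrap_servers,
        "security.protocol": "SASL_PLAINTEXT",
        "sasl.mechanism": mechanism,
        "sasl.username": username,
        "sasl.password": password,
    }


def delivery_report(err, msg):
    """Delivery callback: report success or failure for each message."""
    if err is not None:
        print("Message delivery failed: {}".format(err))
    else:
        print("Message delivered to {} [{}]".format(msg.topic(), msg.partition()))


def produce_all(config, topic, messages):
    """Send each message, then wait for delivery.

    Returns the number of messages still queued after the flush timeout.
    """
    # Imported here so the helpers above can be used without confluent-kafka.
    from confluent_kafka import Producer

    producer = Producer(config)
    for data in messages:
        producer.produce(topic, data.encode("utf-8"), callback=delivery_report)
        producer.poll(0)  # serve delivery callbacks for earlier messages
    return producer.flush(10)  # timeout in seconds


config = build_sasl_config("192.168.4.114:9092", "admin", "your-admin-pass")
# produce_all(config, "test_acl", ["hello", "word"])  # requires a running broker
```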

The consumer receives both messages:

[root@node004114 kafka]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.4.114:9092 --topic test_acl --consumer.config config/client-sasl.properties --from-beginning
hello
word