Confluent介绍

官网:https://docs.confluent.io/platform/current/platform.html

Confluent是用来管理和组织不同数据源的流媒体平台,可以实时地把不同源和位置的数据集成到一个中心的事件流平台。并且很可靠、性能很高。

Confluent目前提供了社区版(免费)和商业版(收费)两个版本,社区版提供了Connectors、REST Proxy、KSQL、Schema-Registry等基础服务。商业版为企业提供了控制面板、负载均衡,跨中心数据备份、安全防护等高级特性。

Confluent下载

官方文档:https://docs.confluent.io/platform/current/installation/installing_cp/zip-tar.html

curl -O http://packages.confluent.io/archive/5.5/confluent-5.5.0-2.12.tar.gz
tar -zxvf confluent-5.5.0-2.12.tar.gz -C /usr/local/confluent

Zookeeper

Zookeeper是一个开放源码的分布式应用程序协调服务,主要功能包扩:维护配置信息、命名、提供分布式同步、组管理等集中式服务 。Kafka使用ZooKeeper对集群元数据进行持久化存储,如果ZooKeeper丢失了Kafka数据,集群的副本映射关系以及topic等配置信息都会丢失,最终导致Kafka集群不再正常工作,造成数据丢失的后果。

vim /usr/local/confluent/etc/kafka/zookeeper.properties
tickTime=2000
dataDir=/usr/local/confluent/zookeeper-data
clientPort=2181
initLimit=5
syncLimit=2

##多个zookeeper server,server的编号1、2等要与myid中的一致
#server.1=10.0.165.8:2888:3888
#server.2=10.0.165.9:2888:3888

新建myid

echo 1 > /usr/local/confluent/zookeeper-data/myid

Kafka

Kafka是一个分布式流处理平台,基于zookeeper协调并支持分区和多副本的分布式消息系统,是一种高吞吐量的分布式发布订阅消息系统,消息队列中间件,主要功能是负责消息传输,Confluent就是依赖Kafka来进行消息传输。Kafka最大的特性就是可以实时的处理大量数据以满足各种需求场景。

vim /usr/local/confluent/etc/kafka/server.properties
zookeeper.connect=localhost:2181
broker.id=1
listeners=PLAINTEXT://localhost:9092
log.dirs=<CONFLUENT_HOME>/kafka-log
metric.reporters=io.confluent.metrics.reporter.ConfluentMetricsReporter
confluent.metrics.reporter.bootstrap.servers=localhost:9092
confluent.support.metrics.enable=true
#设置默认分片
default.replication.factor=3
confluent.license.topic.replication.factor=1
confluent.metadata.topic.replication.factor=1
default.replication.factor=1
log.cleaner.enable=false
delete.topic.enable=true

Control Center

control center可以很容易地管理kafka的连接,创建,编辑,和管理与其他系统的连接。我们可以从producer到consumer监控data streams,保证我们的每一条消息都被传递,还能测量出消息的传输耗时多久。使用confluent control center能让开发人员不写一句代码,也能构建基于kafka的数据生产管道。

vim /usr/local/confluent/etc/confluent-control-center/control-center-production.properties
bootstrap.servers=localhost:9092
confluent.controlcenter.data.dir=/usr/local/confluent/confluent-data
confluent.license=XyZ
zookeeper.connect=localhost:2181
vim /usr/local/confluent/etc/kafka/connect-distributed.properties
consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor

Kafka-Rest

Kafka-Rest是Kafka RESTful接口服务组件,可以通过Restful接口而不是本机Kafka协议或客户端的情况下,生成和使用消息,而且还可以查看集群状态以及执行管理操作。

vim /usr/local/confluent/etc/kafka-rest/kafka-rest.properties
zookeeper.connect=localhost:2181
bootstrap.servers=PLAINTEXT://localhost:9092
port=8082
consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor

Schema-Registry

Schema-Registry是为元数据管理提供的服务,同样提供了RESTful接口用来存储和获取schemas,它能够保存数据格式变化的所有版本,并可以做到向下兼容。Schema-Registry还为Kafka提供了Avro格式的序列化插件来传输消息。Confluent主要用Schema-Registry来对数据schema进行管理和序列化操作。

vim /usr/local/confluent/etc/schema-registry/schema-registry.properties
listeners=http://0.0.0.0:8081
kafkastore.bootstrap.servers=PLAINTEXT://localhost:9092
vim /usr/local/confluent/etc/schema-registry/connect-avro-distributed.properties
bootstrap.servers=localhost:9092
key.converter.schema.registry.url=http://localhost:8081
value.converter.schema.registry.url=http://localhost:8081
rest.host.name=0.0.0.0
rest.port=8083

启动脚本

# start-all
CONFLUENT_HOME="/tools/confluent/confluent-5.5.0"
 
 
echo ">>>>>> Starting Confluent Zookeeper ... ..."
$CONFLUENT_HOME/bin/zookeeper-server-start $CONFLUENT_HOME/etc/kafka/zookeeper.properties >> $CONFLUENT_HOME/logs/zookeeper.log 2>&1 &
echo $! > $CONFLUENT_HOME/logs/zookeeper.pid
pid=`cat "$CONFLUENT_HOME/logs/zookeeper.pid"`
echo ">>>>>> Confluent Zookeeper Started at PID:$pid"
echo
 
 
echo ">>>>>> Starting Confluent Kafka ... ..."
sleep 10
$CONFLUENT_HOME/bin/kafka-server-start $CONFLUENT_HOME/etc/kafka/server.properties >>$CONFLUENT_HOME/logs/kafka.log 2>&1 &
echo $! > $CONFLUENT_HOME/logs/kafka.pid
pid=`cat "$CONFLUENT_HOME/logs/kafka.pid"`
echo ">>>>>> Confluent Kafka Started at PID:$pid"
echo
 
 
echo ">>>>>> Wait for Kafka load log, it will cost about 30s"
sleep 30
echo ">>>>>> Starting Confluent Schema Registry ... ..."
$CONFLUENT_HOME/bin/schema-registry-start $CONFLUENT_HOME/etc/schema-registry/schema-registry.properties >>$CONFLUENT_HOME/logs/schema.log 2>&1 &
echo $! > $CONFLUENT_HOME/logs/schema.pid
pid=`cat "$CONFLUENT_HOME/logs/schema.pid"`
echo ">>>>>> Confluent Schema Registry Started at PID:$pid"
echo
 
 
echo ">>>>>> Starting Confluent Control Center ... ..."
sleep 10
#CONTROL_CENTER_OPTS="-Djava.security.auth.login.config=/tmp/confluent/propertyfile.jaas" \
$CONFLUENT_HOME/bin/control-center-start $CONFLUENT_HOME/etc/confluent-control-center/control-center-production.properties >>$CONFLUENT_HOME/logs/control.log 2>&1 &
echo $! > $CONFLUENT_HOME/logs/control.pid
pid=`cat "$CONFLUENT_HOME/logs/control.pid"`
echo ">>>>>> Confluent Control Center Started at PID:$pid"
echo
 
 
echo ">>>>>> Starting Confluent Kafka Connector ... ..."
sleep 5
$CONFLUENT_HOME/bin/connect-distributed $CONFLUENT_HOME/etc/schema-registry/connect-avro-distributed.properties >>$CONFLUENT_HOME/logs/connect.log 2>&1 &
echo $! > $CONFLUENT_HOME/logs/connect.pid
pid=`cat "$CONFLUENT_HOME/logs/connect.pid"`
echo ">>>>>> Confluent Kafka Connector Started at PID:$pid"
echo
 
 
echo ">>>>>> Starting Confluent Kafka Rest Proxy ... ..."
sleep 5
$CONFLUENT_HOME/bin/kafka-rest-start $CONFLUENT_HOME/etc/kafka-rest/kafka-rest.properties >>$CONFLUENT_HOME/logs/kafka-rest.log 2>&1 &
echo $! > $CONFLUENT_HOME/logs/kafka-rest.pid
pid=`cat "$CONFLUENT_HOME/logs/kafka-rest.pid"`
echo ">>>>>> Confluent Kafka Rest Proxy Connector Started at PID:$pid"
echo
 
echo ">>>>>> All Done !!!"

停止脚本

# stop-all
CONFLUENT_HOME="/tools/confluent/confluent-5.5.0"
pid=`cat "$CONFLUENT_HOME/logs/kafka-rest.pid"`
kill -9 $pid
pid=`cat "$CONFLUENT_HOME/logs/connect.pid"`
kill -9 $pid
pid=`cat "$CONFLUENT_HOME/logs/control.pid"`
kill -9 $pid
pid=`cat "$CONFLUENT_HOME/logs/schema.pid"`
kill -9 $pid
pid=`cat "$CONFLUENT_HOME/logs/kafka.pid"`
kill -9 $pid
pid=`cat "$CONFLUENT_HOME/logs/zookeeper.pid"`
kill -9 $pid

kafka监控工具

https://www.kafka-eagle.org/index.html

ZK服务正常运行过程中,会产生大量的事务日志。用户需要定期清理。
清理方法参考:https://blog.csdn.net/xiaolang85/article/details/21184293

借鉴博客:

https://blog.csdn.net/weixin_43786255/article/details/107028095

https://blog.csdn.net/NEU_LightBulb/article/details/103412987

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐