Zookeeper & Kafka & Debezium & Spark 实时数据同步方案在实际项目中的使用
这个标题取的,专业但不利于传播,哈哈。好了,进入正题 。我们在搞一个大数据平台的时候,数据从哪些来?一般也无非以下几个来源:1)自己的业务系统,可能是MySQL或其他各种DB;2)埋点;3)爬虫;4)其他数据源,如比你买了鲸准数据,海鹰数据等等;这么多的数据怎么进入我们的数仓(Hive/Hbase/ClickHouse......)这个时候我们就需要CDC了。可以理解为数据抽取?数据实时同步(增量
我们在搞一个大数据平台的时候,数据从哪些来?一般也无非以下几个来源:
1)自己的业务系统,可能是MySQL或其他各种DB;
2)埋点;
3)爬虫;
4)其他数据源,如比你买了鲸准数据,海鹰数据等等;
这么多的数据怎么进入我们的数仓(Hive/Hbase/ClickHouse......)这个时候我们就需要CDC了。可以理解为数据抽取?数据实时同步(增量、全量)?都可以。随你。
差点漏了,CDC也有其他的使用场景,比如:
-
业务数据发展到一定水平,需要将大部分冷热数据从熟悉的DB迁移到其他存储进行复杂查询和分析
-
分库分表后,某些报表类查询无法工作,需要汇总到单库表进行操作
-
分库分表有多个维度,需要拷贝多份数据达成冗余
-
通过伪数据共享(没办法引入MQ、无法共享库表)进行业务改造
-
慢存储→Cache之间的同步
-
不停服数据迁移/scheme变更
关于数据同步,只是一个引子,可以深入的学习系统的资料或技术。
1、CDC技术选型
debezium :https://debezium.io/documentation/reference/1.0/connectors/mysql.html
kafka: http://kafka.apache.org/
zookeeper : http://zookeeper.apache.org/
CDC的组件有很多,比如:maxwell、canal、debezium、flinkx等等,还是比较多的。组件之间的对比,有需要可以查看其他文章,大神们都写的很清楚。这里,我结合自己的业务,选择了debezium(Debezium)文档很全,但是英文的。debezium的工作原理在这里略过,可以看官网,但我们必须要知道两件事情:
- Debezium是基于kafka connector 集成开发的与kafka 高度耦合
- kafka 1.0 以后 connector的发展很快,confluent 有很多开源的connector
2、接下来我们就说一下从0到 1搭建一套CDC,并让他跑起来。
2.1、准备好机器,建议三台,如下:
名称 | IP | 用途 |
hadoopMaster | 192.168.1.1 | Zookeeper、Kafka、Kafka-Eagle |
hadoopSlave0 | 192.168.1.2 | Zookeeper、Kafka |
hadoopSlave1 | 192.168.1.3 | Zookeeper、Kafka |
三台机器上都要安装JDK,略过。
设置三台机器的名称,并把IP与名称映射起来,每台机器都要配置,如下:
vim /etc/hosts
192.168.1.1 hadoopMaster
192.168.1.2 hadoopSlave0
192.168.1.3 hadoopSlave1
#重启一下虚拟机
reboot
让hadoopMaster节点可以SSH无密码登录到各个hadoopSlave节点上
# 当前在hadoopMaster机器上哦
1)ssh-keygen -t rsa #生成ssh密钥,不提示输入密码
2)ssh-copy-id hadoopMaster
3)ssh-copy-id hadoopSlave0
4)ssh-copy-id hadoopSlave1 #将密钥拷贝到各节点
5)ssh hadoopSlave1 #测试免密登录
2.2、准备完这些后,就可以从zookeeper集群开始了
# 安装目录:/usr/local/zookeeper
# 下载zk https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.6.3/ 注意:需要下载:apache-zookeeper-3.6.3-bin.tar.gz
# 解压
tar -zxvf apache-zookeeper-3.6.3-bin.tar.gz
# /usr/local/zookeeper下创建zkdata目录
# 进入zkdata
vi zkData/myid
## hadoopMaster 1; hadoopSlave0 2; hadoopSlave1 3; 对应的机器只填对应的一个数字,啥也没有,包括空格
# 进入 /usr/local/zookeeper/apache-zookeeper-3.6.3-bin/conf
# 复制配置文件模板
cp zoo_sample.cfg zoo.cfg
# 修改此参数
dataDir=/opt/app/zookeeper/zkData
# 新增下面的内容
server.1=hadoopMaster:3181:4181
server.2=hadoopSlave0:3181:4181
server.3=hadoopSlave1:3181:4181
# 3181表示follow与leader之间的通信端口
# 4181表示选举端口
# 默认2181是客户端访问端口,不用修改
# clientPort=2181
# 从Master上分发配置到其他节点
#将zookeeper目录分发到其他节点的相同目录下:
scp -r /usr/local/zookeeper/apache-zookeeper-3.6.3-bin root@hadoopSlave0:/usr/local/zookeeper/
# 启动集群(每个节点都需要进行启动操作)
# 进入 /usr/local/zookeeper/apache-zookeeper-3.6.3-bin/bin
./zkServer.sh start
# 查看集群状态
./zkServer.sh status
2.3、至此,ZK可以正常运行了。然后我们开始搭建Kafka集群。
# 依赖JDK和Zookeeper环境已提前安装
# 下载地址:http://kafka.apache.org/downloads
# 我们安装的版本:kafka_2.12-2.6.0.tgz ,安装目录:/usr/local/kafka
# 解压下载的安装包
tar -zxvf kafka_2.12-2.6.0.tgz
# 修改配置文件
# 进入 /usr/local/kafka/kafka_2.12-2.6.0
vim config/server.properties
# 修改下面的内容
# kafka节点id hadoopSlave0 = 2 hadoopSlave1 = 3
broker.id=1
# 删除topic时,是否物理删除,否则只是逻辑删除
delete.topic.enable=true
# 实际存放topic数据的目录,默认kafka的日志存放在 kafka/logs 下,这里数据和日志最好分开存放
log.dirs=/usr/local/kafka/kafka_2.12-2.6.0/data
# 指定我们准备好的zk集群
zookeeper.connect=hadoopMaster:2181,hadoopSlave0:2181,hadoopSlave1:2181
# 分发kafka目录到其他节点
scp -r /usr/local/kafka/kafka_2.12-2.6.0 root@hadoopSlave0:/usr/local/kafka
# 启动kafka服务
cd /usr/local/kafka/kafka_2.12-2.6.0/bin
# 3个节点都需要一一启动
./kafka-server-start.sh -daemon /usr/local/kafka/kafka_2.12-2.6.0/config/server.properties
2.4、给kafka配一个web版的管理页面或仪表盘,管理起来更加方便
-
官网:EFAK
-
下载: Download - EFAK
-
下载:kafka-eagle-bin-2.0.7.tar.gz 解压后会有:fak-web-2.0.7-bin.tar.gz
-
安装目录:/usr/local/kafka/kafka-eagle 解压完的路径是:/usr/local/kafka/kafka-eagle/kafka-eagle-bin-2.0.7/efak-web-2.0.7
-
设置Kafka-Eagle目录
-
vim /etc/profile export KE_HOME= /usr/local/kafka/kafka-eagle PATH=$PATH:$KE_HOME/bin
-
设置java目录
vim /etc/profile export JAVA_HOME= /usr/local/java/
-
更新环境变量
-
source /etc/profile
-
修改Kafka-Eagle配置文件
-
cd /usr/local/kafka/kafka-eagle/kafka-eagle-bin-2.0.7/efak-web-2.0.7/conf vim system-config.properties # multi zookeeper&kafka cluster list # zookeeper和kafka集群配置 # kafka.eagle 支持管理多个kafka集群 kafka.eagle.zk.cluster.alias=cluster1 cluster1.zk.list=hadoopMaster:2181,hadoopSlave0:2181,hadoopSlave1:2181 # kafka eagle webui port # web页面访问端口号 kafka.eagle.webui.port=8048 # kafka jdbc driver address # kafka 默认使用sqlite数据库 设置好路径 kafka.eagle.driver=org.sqlite.JDBC kafka.eagle.url=jdbc:sqlite:/usr/local/kafka/kafka-eagle/db/ke.db kafka.eagle.username=root kafka.eagle.password=www.kafka-eagle.org
- 启动kafka-eagle
-
cd /usr/local/kafka/kafka-eagle/kafka-eagle-bin-2.0.7/efak-web-2.0.7/bin chmod +x ke.sh ./ke.sh start
- 启动kafka-eagle时的部分信息,注意,这里已经给出了默认的账号及密码信息。
-
[2021-09-16 23:14:02] INFO: Port Progress: [##################################################] | 100% [2021-09-16 23:14:06] INFO: Config Progress: [##################################################] | 100% [2021-09-16 23:14:09] INFO: Startup Progress: [##################################################] | 100% [2021-09-16 23:13:59] INFO: Status Code[0] [2021-09-16 23:13:59] INFO: [Job done!] Welcome to ______ ______ ___ __ __ / ____/ / ____/ / | / //_/ / __/ / /_ / /| | / ,< / /___ / __/ / ___ | / /| | /_____/ /_/ /_/ |_|/_/ |_| ( Eagle For Apache Kafka® ) Version 2.0.7 -- Copyright 2016-2021 ******************************************************************* * EFAK Service has started success. * Welcome, Now you can visit 'http://192.168.11.163:8048' * Account:admin ,Password:123456 ******************************************************************* * <Usage> ke.sh [start|status|stop|restart|stats] </Usage> * <Usage> https://www.kafka-eagle.org/ </Usage> *******************************************************************
- kafka-eagle UI界面
- 说明:三台机器kafka-eagle只配置一台即可。
2.5、接下来就可以配置Kafka Connect了。
首选我们从 debezium 官网 https://debezium.io/releases/1.8/ 下载:debezium-connector-mysql-1.6.2.Final-plugin.tar.gz
记得下载完后放到/usr/local/kafka/kafka_2.12-2.6.0/plugins目录。
KafkaCnnect有两个核心概念还是要了解一下:Source和Sink。 Source负责导入数据到Kafka,Sink负责从Kafka导出数据,它们都被称为Connector。依然有点懵?其他文章可以深入的学习一下。个人认为在后续的开发中还是很用的。
Kafka Connect的启动方式分为单机模式和集群模式,这里我们只关心集群模式。
在distributed模式下,启动不需要传递connector的参数,而是通过REST API来对kafka connect进行管理,包括启动、暂停、重启、恢复和查看状态的操作。
必须知道的几个个topic
1-group.id (topic name默认 connect-cluster):Connect cluster group 使用唯一的名称。
2-config.storage.topic (topic name默认 connect-configs):topic 用于存储 Connector 和任务配置。
3-offset.storage.topic (topic name默认 connect-offsets) :topic 用于存储 offsets。
4-status.storage.topic (默认 connect-status):topic 用于存储状态。
分布式配置文件还是要配一下:找到kafka安装目录:/usr/local/kafka/kafka_2.12-2.6.0/config
# vi connect-distributed.properties
bootstrap.servers=hadoopMaster:9092,hadoopSlave0:9092,hadoopSlave1:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
#offset.storage.partitions=25
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
#status.storage.partitions=5
offset.flush.interval.ms=10000
#rest.host.name=
rest.port=8083
rest.advertised.host.name=hadoopMaster
rest.advertised.port=8083
# Debezium 配置
plugin.path=/usr/local/kafka/kafka_2.12-2.6.0/plugins
启动 Kafka Connect (每个节点都要启动:这里是以分布式方式启动的,有别于本地模式哈)
# cd /usr/local/kafka/kafka_2.12-2.6.0/bin
./connect-distributed.sh -daemon ../config/connect-distributed.properties
至此CDC的部署全部完成。那怎么用呢?
3、CDC使用
3.1、Kafka Connect & Debezium 使用入门
先加强一下印象:
1-kafka connect 是一个可扩展的、可靠的在kafka和其他系统之间流传输的数据工具。简而言之就是他可以通过Connector(连接器)简单、快速的将大集合数据导入和导出kafka。可以接收整个数据库或收集来自所有的应用程序的消息到kafka的topic中。
2-在任何一个机器上通过curl的方式请求确认服务是否正常:curl localhost:8083/connectors,如果办输出一个空的[],说明一切都OK了;
3-集群模式需要通过 Kafka Connect REST API 去提交connectors,具体的API不一一列出来了;
我们提交一个connectors
{
"name": "test-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "192.168.1.1",
"database.port": "3306",
"database.user": "root",
"database.password": "......",
"database.server.id": "202201",
"database.server.name": "testdbserver",
"database.include.list": "cdc_test_db",
"database.history.kafka.bootstrap.servers": "hadoopMaster:9092,hadoopSlave0:9092,hadoopSlave1:9092",
"database.history.kafka.topic": "db_history_test",
"include.schema.changes": "true",
"table.include.list": "t_cdc_test",
"database.history.skip.unparseable.ddl": "true"
}
}
在三台机器的任意一台执行:curl -X POST -H "Content-Type: application/json" localhost:8083/connectors/ -d ‘上面的那小段JSON’ 或都使用Postman提交到“192.168.1.3:8083/connectors”是一样的效果。
提交成功后再执行一下:curl localhost:8083/connectors,就可以看到是这样了:[test-connector]
参数基本上都能看的懂,Debezium官网上有非常详细的说明,请果需要,请移步至Debezium官网查看更多的配置。
如果我们需要同步更多的库、表数据,提交多个connectors就行了,这里提醒一下:默认情况下Debezium是一个表自动生成一个topic,如果topic太多了,建议多张表对应一个topic,具体操作不一一介绍。
最后:深入学习的话至少要做两件事:Kafka Connect REST API、Debezium官网至少过一遍;最后,你可能还需要抽2分钟上想想Kafka Connect与Debezium的关系,如果有精力,看看Debezium的源码那就更牛了。
3.2、Kafka Connect & Debezium & SparkStreaming整合到一起
说到Kafka Connect & Debezium & SparkStreaming整合前,我们看看 Debezium 通过Kafka发送的数据长什么样,如下:
{
"schema": {
"name": "testdbserver.cdc_test_db.t_cdc_test.Envelope",
"optional": false,
"type": "struct",
"fields": [{
"field": "before",
"name": "testdbserver.cdc_test_db.t_cdc_test.Value",
"optional": true,
"type": "struct",
"fields": [{
"field": "Id",
"optional": false,
"type": "int64"
}, {
"field": "Name",
"optional": false,
"type": "string"
}]
}, {
"field": "after",
"name": "testdbserver.cdc_test_db.t_cdc_test.Value",
"optional": true,
"type": "struct",
"fields": [{
"field": "Id",
"optional": false,
"type": "int64"
}, {
"field": "Name",
"optional": false,
"type": "string"
}]
}, {
"field": "source",
"name": "io.debezium.connector.mysql.Source",
"optional": false,
"type": "struct",
"fields": [{
"field": "version",
"optional": false,
"type": "string"
}, {
"field": "connector",
"optional": false,
"type": "string"
}, {
"field": "name",
"optional": false,
"type": "string"
}, {
"field": "ts_ms",
"optional": false,
"type": "int64"
}, {
"default": "false",
"field": "snapshot",
"name": "io.debezium.data.Enum",
"optional": true,
"type": "string",
"version": 1,
"parameters": {
"allowed": "true,last,false"
}
}, {
"field": "db",
"optional": false,
"type": "string"
}, {
"field": "sequence",
"optional": true,
"type": "string"
}, {
"field": "table",
"optional": true,
"type": "string"
}, {
"field": "server_id",
"optional": false,
"type": "int64"
}, {
"field": "gtid",
"optional": true,
"type": "string"
}, {
"field": "file",
"optional": false,
"type": "string"
}, {
"field": "pos",
"optional": false,
"type": "int64"
}, {
"field": "row",
"optional": false,
"type": "int32"
}, {
"field": "thread",
"optional": true,
"type": "int64"
}, {
"field": "query",
"optional": true,
"type": "string"
}]
}, {
"field": "op",
"optional": false,
"type": "string"
}, {
"field": "ts_ms",
"optional": true,
"type": "int64"
}, {
"field": "transaction",
"optional": true,
"type": "struct",
"fields": [{
"field": "id",
"optional": false,
"type": "string"
}, {
"field": "total_order",
"optional": false,
"type": "int64"
}, {
"field": "data_collection_order",
"optional": false,
"type": "int64"
}]
}]
},
"payload": {
"op": "c", //c代表创建(或插入),u代表更新,d代表删除,r代表读(在非初始快照的情况下)
"after": {
"Id": 1,
"Name": "1"
},
"source": {
"server_id": 1,
"version": "1.6.2.Final",
"file": "mysql-bin.000083",
"connector": "mysql",
"pos": 1004002502,
"name": "testdbserver",
"row": 0,
"ts_ms": 1641377065000,
"snapshot": "false",
"db": "cdc_test_db",
"table": "t_cdc_test"
}
是不是非常的清析?哪个库、哪个表、哪个字段发生了什么样的变化、变化前是什么、变化后是什么等等一应具全。
接下,我们聊聊 SparkStreaming 实时消费Kafka......
更多推荐
所有评论(0)