flink的实时数据同步
构建基于flink、kafka、MySQL、hbase的实时数仓,实现:(1)业务数据全量同步到数据仓库;(2)业务数据实时增量同步到数据仓库,使用Kafka+canal实现增量数据采集。数仓架构设计:(1)全量拉取模块,采用flink-jdbc,或者sqoop(2)增量实时同步模块,使用Kafka+canal实现增量数据采集。canal是通过模拟成为mysql 的slave节点的方式,监听mys
构建基于flink、kafka、MySQL、hbase的实时数仓,实现:
(1)业务数据全量同步到数据仓库;
(2)业务数据实时增量同步到数据仓库,使用Kafka+canal实现增量数据采集。
数仓架构设计:
数据同步方案设计:
(1)全量拉取模块,采用flink-jdbc,或者sqoop
(2)增量实时同步模块,使用Kafka+canal实现增量数据采集。canal是通过模拟成为mysql 的slave节点的方式,监听mysql 的binlog日志来获取数据,binlog日志就是数据增删改的记录,canal解析binlog为byte流,推送给kafka,kafka使用时要进行反序列化为flatmessage对象;同一个库,同一个表的FlatMessage通过keyby会分到同一个partition。
数据采集方案:
首先在MySQL中建表,触发数据库变更
create table test (
uid int (4) primary key not null auto_increment,
name varchar(10) not null);
insert into test (name) values('10');
一条binlog日志转换为flatmessage后的格式如下:
{"data":[{"uid":"1","name":"10"}],"database":"flink","es":1598752886000,"id":1,"isDdl":false,"mysqlType":{"uid":"int(4)","name":"varchar(10)"},"old":null,"pkNames":["uid"],"sql":"","sqlType":{"uid":4,"name":12},"table":"test","ts":1598754586044,"type":"INSERT"}
binlog消息顺序性问题:
解决方案:
将需要保证顺序的消息发送到同一个partition
1 kafka的同一个partition内的消息是有序的
-
kafka 的同一个 partition 用一个write ahead log组织, 是一个有序的队列,所以可以保证FIFO的顺序;
-
因此生产者按照一定的顺序发送消息,broker 就会按照这个顺序把消息写入 partition,消费者也会按照相同的顺序去读取消息;
-
kafka 的每一个 partition 不会同时被两个消费者实例消费,由此可以保证消息消费的顺序性。
2 控制同一key分发到同一partition
要保证同一个订单的多次修改到达 kafka 里的顺序不能乱,可以在Producer 往 kafka 插入数据时,控制同一个key (可以采用订单主键key-hash算法来实现)发送到同一 partition,这样就能保证同一笔订单都会落到同一个 partition 内。
3 canal 需要做的配置
canal 目前支持的mq有kafka/rocketmq
,本质上都是基于本地文件的方式来支持了分区级的顺序消息的能力。我们只需在配置 instance 的时候开启如下配置即可:
1> canal.properties
# leader节点会等待所有同步中的副本确认之后再确认这条记录是否发送完成
canal.mq.acks = all
备注:
-
这样只要至少有一个同步副本存在,记录就不会丢失。
2> instance.properties
1 # 散列模式的分区数
2 canal.mq.partitionsNum=2
3 # 散列规则定义 库名.表名: 唯一主键,多个表之间用逗号分隔
4 canal.mq.partitionHash=test.lyf_canal_test:id
备注:
-
同一条数据的增删改操作 产生的 binlog 数据都会写到同一个分区内;
-
查看指定topic的指定分区的消息,可以使用如下命令:
bin/kafka-console-consumer.sh --bootstrap-server serverlist --topic topicname --from-beginning --partition 0
更多推荐
所有评论(0)