数据采集项目之业务数据(三)
开发公司为Zendesk公司开源,用java编写的MySQL变更数据抓取软件。内部是通过监控MySQL的Binlog日志,并将变更数据以JSON格式发送到Kafka等流处理平台。
1. Maxwell框架
开发公司为Zendesk公司开源,用java编写的MySQL变更数据抓取软件。内部是通过监控MySQL的Binlog日志,并将变更数据以JSON格式发送到Kafka等流处理平台。
1.1 MySQL主从复制
主机每次变更数据都会生成对应的Binlog日志,从机可以通过IO流的方式将Binlog日志下载到本地,可以通过它创造和主机一样的环境或者作为热备。
1.2 安装Maxwell
- 解压改名
- 启动MySQL Binlog, vim /etc/my.cnf. 增加如下配置:
- binlog_format 日志类型的三种类型:
- 基于语句:主机执行了什么语句,在从机里同样执行一遍。如果使用了random语句,会导致主从不一致。但是量级比较低
- 基于行:主机被改动后,从机同步一份。不会有主从不一致的问题,但是量价比较大,需要将每行修改的数据都拿一份。
- 混合模式:一般基于语句,但是如果基于语句会导致前后结果产生差异,自动转成基于行。
- binlog_format 日志类型的三种类型:
#数据库id
server-id = 1
#启动binlog,该参数的值会作为binlog的文件名
log-bin=mysql-bin
#binlog类型,maxwell要求为row类型
binlog_format=row
#启用binlog的数据库,需根据实际情况作出修改
binlog-do-db=gmall
- 重启MySQL服务
- 创建Maxwell所需所需的数据库和用户,用来存储断点续传所需的数据。
CREATE DATABASE maxwell;
CREATE USER 'maxwell'@'%' IDENTIFIED BY 'maxwell';
GRANT ALL ON maxwell.* TO 'maxwell'@'%';//maxwell库的所有权限给maxwell
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';//其他库的查询、复制权限给maxwell
- 修改maxwell配置文件
cp 配置文件,将会复制某个文件并且可以改名。
producer=kafka
# 目标Kafka集群地址
kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
#目标Kafka topic,可静态配置,例如:maxwell,也可动态配置,例如:%{database}_%{table}
kafka_topic=topic_db
# MySQL相关配置
host=hadoop102
user=maxwell
password=maxwell
jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true
# 过滤gmall中的z_log表数据,该表是日志数据的备份,无须采集
filter=exclude:gmall.z_log
# 指定数据按照主键分组进入Kafka不同分区,避免数据倾斜
producer_partition_by=primary_key
1.3 Maxwell的使用
- 启动zookeeper,kafka
- 启动maxwell,
bin/maxwell --config config.properties --daemon
- 启动kafka消费者进程,用于消费maxwell添加到kafka的变更数据
- 启动数据生成jar包,查看消费者进程是否有新数据。
- 编写Maxwell启停脚本
#!/bin/bash
MAXWELL_HOME=/opt/module/maxwell
status_maxwell(){
result=`ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | wc -l`
return $result
}
start_maxwell(){
status_maxwell
if [[ $? -lt 1 ]]; then
echo "启动Maxwell"
$MAXWELL_HOME/bin/maxwell --config $MAXWELL_HOME/config.properties --daemon
else
echo "Maxwell正在运行"
fi
}
stop_maxwell(){
status_maxwell
if [[ $? -gt 0 ]]; then
echo "停止Maxwell"
ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | awk '{print $2}' | xargs kill -9
else
echo "Maxwell未在运行"
fi
}
case $1 in
start )
start_maxwell
;;
stop )
stop_maxwell
;;
restart )
stop_maxwell
start_maxwell
;;
esac
1.4 Bootstrap全量同步
Maxwell获取的数据都是后期变更的数据,但没有获取到数据库在开启Binlog日志之前的原始数据。
全量同步命令:/opt/module/maxwell/bin/maxwell-bootstrap --database gmall --table user_info --config /opt/module/maxwell/config.properties
2. 数仓数据同步策略
2.1 用户行为数据
数据源:Kafka
目的地:HDFS
传输方式采用Flume, 其中source为Kafka source, channel为Memmory channel, sink为HDFS sink。
根据官网查找相应参数:
- Kafka Source
- type = Kafka Source全类名
- kafka.bootstrap.servers 连接地址
- kafka.topics = topic_log
- batchSize: 批次大小
- batchDurationMillis: 批次间隔2s
- File Channel
- type: file
- dataDirs: 存储路径
- checkpointDir: 偏移量存储地址
- keep-alive: 管道满了后,生产者间隔多少秒再放数据
- HDFS Sink
- hdfs.rollInterval : 文件滚动,解决小文件问题,每隔多久滚动一次
- rollSize: 文件大小
- hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d, 文件存放路径
- hdfs.round = false, 不采用系统本地时间
#定义组件
a1.sources=r1
a1.channels=c1
a1.sinks=k1
#配置source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics=topic_log
#配置channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6
#配置sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log
a1.sinks.k1.hdfs.round = false # 是否获取本地时间
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
#控制输出文件类型
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip
#组装
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2.2 零点漂移问题
在HDFS系统存放文件时是按照时间进行分区存放的,存放时查看的是header中的timestamp,但是由于数据传输过程中也需要一段时间,header中的时间并不是数据的实际产生时间,这个就是零点漂移问题。
解决办法:借助拦截器,修改header中的timestamp的值。编写拦截器代码,需要在IDEA中创建对应的项目并打包。
- 导入依赖,flume-ng-core和JSON解析依赖fastjson (1.2.62)
- 创建包gmall.interceptor
- 创建类TimeStampInterceptor, 继承Interceptor接口
- 实现intercept(Event event)和intercept(Event events)
- 使用fastjson来解析json文件,得到jsonObject对象,用来获取时间戳ts。将获取到的时间戳覆盖header中的timestamp, 如果数据格式错误会抛异常,使用try-catch来捕获它,并过滤掉该条数据。注意此处不能使用for循环来一边遍历,一边删除集合数据。
@Override
public Event intercept(Event event) {
//1、获取header和body的数据
Map<String, String> headers = event.getHeaders();
String log = new String(event.getBody(), StandardCharsets.UTF_8);
try {
//2、将body的数据类型转成jsonObject类型(方便获取数据)
JSONObject jsonObject = JSONObject.parseObject(log);
//3、header中timestamp时间字段替换成日志生成的时间戳(解决数据漂移问题)
String ts = jsonObject.getString("ts");
headers.put("timestamp", ts);
return event;
} catch (Exception e) {
e.printStackTrace();
return null;
}
@Override
public List<Event> intercept(List<Event> list) {
Iterator<Event> iterator = list.iterator();
while (iterator.hasNext()) {
Event event = iterator.next();
if (intercept(event) == null) {
iterator.remove();//必须使用迭代器删除
}
}
return list;
}
-
打包时注意要带上fastjson依赖,需要在maven中添加配置打包插件。依赖中有flume和fastjson,但在虚拟机上有flume,没有fastjson,所以需要排除flume。可以使用provided标签来排除让打包时排除依赖。
- compile:在单元测试、编译、运行三种方式都会使用compile表明的依赖;
- test:在单元测试才会使用test表明的依赖;
- provided:在编译才会使用test表明的依赖;
-
Flume配置文件中添加拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.gmall.flume.interceptor.TimestampInterceptor$Builder # 全类名建议在IDEA中复制,Builder也需要根据自己的代码函数名修改
- 重新生成数据,查看是否根据数据本身的时间戳存放到对应的HDFS分区文件中。
3. 业务数据同步
3.1 同步策略
- 全量同步:每天将所有数据同步一份,业务数据量小,优先考虑全量同步。
- 增量同步:每天只将新增和变化进行同步,业务数据量大,优先考虑增量同步。
3.2 数据同步工具
全量:DataX
、Sqoop
增量:Maxwell
、Canal
3.3 DataX
是一个数据同步工具,致力于实现包括关系型数据库HDFS、Hive、ODPS、HBase、MySQL等等数据源之间的互传。
- 架构= reader + framework + writer
- 运行流程
- job: 单个数据同步的作业,会启动一个进程。
- Task: 根据不同数据源的切分策略,一个Job会切分为多个Task,Task是DataX作业的最小单元,每个Task负责一部分,由一个线程执行。
- 调度策略:会根据系统资源设置并发度,并发度为线程同时执行的个数,任务会按照并发度一组一组执行。
3.4 DataX安装
- 下载解压DataX安装包
bin/datax.py job/job.json
测试安装包是否完整- MySQL Reader配置文件的书写
- HDFS Writer配置文件的书写
- 执行datax命令
python /opt/module/datax/bin/datax.py -p"-Dtargetdir=/origin_data/gmall/db/activity_info_full/2022-06-08" /opt/module/datax/job/import/gmall.activity_info.json
- 执行完后可以使用hadoop fs cat 路径名 | zcat,来查看压缩文件是否正确
更多推荐
所有评论(0)