Offline Data Warehouse (5): Using Maxwell to Collect Incremental Data into HDFS
Syncing incremental data with Maxwell
For Maxwell basics, installation and configuration, see:
Offline Data Warehouse (3): Installing and Using Maxwell
Maxwell syncs the incremental data into Kafka, and Flume then moves that data from Kafka into HDFS.
Maxwell configuration
# tl;dr config
log_level=info
producer=kafka
kafka.bootstrap.servers=server15:9092,server16:9092,server17:9092
# Target Kafka topic; can be static (e.g. maxwell) or dynamic (e.g. %{database}_%{table})
kafka_topic=%{table}
# Table filter: only sync these 13 tables
filter=include:gmall2022.cart_info,include:gmall2022.comment_info,include:gmall2022.coupon_use,include:gmall2022.favor_info,include:gmall2022.order_detail,include:gmall2022.order_detail_activity,include:gmall2022.order_detail_coupon,include:gmall2022.order_info,include:gmall2022.order_refund_info,include:gmall2022.order_status_log,include:gmall2022.payment_info,include:gmall2022.refund_payment,include:gmall2022.user_info
# mysql login info
host=server15
user=maxwell
password=maxwell
jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai
Restart Maxwell so the new configuration takes effect:
[root@server16 maxwell-1.29.2]# maxwell.sh restart
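The maxwell.sh wrapper is not shipped with Maxwell. A minimal sketch of such a start/stop/restart script, assuming MAXWELL_HOME points at the install directory and the configuration above is saved as config.properties (the script itself is an assumption, not from the original post):

#!/bin/bash
# maxwell.sh -- hypothetical start/stop/restart wrapper around the Maxwell binary
MAXWELL_HOME=/opt/yyds/apps/maxwell-1.29.2

status_maxwell() {
    # Count running Maxwell processes
    ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | wc -l
}

start_maxwell() {
    if [ "$(status_maxwell)" -eq 0 ]; then
        echo "Starting Maxwell"
        $MAXWELL_HOME/bin/maxwell --config $MAXWELL_HOME/config.properties --daemon
    else
        echo "Maxwell is already running"
    fi
}

stop_maxwell() {
    if [ "$(status_maxwell)" -ne 0 ]; then
        echo "Stopping Maxwell"
        ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | awk '{print $2}' | xargs kill -9
    else
        echo "Maxwell is not running"
    fi
}

case $1 in
    start) start_maxwell ;;
    stop) stop_maxwell ;;
    restart)
        stop_maxwell
        start_maxwell
        ;;
esac

After the restart you can check that change records are reaching Kafka, for example with kafka-console-consumer.sh --bootstrap-server server15:9092 --topic order_info. Each Maxwell record is a JSON object whose ts field (epoch seconds) is what the Flume interceptor below relies on.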
Using Flume to read the data from Kafka and write it into HDFS (Kafka source, file channel, HDFS sink)
## Components
a1.sources=r1
a1.channels=c1
a1.sinks=k1
## source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = server15:9092,server16:9092,server17:9092
## Listen to multiple Kafka topics
a1.sources.r1.kafka.topics=cart_info,comment_info,coupon_use,favor_info,order_detail_activity,order_detail_coupon,order_detail,order_info,order_refund_info,order_status_log,payment_info,refund_payment,user_info
a1.sources.r1.kafka.consumer.group.id=flume
a1.sources.r1.setTopicHeader = true
a1.sources.r1.topicHeader = topic
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.yyds.flume.interceptor.db.TimestampInterceptor$Builder
## channel1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/yyds/apps/flume-1.9.0/checkpoint/behavior2
a1.channels.c1.dataDirs = /opt/yyds/apps/flume-1.9.0/data/behavior2/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6
## sink1
a1.sinks.k1.type = hdfs
## Output path on HDFS
a1.sinks.k1.hdfs.path = /origin_data/gmall2022/db/%{topic}_inc/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = db-
a1.sinks.k1.hdfs.rollInterval = 600
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
## Write compressed output files (gzip)
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip
## Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1
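To run the agent, save the configuration to a file and start Flume against it. A sketch, assuming the file is saved as job/kafka_to_hdfs_db.conf under the Flume home directory (the file name and path are assumptions, not from the original post):

# Start agent a1 in the background with the configuration above
nohup /opt/yyds/apps/flume-1.9.0/bin/flume-ng agent \
  -n a1 \
  -c /opt/yyds/apps/flume-1.9.0/conf/ \
  -f /opt/yyds/apps/flume-1.9.0/job/kafka_to_hdfs_db.conf \
  -Dflume.root.logger=INFO,LOGFILE >/dev/null 2>&1 &

The custom TimestampInterceptor referenced by a1.sources.r1.interceptors.i1.type is implemented as follows: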
package com.yyds.flume.interceptor.db;

import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

/**
 * Fixes the midnight ("zero point") drift problem:
 * replaces the timestamp in the event header with the event time carried in the record itself,
 * so that the HDFS sink writes each record into the folder of the day it actually happened.
 */
public class TimestampInterceptor implements Interceptor {

    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        // The event body is a Maxwell change record in JSON
        byte[] body = event.getBody();
        String log = new String(body, StandardCharsets.UTF_8);
        JSONObject jsonObject = JSONObject.parseObject(log);

        // Maxwell's "ts" field is in seconds; the Flume timestamp header expects milliseconds
        long timestamp = jsonObject.getLong("ts") * 1000;

        // Overwrite the header timestamp with the time from the record;
        // the HDFS sink uses it to resolve %Y-%m-%d in the output path
        Map<String, String> headers = event.getHeaders();
        headers.put("timestamp", timestamp + "");
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    @Override
    public void close() {
    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new TimestampInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
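For Flume to load the interceptor, it has to be packaged into a jar and placed on Flume's classpath. A sketch, assuming the class lives in a Maven project and the built artifact is named flume-interceptor-1.0-jar-with-dependencies.jar (the project layout and jar name are assumptions):

# Build the interceptor (bundling fastjson) and drop it into Flume's lib directory
mvn clean package
cp target/flume-interceptor-1.0-jar-with-dependencies.jar /opt/yyds/apps/flume-1.9.0/lib/

Since the class uses fastjson, either build a jar that bundles the dependency or copy the fastjson jar into lib/ as well.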
First-day full synchronization of the incremental tables (mysql_to_kafka_inc_init.sh)
#!/bin/bash
# Bootstraps (does a one-off full sync of) all incremental tables; it only needs to be run once
MAXWELL_HOME=/opt/yyds/apps/maxwell-1.29.2

import_data() {
    $MAXWELL_HOME/bin/maxwell-bootstrap --database gmall2022 --table $1 --config $MAXWELL_HOME/config.properties
}
case $1 in
"cart_info")
    import_data cart_info
    ;;
"comment_info")
    import_data comment_info
    ;;
"coupon_use")
    import_data coupon_use
    ;;
"favor_info")
    import_data favor_info
    ;;
"order_detail")
    import_data order_detail
    ;;
"order_detail_activity")
    import_data order_detail_activity
    ;;
"order_detail_coupon")
    import_data order_detail_coupon
    ;;
"order_info")
    import_data order_info
    ;;
"order_refund_info")
    import_data order_refund_info
    ;;
"order_status_log")
    import_data order_status_log
    ;;
"payment_info")
    import_data payment_info
    ;;
"refund_payment")
    import_data refund_payment
    ;;
"user_info")
    import_data user_info
    ;;
"all")
    import_data cart_info
    import_data comment_info
    import_data coupon_use
    import_data favor_info
    import_data order_detail
    import_data order_detail_activity
    import_data order_detail_coupon
    import_data order_info
    import_data order_refund_info
    import_data order_status_log
    import_data payment_info
    import_data refund_payment
    import_data user_info
    ;;
esac
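A usage sketch: make the script executable and bootstrap all 13 tables in one pass (or pass a single table name to re-bootstrap just that table):

chmod +x mysql_to_kafka_inc_init.sh
./mysql_to_kafka_inc_init.sh all

maxwell-bootstrap replays the full current contents of each table through the running Maxwell instance as bootstrap-insert records, so the historical data lands in the same Kafka topics and HDFS directories as the ongoing incremental changes.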