Syncing incremental data with Maxwell

For Maxwell installation and basic configuration, see the earlier post in this series:
Offline Data Warehouse (3): Installing and Using Maxwell

Maxwell publishes the incremental (change) data to Kafka, and Flume then ships the data from Kafka into HDFS.

Maxwell configuration

# tl;dr config
log_level=info

producer=kafka
kafka.bootstrap.servers=server15:9092,server16:9092,server17:9092
# Target Kafka topic. It can be static (e.g. maxwell) or dynamic (e.g. %{database}_%{table})
kafka_topic=%{table}

# Table filter: only sync the 13 tables below
filter=include:gmall2022.cart_info,include:gmall2022.comment_info,include:gmall2022.coupon_use,include:gmall2022.favor_info,include:gmall2022.order_detail,include:gmall2022.order_detail_activity,include:gmall2022.order_detail_coupon,include:gmall2022.order_info,include:gmall2022.order_refund_info,include:gmall2022.order_status_log,include:gmall2022.payment_info,include:gmall2022.refund_payment,include:gmall2022.user_info

# mysql login info
host=server15
user=maxwell
password=maxwell
jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai
Restart Maxwell so the new configuration takes effect:

[root@server16 maxwell-1.29.2]# maxwell.sh restart
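The maxwell.sh wrapper invoked above is a site-specific helper script rather than part of the Maxwell distribution. A minimal sketch of such a wrapper, assuming the install path used throughout this post and Maxwell's standard --config and --daemon options:

#!/bin/bash
# maxwell.sh -- minimal start/stop/restart wrapper for Maxwell (illustrative sketch)
MAXWELL_HOME=/opt/yyds/apps/maxwell-1.29.2

start_maxwell() {
    echo "Starting Maxwell"
    $MAXWELL_HOME/bin/maxwell --config $MAXWELL_HOME/config.properties --daemon
}

stop_maxwell() {
    echo "Stopping Maxwell"
    # Maxwell's main class is com.zendesk.maxwell.Maxwell
    ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | awk '{print $2}' | xargs -r kill
}

case $1 in
    start)   start_maxwell ;;
    stop)    stop_maxwell ;;
    restart) stop_maxwell; sleep 2; start_maxwell ;;
    *)       echo "Usage: maxwell.sh {start|stop|restart}" ;;
esac

Once Maxwell is running, an insert into gmall2022.cart_info should appear in the cart_info topic as a JSON record containing database, table, type, ts (a Unix timestamp in seconds) and data fields; this can be checked with kafka-console-consumer.sh --bootstrap-server server15:9092 --topic cart_info. The ts field is what the Flume interceptor below relies on.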

Use Flume to read the data from Kafka and write it to HDFS (Kafka source, file channel, HDFS sink)

## Agent components
a1.sources=r1
a1.channels=c1
a1.sinks=k1

## source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = server15:9092,server16:9092,server17:9092
## Subscribe to the Kafka topics of the 13 tables
a1.sources.r1.kafka.topics=cart_info,comment_info,coupon_use,favor_info,order_detail_activity,order_detail_coupon,order_detail,order_info,order_refund_info,order_status_log,payment_info,refund_payment,user_info
a1.sources.r1.kafka.consumer.group.id=flume
a1.sources.r1.setTopicHeader = true
a1.sources.r1.topicHeader = topic
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.yyds.flume.interceptor.db.TimestampInterceptor$Builder

## channel1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/yyds/apps/flume-1.9.0/checkpoint/behavior2
a1.channels.c1.dataDirs = /opt/yyds/apps/flume-1.9.0/data/behavior2/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6


## sink1
a1.sinks.k1.type = hdfs
## Output path on HDFS; %{topic} is taken from the topic header set by the Kafka source
a1.sinks.k1.hdfs.path = /origin_data/gmall2022/db/%{topic}_inc/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = db-


a1.sinks.k1.hdfs.rollInterval = 600
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0

## Write compressed output files (gzip)
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip

## Wire the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

The source configuration above references a custom interceptor, com.yyds.flume.interceptor.db.TimestampInterceptor. It must be packaged as a jar and placed on Flume's classpath before the agent is started; its implementation, followed by build and start commands, is shown below.
package com.yyds.flume.interceptor.db;

import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

/**
 * Fixes the "zero-point drift" problem (events generated just before midnight
 * being written to the next day's directory): replaces the timestamp in the
 * event header with the event time carried in the log data itself, so the
 * HDFS sink resolves its date-based output directory correctly.
 */
public class TimestampInterceptor implements Interceptor {
    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {

        byte[] body = event.getBody();
        String log = new String(body, StandardCharsets.UTF_8);
        JSONObject jsonObject = JSONObject.parseObject(log);

        // Maxwell writes "ts" as epoch seconds; Flume's HDFS sink expects milliseconds
        long timestamp = jsonObject.getLong("ts") * 1000;

        // Overwrite the "timestamp" header with the event time taken from the record itself
        Map<String, String> headers = event.getHeaders();
        headers.put("timestamp", String.valueOf(timestamp));

        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new TimestampInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }

}
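With the interceptor written, a typical build-deploy-start sequence looks like the following. The Maven build, the jar name and the Flume config file name (kafka_to_hdfs_db.conf) are assumptions; adjust them to your project layout:

# Build the interceptor module (assumes a Maven project that bundles fastjson into the jar)
mvn clean package -DskipTests

# Put the jar on Flume's classpath (jar name is illustrative)
cp target/flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar /opt/yyds/apps/flume-1.9.0/lib/

# Start the agent with the configuration shown above
/opt/yyds/apps/flume-1.9.0/bin/flume-ng agent \
  --name a1 \
  --conf /opt/yyds/apps/flume-1.9.0/conf \
  --conf-file /opt/yyds/apps/flume-1.9.0/job/kafka_to_hdfs_db.conf \
  -Dflume.root.logger=INFO,console

Because the interceptor overwrites the timestamp header with the event time from the record, the HDFS sink resolves %Y-%m-%d from the data itself rather than from the processing time, so records generated just before midnight still land in the correct day's directory.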

First-day full synchronization of the incremental tables (mysql_to_kafka_inc_init.sh)

#!/bin/bash

# This script bootstraps (fully loads) all incremental tables; it only needs to be run once

MAXWELL_HOME=/opt/yyds/apps/maxwell-1.29.2

import_data() {
    $MAXWELL_HOME/bin/maxwell-bootstrap --database gmall2022 --table $1 --config $MAXWELL_HOME/config.properties
}

case $1 in
"cart_info")
  import_data cart_info
  ;;
"comment_info")
  import_data comment_info
  ;;
"coupon_use")
  import_data coupon_use
  ;;
"favor_info")
  import_data favor_info
  ;;
"order_detail")
  import_data order_detail
  ;;
"order_detail_activity")
  import_data order_detail_activity
  ;;
"order_detail_coupon")
  import_data order_detail_coupon
  ;;
"order_info")
  import_data order_info
  ;;
"order_refund_info")
  import_data order_refund_info
  ;;
"order_status_log")
  import_data order_status_log
  ;;
"payment_info")
  import_data payment_info
  ;;
"refund_payment")
  import_data refund_payment
  ;;
"user_info")
  import_data user_info
  ;;
"all")
  import_data cart_info
  import_data comment_info
  import_data coupon_use
  import_data favor_info
  import_data order_detail
  import_data order_detail_activity
  import_data order_detail_coupon
  import_data order_info
  import_data order_refund_info
  import_data order_status_log
  import_data payment_info
  import_data refund_payment
  import_data user_info
  ;;
*)
  echo "Usage: $0 {table_name|all}"
  ;;
esac
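Assuming the script is saved as mysql_to_kafka_inc_init.sh and made executable, a first-day bootstrap looks like this; the bootstrapped rows flow through the same Kafka topics and the Flume agent above, ending up under /origin_data/gmall2022/db/<table>_inc/<date> on HDFS:

chmod +x mysql_to_kafka_inc_init.sh

# Bootstrap every incremental table in one go (run once, with Maxwell and the Flume agent already running)
./mysql_to_kafka_inc_init.sh all

# Or bootstrap a single table
./mysql_to_kafka_inc_init.sh order_info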