需要源码或者进Flink群的 WeChat :zoomake1024

上一篇文章介绍了使用Flink SQL消费Kafka实时入湖的方法,整个流程都是SQL实现的,但是当处理逻辑复杂时,SQL并不是那么灵活,通过编码的方式可以更好的驾驭整个框架。

本篇记录下如何使用Java DataStream API以编码的方式,通过Flink CDC同步MySQL数据到Hudi,在整个过程中遇到了很多问题,会一并记录解决方案。

主要是参考Hudi源码中的HoodieFlinkStreamer来实现的,首先说明POM文件中需要引入的Jar包:

<dependency>
      <groupId>org.apache.parquet</groupId>
      <artifactId>parquet-avro</artifactId>
      <version>1.11.1</version>
</dependency>

问题1 ,缺少该包会报:Caused by: java.lang.ClassNotFoundException: org.apache.parquet.avro.AvroWriteSupport

<dependency>
    <groupId>org.apache.hudi</groupId>
    <artifactId>hudi-flink</artifactId>
    <version>0.11.0</version>
    <exclusions>
        <exclusion>
            <artifactId>slf4j-log4j12</artifactId>
            <groupId>org.slf4j</groupId>
        </exclusion>
    </exclusions>
</dependency>

问题2 ,需要将slf4j-log4j12排除,否则会引起冲突。

需要配置文件*_schema.avsc,并上传到HDFS集群,用来定义Hudi中表的schema,内容如下:

{
  "type" : "record",
  "name" : "record",
  "fields" : [ {
    "name" : "uid",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "name",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "partition",
    "type" : [ "null", "string" ],
    "default" : null
  }]
}

Flink CDC读取MySQL:
定义Source:MySqlSource mySqlSource =…

处理数据,需要直接拿到RowData的DataStream,因为后面入湖的API使用的DataStream

问题3 ,setField时需要使用StringData.fromString否则报错:
java.lang.String cannot be cast to org.apache.flink.table.data.StringData
伪代码如下:

DataStream<RowData> result = env.fromSource(mySqlSource......
        .map(new MapFunction<String, RowData>() {
            @Override
            public RowData map(String value) throws Exception {
                Tuple3<String, String, JSONObject> type
                .....
                GenericRowData rowData = new GenericRowData(3);
                rowData.setField(0, StringData.fromString(type.f2.getString("uid"))); //字段1
                rowData.setField(1, StringData.fromString(type.f2.getString("name")));//字段2
                rowData.setField(2, StringData.fromString("20220707"));//字段3为partition
                return rowData;
            }
        });

入湖:
问题4 ,RowType.of中应该为new String[] {“0”, “1”, “2”}而非new String[] {“f0”, “f1”, “f2”},Tuple时才需要加f,
如果写错了会导致没有数据入湖,程序也不报错,导致很难排查问题。

final LogicalType[] fieldTypes =......
DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(hudiConf, RowType.of(fieldTypes, new String[] {"0", "1", "2"}), parallelism, result);
DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(hudiConf, parallelism, hoodieRecordDataStream);
if (StreamerUtil.needsAsyncCompaction(hudiConf)) {
    Pipelines.compact(hudiConf, pipeline);
} else {
    Pipelines.clean(hudiConf, pipeline);
}

此时就完成了主体程序的编写,其中StreamExecutionEnvironment、FlinkStreamerConfig、JCommander参考HoodieFlinkStreamer即可。

提交集群,启动程序:

flink run -c com.cdc.......MySqlCDCToHudiJob -m yarn-cluster -yqu media -ynm MySqlCDCToHudi -yjm 2048 -ytm 3072 -p 4 -ys 2 *-1.0-SNAPSHOT.jar \
--kafka-bootstrap-servers empty \
--kafka-topic empty \
--kafka-group-id empty \
--checkpoint-interval 3000 \
--target-base-path hdfs://....../tablepath \
--record-key-field uid \
--table-type MERGE_ON_READ \
--target-table t1 \
--partition-path-field partition \
--source-ordering-field partition \
--source-avro-schema-path hdfs://....../*_schema.avsc

问题5 ,明明同步的MySQL的数据,为什么还要传Kafka参数(kafka-bootstrap-servers、kafka-topic、kafka-group-id empty)?
可以看到是随意传的值,如果不配置:

Caused by: com.beust.jcommander.ParameterException: The following options are required: [–kafka-bootstrap-servers], [–kafka-topic], [–kafka-group-id]
at com.beust.jcommander.JCommander.validateOptions(JCommander.java:381)
at com.beust.jcommander.JCommander.parse(JCommander.java:341)
at com.beust.jcommander.JCommander.parse(JCommander.java:319)
at com.beust.jcommander.JCommander.(JCommander.java:240)

此时需要看下源码,因为官方的HoodieFlinkStreamer主要是将Kafka数据写入Hudi的,通过下面三个截图就明白:
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
所以我们重新定义一个FlinkStreamerConfig或者修改Hudi源码,去掉required = true,让其成为一个通用的HudiConfig即可。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐