Flink-CDC: dynamically monitoring MySQL tables
The advantage of monitoring MySQL with Flink-CDC is that, unlike Canal or Maxwell, there is no need to first stage the change data in Kafka; the changes are pulled directly into the real-time stream.
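Flink-CDC embeds the Debezium engine and reads the MySQL binlog directly, so row-based binary logging must be enabled on the MySQL server before either approach below can work. A minimal my.cnf sketch (the server-id value and the restriction to the project_realtime database are assumptions for this example):

[mysqld]
# Unique id within the replication topology (assumed value)
server-id = 1
# Enable the binary log
log-bin = mysql-bin
# Debezium requires row-level change events
binlog_format = ROW
# Optional: only log the monitored database (assumption)
binlog-do-db = project_realtime

Restart MySQL afterwards and verify with: show variables like 'binlog_format';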
Flink - monitoring with the DataStream API
import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

/**
 * @author zyj
 * @Date 2022/1/7 11:49
 * Custom deserialization schema
 */
public class FlinkCDC01_CustomSchema {
    public static void main(String[] args) throws Exception {
        // Create the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism
        env.setParallelism(1);
        // Enable checkpointing every 5 seconds with exactly-once semantics
        env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
        // Checkpoints time out after 1 minute
        env.getCheckpointConfig().setCheckpointTimeout(60000L);
        // Restart strategy: at most 2 restarts, 2 seconds apart
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 2000L));
        // Retain externalized checkpoints after the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // Use HDFS as the state backend
        env.setStateBackend(new FsStateBackend("hdfs:///flinkCDC"));
        // User name for accessing HDFS
        System.setProperty("HADOOP_USER_NAME", "zyj");

        DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("hadoop110")                    // MySQL host name
                .port(3306)                               // MySQL port
                .databaseList("project_realtime")         // databases to monitor; multiple names allowed
                .tableList("project_realtime.t_user")     // tables to monitor, written as database.table
                .username("root")                         // MySQL user
                .password("root")                         // MySQL password
                .startupOptions(StartupOptions.initial()) // take an initial snapshot, then read the binlog
                .deserializer(new MySchema())             // output format produced by the CDC source
                .build();

        env.addSource(sourceFunction).print();

        env.execute();
    }
}

class MySchema implements DebeziumDeserializationSchema<String> {
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> out) throws Exception {
        Struct valueStruct = (Struct) sourceRecord.value();
        Struct sourceStruct = valueStruct.getStruct("source");
        // Database name
        String database = sourceStruct.getString("db");
        // Table name
        String table = sourceStruct.getString("table");
        // Operation type; Debezium reports inserts as "create", so normalize to "insert"
        String type = Envelope.operationFor(sourceRecord).toString().toLowerCase();
        if (type.equals("create")) {
            type = "insert";
        }

        JSONObject jsonObj = new JSONObject();
        jsonObj.put("database", database);
        jsonObj.put("table", table);
        jsonObj.put("type", type);

        // Affected row data, taken from the "after" image
        Struct afterStruct = valueStruct.getStruct("after");
        JSONObject dataJsonObj = new JSONObject();
        if (afterStruct != null) {
            for (Field field : afterStruct.schema().fields()) {
                String fieldName = field.name();
                Object fieldValue = afterStruct.get(field);
                dataJsonObj.put(fieldName, fieldValue);
            }
        }
        // For a delete, "after" is null, so data stays {} to avoid
        // NullPointerExceptions in downstream operators
        jsonObj.put("data", dataJsonObj);

        // Emit the record downstream
        out.collect(jsonObj.toJSONString());
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return TypeInformation.of(String.class);
    }
}
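With this custom schema, every change event reaches the downstream operators as a flat JSON string. Assuming t_user has columns id, name, and age (a hypothetical layout, matching the SQL example below), an insert followed by a delete would print roughly as:

{"database":"project_realtime","table":"t_user","type":"insert","data":{"id":1,"name":"zhangsan","age":18}}
{"database":"project_realtime","table":"t_user","type":"delete","data":{}}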
Flink - monitoring with Flink SQL
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @author zyj
 * @Date 2022/1/8 0:32
 * Dynamically reading a MySQL table with Flink-CDC, SQL style
 */
public class FlinkCDC02_SQL {
    public static void main(String[] args) throws Exception {
        //TODO 1. Prepare the environments
        //1.1 Streaming environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //1.2 Table environment
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        //1.3 Parallelism
        env.setParallelism(1);

        //TODO 2. Create the dynamic table
        tableEnv.executeSql("CREATE TABLE user_info (" +
                "  id INT," +
                "  name STRING," +
                "  age INT" +
                ") WITH (" +
                "  'connector' = 'mysql-cdc'," +
                "  'hostname' = 'hadoop110'," +
                "  'port' = '3306'," +
                "  'username' = 'root'," +
                "  'password' = 'root'," +
                "  'database-name' = 'project_realtime'," +
                "  'table-name' = 't_user'" +
                ")");

        // executeSql submits the query itself and print() blocks on its result,
        // so no separate env.execute() is needed here (calling it would fail with
        // "No operators defined in streaming topology")
        tableEnv.executeSql("select * from user_info").print();
    }
}
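Printing the dynamic table is mainly useful for verification; in a real job the usual next step is to bridge the changelog back into the DataStream API. A minimal sketch under the same setup, using toRetractStream (the Flink 1.12 bridge for updating tables):

import org.apache.flink.table.api.Table;
import org.apache.flink.types.Row;

// ... inside main(), after the CREATE TABLE statement above:
Table result = tableEnv.sqlQuery("select * from user_info");
// Each element is Tuple2<Boolean, Row>: true = add, false = retraction
// (a delete, or the old image of an update)
tableEnv.toRetractStream(result, Row.class).print();
// toRetractStream registers DataStream operators, so in this variant
// execute() is required
env.execute();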
POM dependencies
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.atguigu.gmall</groupId>
    <artifactId>gmall0224-cdc</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.48</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>1.2.0</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>1.12.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
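With the maven-assembly-plugin configured above, a fat jar can be built and submitted to a cluster roughly as follows (the jar name follows from the artifactId and version; the main class is the DataStream example from this article, and the Flink home path is an assumption):

mvn clean package
$FLINK_HOME/bin/flink run -c FlinkCDC01_CustomSchema target/gmall0224-cdc-1.0-SNAPSHOT-jar-with-dependencies.jar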