系列文章目录

实践数据湖iceberg 第一课 入门
实践数据湖iceberg 第二课 iceberg基于hadoop的底层数据格式
实践数据湖iceberg 第三课 在sqlclient中,以sql方式从kafka读数据到iceberg
实践数据湖iceberg 第四课 在sqlclient中,以sql方式从kafka读数据到iceberg(升级版本到flink1.12.7)
实践数据湖iceberg 第五课 hive catalog特点
实践数据湖iceberg 第六课 从kafka写入到iceberg失败问题 解决
实践数据湖iceberg 第七课 实时写入到iceberg
实践数据湖iceberg 第八课 hive与iceberg集成
实践数据湖iceberg 第九课 合并小文件
实践数据湖iceberg 第十课 快照删除
实践数据湖iceberg 第十一课 测试分区表完整流程(造数、建表、合并、删快照)
实践数据湖iceberg 第十二课 catalog是什么
实践数据湖iceberg 第十三课 metadata比数据文件大很多倍的问题
实践数据湖iceberg 第十四课 元数据合并(解决元数据随时间增加而元数据膨胀的问题)
实践数据湖iceberg 第十五课 spark安装与集成iceberg(jersey包冲突)
实践数据湖iceberg 第十六课 通过spark3打开iceberg的认知之门
实践数据湖iceberg 第十七课 hadoop2.7,spark3 on yarn运行iceberg配置
实践数据湖iceberg 第十八课 多种客户端与iceberg交互启动命令(常用命令)
实践数据湖iceberg 第十九课 flink count iceberg,无结果问题
实践数据湖iceberg 第二十课 flink + iceberg CDC场景(版本问题,测试失败)
实践数据湖iceberg 第二十一课 flink1.13.5 + iceberg0.131 CDC(测试成功INSERT,变更操作失败)
实践数据湖iceberg 第二十二课 flink1.13.5 + iceberg0.131 CDC(CRUD测试成功)
实践数据湖iceberg 第二十三课 flink-sql从checkpoint重启
实践数据湖iceberg 第二十四课 iceberg元数据详细解析
实践数据湖iceberg 第二十五课 后台运行flink sql 增删改的效果
实践数据湖iceberg 第二十六课 checkpoint设置方法
实践数据湖iceberg 第二十七课 flink cdc 测试程序故障重启:能从上次checkpoint点继续工作
实践数据湖iceberg 第二十八课 把公有仓库上不存在的包部署到本地仓库
实践数据湖iceberg 第二十九课 如何优雅高效获取flink的jobId
实践数据湖iceberg 第三十课 mysql->iceberg,不同客户端有时区问题
实践数据湖iceberg 第三十一课 使用github的flink-streaming-platform-web工具,管理flink任务流,测试cdc重启场景
实践数据湖iceberg 更多的内容目录



前言

代码中展示FLINK SQL 执行增删改的效果


一、JAVA 后台代码

1.代码

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkSqlCDC {

    public static void main(String[] args) throws Exception {
        //TODO 1.准备环境
        //1.1流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //1.2 表执行环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        String createSql = "CREATE TABLE stock_basic_source(\n" +
                "  `i`  INT NOT NULL,\n" +
                "  `ts_code`     CHAR(10) NOT NULL,\n" +
                "  `symbol`   CHAR(10) NOT NULL,\n" +
                "  `name` char(10) NOT NULL,\n" +
                "  `area`   CHAR(20) NOT NULL,\n" +
                "  `industry`   CHAR(20) NOT NULL,\n" +
                "  `list_date`   CHAR(10) NOT NULL,\n" +
                "  `actural_controller`   CHAR(100),\n" +
                "    PRIMARY KEY(i) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'mysql-cdc',\n" +
                "  'hostname' = 'hadoop103',\n" +
                "  'port' = '3306',\n" +
                "  'username' = 'hive',\n" +
                "  'password' = '123456',\n" +
                "  'database-name' = 'xxzh_stock',\n" +
                "  'table-name' = 'stock_basic'\n" +
                ")" ;
        //TODO 2.创建动态表
        tableEnv.executeSql(createSql);

        tableEnv.executeSql("select * from stock_basic_source").print();

        //TODO 6.执行任务
        env.execute();
    }

}

2.pom

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <artifactId>flink-iceberg-learning</artifactId>
    <groupId>org.example</groupId>
    <version>1.0-SNAPSHOT</version>

    <modelVersion>4.0.0</modelVersion>

    <name>flink-iceberg-learning</name>
    <!-- FIXME change it to the project's website -->
    <url>http://www.example.com</url>

    <properties>
        <!-- project compiler -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <!-- maven compiler-->
        <scala.maven.plugin.version>3.2.2</scala.maven.plugin.version>
        <maven.compiler.plugin.version>3.8.1</maven.compiler.plugin.version>
        <maven.assembly.plugin.version>3.1.1</maven.assembly.plugin.version>
        <!-- sdk -->
        <java.version>1.8</java.version>
        <scala.version>2.12.12</scala.version>
        <scala.binary.version>2.12</scala.binary.version>
        <!-- engine-->
        <hadoop.version>2.7.2</hadoop.version>
        <flink.version>1.13.5</flink.version>
        <flink.cdc.version>2.0.2</flink.cdc.version>
        <iceberg.version>0.12.1</iceberg.version>
        <hive.version>2.3.6</hive.version>
        <!-- <scope.type>provided</scope.type>-->
        <scope.type>compile</scope.type>
    </properties>
    <dependencies>
        <!-- scala -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <!-- flink Dependency -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_${scala.binary.version}
            </artifactId>
            <version>${flink.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}
            </artifactId>
            <version>${flink.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <!-- <= 1.13 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}
            </artifactId>
            <version>${flink.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <!-- 1.14 -->
        <!-- <dependency>-->
        <!-- <groupId>org.apache.flink</groupId>-->
        <!-- <artifactId>flink-table-planner_${scala.binary.version}
        </artifactId>-->
        <!-- <version>${flink.version}</version>-->
        <!-- <scope>${scope.type}</scope>-->
        <!-- </dependency>-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-orc_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_${scala.binary.version}
            </artifactId>
            <version>${flink.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-kafka_${scala.binary.version}
            </artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hive_${scala.binary.version}
            </artifactId>
            <version>${flink.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-sql-connector-mysql-cdc</artifactId>
            <version>${flink.cdc.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <!-- iceberg Dependency -->
        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-flink-runtime</artifactId>
            <version>${iceberg.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <!-- hadoop Dependency-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <!-- hive Dependency-->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>${hive.version}</version>
            <scope>${scope.type}</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.logging.log4j</groupId>
                    <artifactId>log4j-slf4j-impl</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.hive</groupId>
                    <artifactId>hive-llap-tez</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.antlr</groupId>
            <artifactId>antlr-runtime</artifactId>
            <version>3.5.2</version>
        </dependency>
        <dependency>
        <groupId>org.datanucleus</groupId>
        <artifactId>datanucleus-api-jdo</artifactId>
        <version>4.2.4</version>
    </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>${scala.maven.plugin.version}</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>${maven.assembly.plugin.version}</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

二、运行效果

1.启动,初始化数据

+----+-------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
| op |           i |                        ts_code |                         symbol |                           name |                           area |                       industry |                      list_date |             actural_controller |
+----+-------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
| +I |           2 |                      000004.SZ |                         000004 |                       国华网安 |                           深圳 |                       软件服务 |                       19910114 |                         三体人 |
| +I |           1 |                      000002.SZ |                         000002 |                          万科A |                           深圳 |                       全国地产 |                       19910129 |                    星星之火!!! |
| +I |           4 |                      000006.SZ |                         000006 |                        深振业A |                           深圳 |                       区域地产 |                       19920427 |  深圳市人民政府国有资产监督... |
| +I |           3 |                      000005.SZ |                         000005 |                         ST星源 |                           深圳 |                       环境保护 |                       19901210 |                    郑列列,丁芃 |
| +I |           6 |                      000008.SZ |                         000008 |                       神州高铁 |                           北京 |                       运输设备 |                       19920507 |       国家开发投资集团有限公司 |
| +I |           5 |                      000007.SZ |                         000007 |                        *ST全新 |                           深圳 |                       酒店餐饮 |                       19920413 |                         (NULL) |
| +I |           7 |                      000009.SZ |                         000009 |                       中国宝安 |                           深圳 |                       电气设备 |                       19910625 |                         (NULL) |
| +I |           0 |                      000001.SZ |                         000001 |                       平安银行 |                           深圳 |                           银行 |                       19910403 |                         (NULL) |

2.新增数据

INSERT INTO `stock_basic` VALUES ('8', '000010.SZ', '000010', '美丽生态', '深圳', '建筑工程', '19951027', '沈玉兴');
INSERT INTO `stock_basic` VALUES ('9', '000011.SZ', '000011', '深物业A', '深圳', '区域地产', '19920330', '深圳市人民政府国有资产监督管理委员会');

控制台输出:

| +I |           8 |                      000010.SZ |                         000010 |                       美丽生态 |                           深圳 |                       建筑工程 |                       19951027 |                         沈玉兴 |
| +I |           9 |                      000011.SZ |                         000011 |                        深物业A |                           深圳 |                       区域地产 |                       19920330 |  深圳市人民政府国有资产监督... |

该处使用的url网络请求的数据。


3.修改数据

把id=0的remark修改为11111,运行效果如下:

-U 表示修改前, +U表示修改后。

| -U |           0 |                      000001.SZ |                         000001 |                       平安银行 |                           深圳 |                           银行 |                       19910403 |                         (NULL) |
| +U |           0 |                      000001.SZ |                         000001 |                       平安银行 |                           深圳 |                           银行 |                       19910403 |                          11111 |

4.删除数据

把id=9的进行删除,运行效果如下:

| -D |           9 |                      000011.SZ |                         000011 |                        深物业A |                           深圳 |                       区域地产 |                       19920330 |  深圳市人民政府国有资产监督... |

总结

flink sql 在java代码中体现增删改的运行效果

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐