不同 Flink SQL 任务的 SQL 示例
不同的 Flink SQL:Oracle-cdc(oracle2Mysql)、mysql、mysql-cdc(mysql2Mysql)、kafka2Mysql。Oracle-cdc,oracle2Mysql:CREATE TABLE t_wx_source_1 (id String NOT NULL, name String, age String) WITH ('connector' = 'oracle-cdc', …
·
Oracle-cdc,oracle2Mysql
-- Source table backed by the 'oracle-cdc' connector: reads table
-- TGYTH_SG.T_WX_SOURCE_1 from Oracle database 'dbc' on 192.168.1.223:1521.
-- NOTE(review): id is NOT NULL but no PRIMARY KEY is declared here, while the
-- sink this job writes to is keyed on id -- confirm whether the key should be
-- declared on the source as well.
CREATE TABLE t_wx_source_1 (
    id   STRING NOT NULL,
    name STRING,
    age  STRING
) WITH (
    'connector'     = 'oracle-cdc',
    'hostname'      = '192.168.1.223',
    'port'          = '1521',
    'username'      = 'TGYTH_SG',
    'password'      = 'XXXX',
    'database-name' = 'dbc',
    'schema-name'   = 'TGYTH_SG',
    'table-name'    = 'T_WX_SOURCE_1'
);
-- JDBC table mapped onto MySQL table 'test.t_wx_target_1' on 192.168.1.91:3306.
-- The Flink-side name (t_wx_target) differs from the physical table name.
-- id is declared as the (non-enforced) primary key; per the Flink JDBC
-- connector documentation a declared key makes the sink write in upsert mode.
CREATE TABLE t_wx_target (
    id   STRING,
    name STRING,
    age  STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector'  = 'jdbc',
    'url'        = 'jdbc:mysql://192.168.1.91:3306/test',
    'table-name' = 't_wx_target_1',
    'username'   = 'root',
    'password'   = 'XXXXX',
    'driver'     = 'com.mysql.cj.jdbc.Driver'
);
-- Pipeline: copy id, name and age from the Oracle CDC source into the MySQL sink.
INSERT INTO t_wx_target
SELECT
    id,
    name,
    age
FROM t_wx_source_1;
mysql(jdbc),mysql2Mysql
-- JDBC table mapped onto MySQL table 'test.t_shh_source' on 192.168.1.91:3306.
-- Used as the read side of the plain JDBC-to-JDBC copy job; single column.
CREATE TABLE t_shh_source (
    name STRING
) WITH (
    'connector'  = 'jdbc',
    'url'        = 'jdbc:mysql://192.168.1.91:3306/test',
    'table-name' = 't_shh_source',
    'username'   = 'root',
    'password'   = 'XXXXX',
    'driver'     = 'com.mysql.cj.jdbc.Driver'
);
-- JDBC table mapped onto MySQL table 'test.t_shh_target' on 192.168.1.91:3306.
-- No primary key is declared on this table.
CREATE TABLE t_shh_target (
    name STRING
) WITH (
    'connector'  = 'jdbc',
    'url'        = 'jdbc:mysql://192.168.1.91:3306/test',
    'table-name' = 't_shh_target',
    'username'   = 'root',
    'password'   = 'XXXX',
    'driver'     = 'com.mysql.cj.jdbc.Driver'
);
-- Pipeline: copy the name column from the JDBC source table into the JDBC target table.
INSERT INTO t_shh_target
SELECT
    name
FROM t_shh_source;
mysql-cdc,mysql2Mysql
-- Source table backed by the 'mysql-cdc' connector: reads table
-- 'test.t_wx_source' from MySQL on 192.168.1.91:3306.
--
-- A primary key is declared because the mysql-cdc connector runs with
-- incremental snapshot enabled by default and uses the primary key to split
-- the table into chunks; without a key (or an explicit chunk key column) the
-- job is rejected at startup. The key is NOT ENFORCED: Flink trusts the
-- declaration and does not validate uniqueness itself.
-- NOTE(review): assumes id is the primary key of the physical MySQL table,
-- as the matching sink definition in this job suggests -- confirm.
CREATE TABLE t_wx_source (
    id   INT,
    name STRING,
    age  STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector'     = 'mysql-cdc',
    'hostname'      = '192.168.1.91',
    'port'          = '3306',
    'database-name' = 'test',
    'table-name'    = 't_wx_source',
    'username'      = 'root',
    'password'      = 'XXXXXX'
);
-- JDBC table mapped onto MySQL table 'test.t_wx_target' on 192.168.1.91:3306.
-- id is declared as the (non-enforced) primary key; per the Flink JDBC
-- connector documentation a declared key makes the sink write in upsert mode.
CREATE TABLE t_wx_target (
    id   INT,
    name STRING,
    age  STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector'  = 'jdbc',
    'url'        = 'jdbc:mysql://192.168.1.91:3306/test',
    'table-name' = 't_wx_target',
    'username'   = 'root',
    'password'   = 'XXXXX',
    'driver'     = 'com.mysql.cj.jdbc.Driver'
);
-- Pipeline: copy id, name and age from the MySQL CDC source into the MySQL sink.
INSERT INTO t_wx_target
SELECT
    id,
    name,
    age
FROM t_wx_source;
kafka2Mysql
-- Kafka source table: consumes topic 'WX.LOGMINER_TBS.TEST01' from broker
-- 192.168.111.241:9092 as consumer group 'test2', starting at the earliest
-- offset. Record values are decoded with the 'debezium-json' format.
-- Column names are kept upper-case exactly as declared.
CREATE TABLE kafkaTable (
    ID   STRING,
    NAME STRING
) WITH (
    'connector'                    = 'kafka',
    'topic'                        = 'WX.LOGMINER_TBS.TEST01',
    'properties.bootstrap.servers' = '192.168.111.241:9092',
    'properties.group.id'          = 'test2',
    'scan.startup.mode'            = 'earliest-offset',
    'value.format'                 = 'debezium-json'
);
-- JDBC table mapped onto MySQL table 'test.t_wx_target_oracle' on
-- 192.168.1.91:3306. id is declared as the (non-enforced) primary key; per the
-- Flink JDBC connector documentation a declared key makes the sink write in
-- upsert mode.
CREATE TABLE t_wx_target_oracle (
    id   STRING,
    name STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector'  = 'jdbc',
    'url'        = 'jdbc:mysql://192.168.1.91:3306/test',
    'table-name' = 't_wx_target_oracle',
    'username'   = 'root',
    'password'   = 'XXXXX',
    'driver'     = 'com.mysql.cj.jdbc.Driver'
);
-- Pipeline: copy ID and NAME from the Kafka source into the MySQL sink
-- (columns are matched to the sink's id and name by position).
INSERT INTO t_wx_target_oracle
SELECT
    ID,
    NAME
FROM kafkaTable;
更多推荐
已为社区贡献3条内容
所有评论(0)