Flink 1.11.0 SQL: a custom Kafka source (supporting multiple topics)
Flink 1.11.0 does not yet ship a Kafka connector that can read from multiple topics. To get that capability we write a custom source, building on the existing Kafka connector:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.11.0</version>
</dependency>
Compared with earlier releases, Flink 1.11 reworked how custom connectors are written and introduced the DynamicTable abstractions; see the official documentation for details.
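In the new model a connector is a factory, discovered through Java SPI and matched against the 'connector' option by its factoryIdentifier(), plus the DynamicTableSource it creates. Roughly, the core Flink 1.11 interfaces look like the outline below (a simplified sketch, not the full signatures; the Kafka base classes used in the following steps already implement most of this, which is why only three methods need to be overridden):

// Simplified outline of the Flink 1.11 connector interfaces (see the official docs for the authoritative versions).
public interface DynamicTableSourceFactory extends DynamicTableFactory {
    // Called by the planner with the table's resolved options and schema.
    DynamicTableSource createDynamicTableSource(Context context);
}

public interface DynamicTableSource {
    DynamicTableSource copy();   // the planner may copy the source during optimization
    String asSummaryString();    // readable name shown in plans and logs
}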
1 Create the connector factory by extending KafkaDynamicTableFactoryBase
public class TopicsKafkaTableConnecterFactory extends KafkaDynamicTableFactoryBase {

    public static final String IDENTIFIER = "topicsKafka";

    public TopicsKafkaTableConnecterFactory() {}

    @Override
    protected KafkaDynamicSourceBase createKafkaTableSource(DataType producedDataType, String topic,
            Properties properties, DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
            StartupMode startupMode, Map<KafkaTopicPartition, Long> specificStartupOffsets,
            long startupTimestampMillis) {
        // Hand the (possibly comma-separated) topic string over to the custom source.
        return new TopicsKafkaTableSource(producedDataType, topic, properties, decodingFormat,
                startupMode, specificStartupOffsets, startupTimestampMillis);
    }

    @Override
    protected KafkaDynamicSinkBase createKafkaTableSink(DataType consumedDataType, String topic,
            Properties properties, Optional<FlinkKafkaPartitioner<RowData>> partitioner,
            EncodingFormat<SerializationSchema<RowData>> encodingFormat) {
        // The sink side is unchanged: reuse the built-in KafkaDynamicSink.
        return new KafkaDynamicSink(consumedDataType, topic, properties, partitioner, encodingFormat);
    }

    @Override
    public String factoryIdentifier() {
        // Value of the 'connector' option in the DDL WITH clause.
        return IDENTIFIER;
    }
}
2 Create TopicsKafkaTableSource by extending KafkaDynamicSourceBase
public class TopicsKafkaTableSource extends KafkaDynamicSourceBase {

    protected TopicsKafkaTableSource(DataType outputDataType, String topic, Properties properties,
            DecodingFormat<DeserializationSchema<RowData>> decodingFormat, StartupMode startupMode,
            Map<KafkaTopicPartition, Long> specificStartupOffsets, long startupTimestampMillis) {
        super(outputDataType, topic, properties, decodingFormat, startupMode,
                specificStartupOffsets, startupTimestampMillis);
    }

    @Override
    public DynamicTableSource copy() {
        // copy() must return this subclass, otherwise the multi-topic behaviour is lost when the planner copies the source.
        return new TopicsKafkaTableSource(this.outputDataType, this.topic, this.properties, this.decodingFormat,
                this.startupMode, this.specificStartupOffsets, this.startupTimestampMillis);
    }

    @Override
    public String asSummaryString() {
        return "Kafka";
    }

    @Override
    protected FlinkKafkaConsumerBase<RowData> createKafkaConsumer(String topic, Properties properties,
            DeserializationSchema<RowData> deserializationSchema) {
        // Split the comma-separated topic string so one table can consume from several topics.
        List<String> topics = Arrays.asList(topic.split(","));
        return new FlinkKafkaConsumer<>(topics, deserializationSchema, properties);
    }
}
3 Configuration
Create the following file under the project's resources folder:
META-INF/services/org.apache.flink.table.factories.Factory
File content:
the fully qualified class name of the TopicsKafkaTableConnecterFactory created above.
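For example, assuming the factory lives in a package called com.example.connectors (a placeholder; use your project's real package), the service file contains the single line:
com.example.connectors.TopicsKafkaTableConnecterFactory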
4 Flink SQL usage
CREATE TABLE mySource (
a bigint,
b bigint
) WITH (
'connector' = 'topicsKafka',
'topic' = 'mytesttopic',
'properties.bootstrap.servers' = '172.17.0.2:9092',
'properties.group.id' = 'flink-test-cxy',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
);
CREATE TABLE mysqlsink (
id bigint,
b varchar
)
with (
'connector' = 'print'
/*'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://...' ,
'connector.username' = 'root' ,
'connector.password' = 'root',
'connector.table' = 'mysqlsink' ,
'connector.driver' = 'com.mysql.cj.jdbc.Driver' ,
'connector.write.flush.interval' = '5s',
'connector.write.flush.max-rows' = '1'*/
);
insert into mysqlsink select a, cast(b as varchar) b from mySource;
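Because createKafkaConsumer splits the 'topic' option on commas, the same table can subscribe to several topics at once. A variant of the source DDL above (the topic names are just placeholders):
CREATE TABLE myMultiTopicSource (
  a bigint,
  b bigint
) WITH (
  'connector' = 'topicsKafka',
  'topic' = 'mytesttopic,mytesttopic2',
  'properties.bootstrap.servers' = '172.17.0.2:9092',
  'properties.group.id' = 'flink-test-cxy',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);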