(Repost) PyFlink: consuming Kafka data and writing it to another Kafka topic in real time
rm -f job.py
cat > job.py <<'EOF'
import os
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings
from pyflink.table.udf import udf
# Province names indexed by provinceId (values 0-6 in the test data).
provinces = ("beijing", "shanghai", "hangzhou", "shenzhen", "jiangxi", "chongqing", "xizang")

# Python UDF mapping a numeric province id to its name.
@udf(input_types=[DataTypes.INT()], result_type=DataTypes.STRING())
def province_id_to_name(id):
    return provinces[id]

# Fill in the following connection info according to your own Kafka cluster.
def log_processing():
    kafka_servers = "xx.xx.xx.xx:9092,xx.xx.xx.xx:9092,xx.xx.xx.xx:9092"
    kafka_zookeeper_servers = "xx.xx.xx.xx:2181,xx.xx.xx.xx:2181,xx.xx.xx.xx:2181"
    source_topic = "payment_msg"
    sink_topic = "results"
    kafka_consumer_group_id = "test_3"

    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
    env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
    t_env = StreamTableEnvironment.create(stream_execution_environment=env, environment_settings=env_settings)
    # Let Flink manage the memory of the Python UDF worker processes.
    t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed", True)

    # Source table: JSON payment records from Kafka, with an event-time
    # attribute rt derived from createTime and a 2-second watermark.
    source_ddl = f"""
    CREATE TABLE payment_msg(
        createTime VARCHAR,
        rt AS TO_TIMESTAMP(createTime),
        orderId BIGINT,
        payAmount DOUBLE,
        payPlatform INT,
        provinceId INT,
        WATERMARK FOR rt AS rt - INTERVAL '2' SECOND
    ) WITH (
        'connector.type' = 'kafka',
        'connector.version' = 'universal',
        'connector.topic' = '{source_topic}',
        'connector.properties.bootstrap.servers' = '{kafka_servers}',
        'connector.properties.zookeeper.connect' = '{kafka_zookeeper_servers}',
        'connector.properties.group.id' = '{kafka_consumer_group_id}',
        'connector.startup-mode' = 'latest-offset',
        'format.type' = 'json'
    )
    """

    # Sink table: aggregated results written back to another Kafka topic.
    kafka_sink_ddl = f"""
    CREATE TABLE kafka_sink (
        province VARCHAR,
        pay_amount DOUBLE,
        rowtime TIMESTAMP(3)
    ) WITH (
        'connector.type' = 'kafka',
        'connector.version' = 'universal',
        'connector.topic' = '{sink_topic}',
        'connector.properties.bootstrap.servers' = '{kafka_servers}',
        'connector.properties.zookeeper.connect' = '{kafka_zookeeper_servers}',
        'connector.properties.group.id' = '{kafka_consumer_group_id}',
        'connector.startup-mode' = 'latest-offset',
        'format.type' = 'json'
    )
    """

    t_env.sql_update(source_ddl)
    t_env.sql_update(kafka_sink_ddl)
    t_env.register_function('province_id_to_name', province_id_to_name)

    # Sum payAmount per province over 5-second tumbling event-time windows.
    query = """
    SELECT province_id_to_name(provinceId) AS province,
           SUM(payAmount) AS pay_amount,
           TUMBLE_START(rt, INTERVAL '5' SECOND) AS rowtime
    FROM payment_msg
    GROUP BY TUMBLE(rt, INTERVAL '5' SECOND), provinceId
    """
    t_env.sql_query(query).insert_into("kafka_sink")

    t_env.execute("payment_demo")

if __name__ == '__main__':
    log_processing()
EOF
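To feed the job some test input, a small producer can write JSON records matching the payment_msg schema into the source topic. The following sketch is not part of the original post: it assumes the third-party kafka-python package (pip install kafka-python) and uses made-up field values. Note that createTime must use the yyyy-MM-dd HH:mm:ss format that TO_TIMESTAMP parses by default, and provinceId must stay in 0-6 so the UDF can index the provinces tuple.

rm -f producer.py
cat > producer.py <<'EOF'
import json
import random
import time
from datetime import datetime
from kafka import KafkaProducer  # third-party: pip install kafka-python

# Same broker list as in job.py.
producer = KafkaProducer(
    bootstrap_servers="xx.xx.xx.xx:9092,xx.xx.xx.xx:9092,xx.xx.xx.xx:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"))

for order_id in range(100):
    # Made-up values; the field names mirror the payment_msg DDL in job.py.
    record = {
        "createTime": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "orderId": order_id,
        "payAmount": round(random.uniform(1, 1000), 2),
        "payPlatform": random.randint(0, 1),
        "provinceId": random.randint(0, 6),
    }
    producer.send("payment_msg", record)
    time.sleep(0.5)

producer.flush()
EOF
python producer.py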
# Download the Kafka SQL connector and JSON format jars matching Flink 1.10.1,
# then package them as lib.jar so they can be uploaded alongside the job.
rm -rf lib
mkdir lib
cd lib
wget https://maven.aliyun.com/nexus/content/groups/public/org/apache/flink/flink-sql-connector-kafka_2.11/1.10.1/flink-sql-connector-kafka_2.11-1.10.1.jar
wget https://maven.aliyun.com/nexus/content/groups/public/org/apache/flink/flink-json/1.10.1/flink-json-1.10.1-sql-jar.jar
cd ..
zip -r lib.jar lib/*
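The lib.jar built above is the dependency package format used when submitting the job through the Alibaba Cloud console (see the first reference below). On a self-managed Flink 1.10.1 cluster, one common alternative (an assumption, not from the original post) is to copy the connector jars into the cluster's lib directory and submit the Python job with the Flink CLI:

# Assumes FLINK_HOME points at a local Flink 1.10.1 installation.
cp lib/*.jar $FLINK_HOME/lib/
$FLINK_HOME/bin/flink run -py job.py

Once the job is running, the aggregated output can be inspected with Kafka's console consumer, e.g. kafka-console-consumer.sh --bootstrap-server xx.xx.xx.xx:9092 --topic results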
References:
https://help.aliyun.com/document_detail/181568.html
https://blog.csdn.net/chenshijie2011/article/details/117399883
https://blog.csdn.net/chenshijie2011/article/details/117401621
https://www.cnblogs.com/maoxiangyi/p/13509782.html
https://www.cnblogs.com/Springmoon-venn/p/13726089.html
https://www.jianshu.com/p/295066a24092
https://blog.csdn.net/m0_37592814/article/details/108044830