Flink Kafka Sink

1. Development workflow

(figure: sink development workflow)

Note:

Flink ships with a number of built-in sinks; any sink beyond those must be implemented by the user.

(figure: Flink's built-in sinks)
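A user-defined sink implements Flink's `SinkFunction` (or `RichSinkFunction`) and is attached with `addSink`. The sketch below illustrates that contract with a local stand-in interface so it compiles without Flink on the classpath; `MiniSinkFunction` and `CollectingSink` are illustrative names, not Flink API.

```java
import java.util.ArrayList;
import java.util.List;

public class CustomSinkSketch {

    // Stand-in for Flink's SinkFunction<IN>: the framework calls invoke()
    // once per record that reaches the sink.
    interface MiniSinkFunction<IN> {
        void invoke(IN value) throws Exception;
    }

    // A user-defined sink that buffers records; a real implementation would
    // write each record to the external system here.
    static class CollectingSink implements MiniSinkFunction<String> {
        final List<String> buffer = new ArrayList<>();

        @Override
        public void invoke(String value) {
            buffer.add(value);
        }
    }

    public static void main(String[] args) throws Exception {
        CollectingSink sink = new CollectingSink();
        for (String record : new String[]{"a", "b", "c"}) {
            sink.invoke(record);
        }
        System.out.println(sink.buffer);  // [a, b, c]
    }
}
```

With real Flink, the same shape is what `FlinkKafkaProducer` provides out of the box, which is why the job below only needs `addSink`.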

2. Environment

Hadoop + ZooKeeper + Kafka

Add the Kafka connector and fastjson dependencies:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.12.0</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.75</version>
</dependency>

3. Code

import com.alibaba.fastjson.JSON;
import com.atguigu.bean.WaterSensor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Properties;

public class Flink_Sink_Kafka {
    public static void main(String[] args) throws Exception {

        //1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //2. Read socket data and map it to a JavaBean
        SingleOutputStreamOperator<WaterSensor> waterSensorDS = env.socketTextStream("hadoop102", 9999)
                .map(new MapFunction<String, WaterSensor>() {

                    @Override
                    public WaterSensor map(String value) throws Exception {
                        String[] split = value.split(",");
                        return new WaterSensor(split[0],
                                Long.parseLong(split[1]),
                                Integer.parseInt(split[2]));
                    }
                });


        //3. Convert the data to JSON strings and write to Kafka
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");

        waterSensorDS.map(new MapFunction<WaterSensor, String>() {

            @Override
            public String map(WaterSensor value) throws Exception {
                return JSON.toJSONString(value);
            }
        }).addSink(new FlinkKafkaProducer<String>("test", new SimpleStringSchema(), properties));

        //4. Execute the job
        env.execute();
    }
}
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

// Lombok generates the all-args constructor used in the job above and the
// getters fastjson needs for serialization (add the Lombok dependency).
@Data
@NoArgsConstructor
@AllArgsConstructor
public class WaterSensor {

    private String id;
    private Long ts;
    private Integer vc;
}
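As a quick sanity check of the message format, the snippet below reproduces the job's two map steps in plain Java: parse one socket line ("id,ts,vc") into a bean and render the JSON string by hand. The local `WaterSensor`, `parse`, and `toJson` here are illustration-only stand-ins, and the field order shown assumes fastjson's default alphabetical ordering.

```java
public class WaterSensorJsonDemo {

    // Minimal local copy of the bean, just for this demo.
    static class WaterSensor {
        final String id; final Long ts; final Integer vc;
        WaterSensor(String id, Long ts, Integer vc) {
            this.id = id; this.ts = ts; this.vc = vc;
        }
    }

    // Mirrors the first map: split a "id,ts,vc" line into a bean.
    static WaterSensor parse(String line) {
        String[] split = line.split(",");
        return new WaterSensor(split[0],
                Long.parseLong(split[1]),
                Integer.parseInt(split[2]));
    }

    // Hand-rolled equivalent of JSON.toJSONString(value) for this bean.
    static String toJson(WaterSensor ws) {
        return String.format("{\"id\":\"%s\",\"ts\":%d,\"vc\":%d}", ws.id, ws.ts, ws.vc);
    }

    public static void main(String[] args) {
        System.out.println(toJson(parse("sensor_1,1000,10")));
        // {"id":"sensor_1","ts":1000,"vc":10}
    }
}
```

Each line typed into the socket on port 9999 therefore arrives in the `test` topic as one JSON string of this shape.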

Start a Kafka console consumer on the cluster to check the data received.

In the kafka/bin directory, run:

kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic test

Result:


(figure: console consumer output showing the JSON records)
