Flink Kafka Sink
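1. Development workflow
Note:
Flink ships with a number of built-in sinks; any other sink must be implemented by the user.
2. Environment
hadoop + zookeeper + kafka
Add the Kafka connector dependency, plus fastjson for JSON serialization: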
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.12.0</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.75</version>
</dependency>
3. Code
import com.alibaba.fastjson.JSON;
import com.atguigu.bean.WaterSensor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Properties;

public class Flink_Sink_Kafka {
    public static void main(String[] args) throws Exception {
        // 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. Read lines from the socket and convert each "id,ts,vc" line into a WaterSensor JavaBean
        SingleOutputStreamOperator<WaterSensor> waterSensorDS = env.socketTextStream("hadoop102", 9999)
                .map(new MapFunction<String, WaterSensor>() {
                    @Override
                    public WaterSensor map(String value) throws Exception {
                        String[] split = value.split(",");
                        return new WaterSensor(split[0],
                                Long.parseLong(split[1]),
                                Integer.parseInt(split[2]));
                    }
                });

        // 3. Convert each record to a JSON string and write it to the Kafka topic "test"
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        waterSensorDS.map(new MapFunction<WaterSensor, String>() {
            @Override
            public String map(WaterSensor value) throws Exception {
                return JSON.toJSONString(value);
            }
        }).addSink(new FlinkKafkaProducer<String>("test", new SimpleStringSchema(), properties));

        // 4. Execute the job
        env.execute();
    }
}
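The job above sets only the broker address on the producer. As a rough sketch, the same Properties object could carry other standard Kafka producer settings as well. The helper name buildProducerProps and the values chosen for acks and retries are assumptions for illustration, not part of the original job. Plain string keys are used so the snippet compiles without the Kafka client on the classpath; "bootstrap.servers" is the key behind ProducerConfig.BOOTSTRAP_SERVERS_CONFIG.

```java
import java.util.Properties;

final class ProducerPropsSketch {
    // Hypothetical helper: builds the Properties object handed to FlinkKafkaProducer.
    static Properties buildProducerProps(String brokers) {
        Properties props = new Properties();
        // Same key as ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
        props.setProperty("bootstrap.servers", brokers);
        // Assumed values, shown only to illustrate where such settings would go
        props.setProperty("acks", "all");
        props.setProperty("retries", "3");
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildProducerProps("hadoop102:9092");
        // Print every configured key/value pair
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The returned object would be passed as the third constructor argument of FlinkKafkaProducer, in place of the properties variable used in the job.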
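// WaterSensor bean: the job calls a three-argument constructor, and fastjson serializes via getters
public class WaterSensor {
    private String id;
    private Long ts;
    private Integer vc;
    public WaterSensor(String id, Long ts, Integer vc) { this.id = id; this.ts = ts; this.vc = vc; }
    public String getId() { return id; }
    public Long getTs() { return ts; }
    public Integer getVc() { return vc; }
}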
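To make the per-record transformation easier to follow, here is a minimal sketch of what the two map steps do to a single input line, written without Flink or fastjson so it runs on its own. The class SensorRecordSketch, the nested Reading type, and the sample line are hypothetical; the hand-built JSON only approximates what JSON.toJSONString would produce for a WaterSensor and does not escape special characters. Unlike the job, the sketch trims whitespace and rejects lines that do not have exactly three fields.

```java
final class SensorRecordSketch {
    // Stand-in for the WaterSensor bean: the same three fields
    static final class Reading {
        final String id;
        final long ts;
        final int vc;

        Reading(String id, long ts, int vc) {
            this.id = id;
            this.ts = ts;
            this.vc = vc;
        }
    }

    // Parses one "id,ts,vc" line; throws IllegalArgumentException on malformed input
    // (NumberFormatException is a subclass of IllegalArgumentException).
    static Reading parse(String line) {
        String[] parts = line.split(",");
        if (parts.length != 3) {
            throw new IllegalArgumentException("expected id,ts,vc but got: " + line);
        }
        return new Reading(parts[0].trim(),
                Long.parseLong(parts[1].trim()),
                Integer.parseInt(parts[2].trim()));
    }

    // Builds the JSON string by hand; no escaping is applied to id
    static String toJson(Reading r) {
        return "{\"id\":\"" + r.id + "\",\"ts\":" + r.ts + ",\"vc\":" + r.vc + "}";
    }

    public static void main(String[] args) {
        // prints {"id":"sensor_1","ts":1607527992000,"vc":20}
        System.out.println(toJson(parse("sensor_1,1607527992000,20")));
    }
}
```

In the job itself, a malformed line would surface as an exception inside the first map function, so a length check like the one in parse is one possible way to fail with a clearer message.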
Start a Kafka consumer on the cluster to check the data received.
From the kafka/bin directory, run:
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic test
Result: each line sent to the socket on hadoop102:9999 shows up in the consumer as a JSON string.