Consuming Kafka data with Flink
Note: the Flink version (and related dependency versions) on the server must match what the code is built against, otherwise deployment will fail even though the job runs locally; you can check the cluster version with flink --version or on the Flink web UI and make sure <flink.version> in the pom matches it. This article packages a jar that consumes Kafka data and adds a new business line on the server side.
The pom file
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.imooc.flink.java</groupId>
<artifactId>flink-train-java</artifactId>
<version>1.0</version>
<packaging>jar</packaging>
<name>Flink Quickstart Job</name>
<url>http://www.myorganization.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.11.2</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
</properties>
<repositories>
<repository>
<id>apache.snapshots</id>
<name>Apache Development Snapshot Repository</name>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<dependencies>
<!-- Apache Flink dependencies -->
<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.3</version>
</dependency>-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!--kafka-->
<!-- <dependency>-->
<!-- <groupId>org.apache.flink</groupId>-->
<!-- <artifactId>flink-connector-kafka_2.11</artifactId>-->
<!-- <version>${flink.version}</version>-->
<!-- </dependency>-->
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>2.0.18</version>
</dependency>
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
<version>3.4.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/druid -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.2.6</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-filesystem_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.8.1</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!--/*********** flink json**************/-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Note: scala-maven-plugin and maven-assembly-plugin are Maven plugins, not dependencies;
     they are declared in the <build> section below. -->
</dependencies>
<!--<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>2.4.5</version>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
<encoding>${project.build.sourceEncoding}</encoding>
</configuration>
</plugin>
</plugins>
</build>-->
<build>
<plugins>
<!-- This plugin compiles Scala code into class files -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<!-- Bind to Maven's compile phase -->
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>
jar-with-dependencies
</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>add-dependencies-for-IDEA</id>
<activation>
<property>
<name>idea.version</name>
</property>
</activation>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
<version>2.4.5</version>
</dependency>
</dependencies>
</profile>
</profiles>
</project>
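With this pom, the maven-assembly-plugin builds the deployable fat jar during the package phase. Assuming a standard Maven setup, packaging looks like this (the jar name follows from the artifactId, the version, and the jar-with-dependencies descriptor):
mvn clean package -DskipTests
# produces target/flink-train-java-1.0-jar-with-dependencies.jar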
The main class
package com.zx.iot.consumer;
import com.alibaba.fastjson.JSONObject;
import com.zx.iot.dto.Equipment;
import com.zx.iot.dto.Thing;
import com.zx.iot.producer.IotProducer;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
//@Slf4j
/**
 * A real-time Flink processing application developed with the Java API.
 * <p>
 * It consumes IoT device messages from Kafka.
 */
public class IotDataFlink {
public static void main(String[] args) throws Exception {
// step 1: get the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// env.enableCheckpointing(1000);
// Kafka consumer configuration
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "10.6.24.56:9092");
properties.setProperty("group.id", "test");
DataStream<String> stream = env
.addSource(new FlinkKafkaConsumer<>("iot-data", new SimpleStringSchema(), properties));
DataStream<Thing> answerDataStream = stream.map((String strMsg) -> {
Thing thing = new Thing();
try {
JSONObject jsonObject = JSONObject.parseObject(strMsg);
thing.setTs(jsonObject.get("ts").toString());
// data is a comma-separated list of "equipmentCode:status" pairs
String data = jsonObject.get("data").toString();
//thing.setData(data);
List<Equipment> list = new ArrayList<>();
if (data != null) {
for (String equipmentStr : data.split(",")) {
String[] equipmentInfoArray = equipmentStr.split(":");
if (equipmentInfoArray.length == 2) {
Equipment equipment = new Equipment();
equipment.setEquipmentCode(equipmentInfoArray[0]);
equipment.setStatus(equipmentInfoArray[1]);
list.add(equipment);
}
}
}
thing.setList(list);
System.err.println(thing.toString());
// forward the parsed record to the downstream topic
IotProducer.sendList("iot-data-calculate", thing);
} catch (Exception ex) {
// do not return from a finally block: it swallows exceptions; log and fall through instead
System.err.println("failed to parse message: " + strMsg);
}
return thing;
}).filter(x -> x != null);
/*stream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {
String[] tokens = value.toLowerCase().split(",");
System.err.println(tokens[0]);
for(String token : tokens) {
if(token.length() > 0) {
collector.collect(new Tuple2<String,Integer>(token,1));
}
}
}
}).keyBy(0).timeWindow(Time.seconds(5000)).sum(1).print().setParallelism(10);*///
env.execute("wordcount");
}
}
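Based on the parsing logic above, the job expects messages on the iot-data topic shaped roughly like this (the field values are illustrative, not from the original article):
{"ts":"1620000000000","data":"dev001:1,dev002:0"}
Also note that IotProducer.sendList() builds and executes a separate Flink job for every consumed record. A lighter alternative, sketched here under the assumption that the same broker settings apply, is to attach a Kafka sink to answerDataStream within the same job:
// sketch only: serialize each Thing and sink it to the downstream topic
// (reuses the fastjson JSON class and the properties object defined above)
FlinkKafkaProducer<String> downstream =
new FlinkKafkaProducer<>("iot-data-calculate", new SimpleStringSchema(), properties);
answerDataStream.map(t -> JSON.toJSONString(t)).addSink(downstream);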
The data classes
The Thing class
package com.zx.iot.dto;
import java.util.List;
public class Thing {
private String ts;
private String data;
private List<Equipment> list;
public String getTs() {
return ts;
}
public void setTs(String ts) {
this.ts = ts;
}
public String getData() {
return data;
}
public void setData(String data) {
this.data = data;
}
public List<Equipment> getList() {
return list;
}
public void setList(List<Equipment> list) {
this.list = list;
}
public Thing(String ts, String data, List<Equipment> list) {
this.ts = ts;
this.data = data;
this.list = list;
}
public Thing() {
}
@Override
public String toString() {
return "Thing{" +
"ts='" + ts + '\'' +
", data='" + data + '\'' +
", list=" + list +
'}';
}
}
The Equipment class
package com.zx.iot.dto;
public class Equipment {
private String equipmentCode;
private String status;
public String getEquipmentCode() {
return equipmentCode;
}
public void setEquipmentCode(String equipmentCode) {
this.equipmentCode = equipmentCode;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
public Equipment(String equipmentCode, String status) {
this.equipmentCode = equipmentCode;
this.status = status;
}
public Equipment() {
}
@Override
public String toString() {
return "Equipment{" +
"equipmentCode='" + equipmentCode + '\'' +
", status='" + status + '\'' +
'}';
}
}
Forwarding the data to Kafka after consumption
package com.zx.iot.producer;
import com.alibaba.fastjson.JSON;
import com.zx.iot.dto.Equipment;
import com.zx.iot.dto.Thing;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
public class IotProducer {
/*public static void main(String[] args) {
IotProducer iotProducer = new IotProducer();
Thing thing = new Thing();
List list = new ArrayList();
Equipment equipment = new Equipment();
equipment.setStatus("1");
equipment.setEquipmentCode("fdghudj237utcdysihxj237yh");
list.add(equipment);
thing.setList(list);
thing.setTs("9872120988421");
thing.setData("fduoijwps");
try {
iotProducer.sendList("iot-data-calculate",thing);
} catch (Exception e) {
e.printStackTrace();
}
}*/
public static void sendList(String topic, Thing thing) throws Exception {
// 0. initialize the Flink environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// parallelism: should not exceed the number of CPU cores
env.setParallelism(3);
// 1. read the data from a collection
/*ArrayList<String> wordsList = new ArrayList<>();
wordsList.add("hello");
wordsList.add("world");
wordsList.add("intmall");*/
//JSONArray Object = JSONArray.toJSON(list);
//DataStreamSource<Thing> stream = env.fromElements(thing);
String json = JSON.toJSONString(thing);
DataStreamSource<String> stream = env.fromElements(json);
System.err.println("发送。。。。。。。。。。"+topic);
// 2 kafka 生产者配置信息
Properties properties = new Properties();
//properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.6.24.56:9092");
properties.setProperty("bootstrap.servers", "10.6.24.56:9092");//kafka10.6.24.56:9092
properties.setProperty("group.id", "test");
// 3 创建kafka生产者
FlinkKafkaProducer kafkaProducer = new FlinkKafkaProducer(topic, new SimpleStringSchema(), properties);
// 4 生产者和 flink 流关联
stream.addSink(kafkaProducer);
System.err.println("发送后。。。。。。。。。。");
// 5 执行
env.execute("sender");
}
}
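Once packaged, the job is submitted with the Flink CLI. Assuming the fat jar name produced above and the entry class shown here:
flink run -c com.zx.iot.consumer.IotDataFlink flink-train-java-1.0-jar-with-dependencies.jar
Remember the note at the top: the client and cluster must run the same Flink version as <flink.version> in the pom, or submission can fail even though the job runs locally.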