【flink】之如何消费kafka数据?
为了编写一个使用Apache来读取Apache Kafka消息的示例,我们需要确保我们的环境已经安装了Flink和Kafka,并且它们都能正常运行。此外,我们还需要在项目中引入相应的依赖库。以下是一个详细的步骤指南,包括依赖添加、代码编写和执行说明。
·
为了编写一个使用Apache Flink来读取Apache Kafka消息的示例,我们需要确保我们的环境已经安装了Flink和Kafka,并且它们都能正常运行。此外,我们还需要在项目中引入相应的依赖库。以下是一个详细的步骤指南,包括依赖添加、代码编写和执行说明。
1.环境准备
确保你已经安装了Apache Kafka和Apache Flink,并且Kafka正在运行。Kafka的默认端口是9092,而Zookeeper(Kafka依赖的服务)的默认端口是2181
2.Maven项目设置
创建一个新的Maven项目,并在pom.xml
中添加以下依赖:
-
<dependencies>
-
<!-- Flink dependencies -->
-
<dependency>
-
<groupId>org.apache.flink</groupId>
-
<artifactId>flink-streaming-java_2
.12</artifactId>
-
<version>
1.13
.2</version>
-
</dependency>
-
<dependency>
-
<groupId>org.apache.flink</groupId>
-
<artifactId>flink-connector-kafka_2
.12</artifactId>
-
<version>
1.13
.2</version>
-
</dependency>
-
-
<!-- Kafka client dependency -->
-
<dependency>
-
<groupId>org.apache.kafka</groupId>
-
<artifactId>kafka-clients</artifactId>
-
<version>
2.8
.0</version>
-
</dependency>
-
-
<!-- Logging -->
-
<dependency>
-
<groupId>org.slf4j</groupId>
-
<artifactId>slf4j-log4j12</artifactId>
-
<version>
1.7
.30</version>
-
</dependency>
-
</dependencies>
注意:请根据你使用的Scala或Java版本以及Flink和Kafka的版本调整上述依赖。
3.编写Flink Kafka Consumer代码
-
import org.apache.flink.api.common.functions.MapFunction;
-
import org.apache.flink.streaming.api.datastream.DataStream;
-
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
-
-
import java.util.Properties;
-
-
public
class
FlinkKafkaConsumerDemo {
-
-
public
static
void
main
(String[] args)
throws Exception {
-
// 设置执行环境
-
final
StreamExecutionEnvironment
env
= StreamExecutionEnvironment.getExecutionEnvironment();
-
-
// Kafka消费者属性
-
Properties
props
=
new
Properties();
-
props.put(
"bootstrap.servers",
"localhost:9092");
-
props.put(
"group.id",
"test-group");
-
props.put(
"enable.auto.commit",
"true");
-
props.put(
"auto.commit.interval.ms",
"1000");
-
props.put(
"key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
-
props.put(
"value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
-
-
// 创建Kafka消费者
-
FlinkKafkaConsumer<String> myConsumer =
new
FlinkKafkaConsumer<>(
-
"input-topic",
// Kafka topic
-
new
SimpleStringSchema(),
// 反序列化器
-
props);
-
-
// 添加数据源
-
DataStream<String> stream = env.addSource(myConsumer);
-
-
// 数据处理
-
stream.map(
new
MapFunction<String, String>() {
-
@Override
-
public String
map
(String value)
throws Exception {
-
return
"Received: " + value;
-
}
-
}).print();
-
-
// 执行流程序
-
env.execute(
"Flink Kafka Consumer Example");
-
}
-
-
// 简单的字符串反序列化器
-
public
static
final
class
SimpleStringSchema
implements
DeserializationSchema<String> {
-
-
@Override
-
public String
deserialize
(byte[] message)
throws IOException {
-
return
new
String(message,
"UTF-8");
-
}
-
-
@Override
-
public
boolean
isEndOfStream
(String nextElement) {
-
return
false;
-
}
-
-
@Override
-
public TypeInformation<String>
getProducedType
() {
-
return BasicTypeInfo.STRING_TYPE_INFO;
-
}
-
}
-
}
4.执行程序
- 确保Kafka正在运行,并且有一个名为
input-topic
的topic(如果没有,你需要先创建它)。 - 编译并运行你的Maven项目
更多推荐
已为社区贡献6条内容
所有评论(0)