【flink】之如何消费kafka数据?
为了编写一个使用Apache Flink来读取Apache Kafka消息的示例,我们需要确保我们的环境已经安装了Flink和Kafka,并且它们都能正常运行。此外,我们还需要在项目中引入相应的依赖库。以下是一个详细的步骤指南,包括依赖添加、代码编写和执行说明。
·
为了编写一个使用Apache Flink来读取Apache Kafka消息的示例,我们需要确保我们的环境已经安装了Flink和Kafka,并且它们都能正常运行。此外,我们还需要在项目中引入相应的依赖库。以下是一个详细的步骤指南,包括依赖添加、代码编写和执行说明。
1.环境准备
确保你已经安装了Apache Kafka和Apache Flink,并且Kafka正在运行。Kafka的默认端口是9092,而Zookeeper(Kafka依赖的服务)的默认端口是2181
2.Maven项目设置
创建一个新的Maven项目,并在pom.xml
中添加以下依赖:
<dependencies>
<!-- Flink dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.13.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.13.2</version>
</dependency>
<!-- Kafka client dependency -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
<!-- Logging -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.30</version>
</dependency>
</dependencies>
注意:请根据你使用的Scala或Java版本以及Flink和Kafka的版本调整上述依赖。
3.编写Flink Kafka Consumer代码
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
public class FlinkKafkaConsumerDemo {
public static void main(String[] args) throws Exception {
// 设置执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Kafka消费者属性
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 创建Kafka消费者
FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(
"input-topic", // Kafka topic
new SimpleStringSchema(), // 反序列化器
props);
// 添加数据源
DataStream<String> stream = env.addSource(myConsumer);
// 数据处理
stream.map(new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
return "Received: " + value;
}
}).print();
// 执行流程序
env.execute("Flink Kafka Consumer Example");
}
// 简单的字符串反序列化器
public static final class SimpleStringSchema implements DeserializationSchema<String> {
@Override
public String deserialize(byte[] message) throws IOException {
return new String(message, "UTF-8");
}
@Override
public boolean isEndOfStream(String nextElement) {
return false;
}
@Override
public TypeInformation<String> getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
}
}
4.执行程序
- 确保Kafka正在运行,并且有一个名为
input-topic
的topic(如果没有,你需要先创建它)。 - 编译并运行你的Maven项目
更多推荐
已为社区贡献2条内容
所有评论(0)