Flink Part 2: Consuming Kafka Data
Preface:
Flink's DataSource module defines the data-input operations of the DataStream API. Flink groups data sources into built-in and third-party sources. The built-in sources cover files, socket network ports, and collection data types, and are mainly used for testing. For third-party sources, Flink defines a number of connectors through SourceFunction, which handle reading data from and writing data to external systems. This article uses Flink's Kafka connector to implement consumption.
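For comparison, the built-in sources mentioned above can be exercised with just a few lines. The sketch below is illustrative only; the host, port, and sample elements are placeholders and are not part of the Kafka example that follows:
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class BuiltInSourceDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Collection source: handy for quick local tests
        DataStreamSource<String> fromCollection = env.fromElements("a", "b", "c");
        // Socket source: reads text lines from a socket (host/port are placeholders)
        DataStreamSource<String> fromSocket = env.socketTextStream("localhost", 9999);
        fromCollection.print();
        fromSocket.print();
        env.execute("built-in sources");
    }
}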
Flink consuming Kafka:
Add the dependencies:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.1.0.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.10.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.10.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.10.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.10.0</version>
</dependency>
<dependency>
    <groupId>joda-time</groupId>
    <artifactId>joda-time</artifactId>
    <version>2.8.1</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.73</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
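Note that the org.apache.flink artifacts should all use the same Flink version (1.10.0 here), and the _2.11 suffix denotes the Scala version of the Flink build; mixing Flink versions or Scala suffixes typically leads to class or method resolution errors at startup.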
Custom message deserialization class:
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;
public class MessageDeSerializationSchema implements KafkaDeserializationSchema<ConsumerRecord<String, String>> {
    @Override
    public boolean isEndOfStream(ConsumerRecord<String, String> record) {
        // The stream never terminates on its own
        return false;
    }
    @Override
    public ConsumerRecord<String, String> deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception {
        // Wrap the raw bytes back into a ConsumerRecord with String key and value
        // (note: new String(...) throws a NullPointerException if the record has no key)
        return new ConsumerRecord<>(
                consumerRecord.topic(),
                consumerRecord.partition(),
                consumerRecord.offset(),
                new String(consumerRecord.key()),
                new String(consumerRecord.value())
        );
    }
    @Override
    public TypeInformation<ConsumerRecord<String, String>> getProducedType() {
        return TypeInformation.of(new TypeHint<ConsumerRecord<String, String>>() {});
    }
}
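If only the message value matters and the topic/partition/offset metadata is not needed, the built-in SimpleStringSchema can replace the custom schema. A minimal sketch, assuming the same properties object as in the consumer program below:
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
// Value-only consumption: each stream element is just the message value as a String
FlinkKafkaConsumer<String> simpleConsumer =
        new FlinkKafkaConsumer<>("flink_kafka", new SimpleStringSchema(), properties);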
Consumer program:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.util.Properties;
public class FlinkConsumer {
    public static void main(String[] args) throws Exception {
        // Build the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint trigger interval in milliseconds
        env.enableCheckpointing(5000);
        // EXACTLY_ONCE is the default CheckpointingMode; AT_LEAST_ONCE can also be specified
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.9.239:9092");
        // Consumer group
        properties.setProperty("group.id", "flink_consumer");
        // Three offset-reset strategies; the default is latest
        // earliest: if a partition has a committed offset, start from it; otherwise start from the beginning
        // latest: if a partition has a committed offset, start from it; otherwise consume only newly produced data
        // none: if every partition has a committed offset, start from those offsets; if any partition lacks one, throw an exception
        properties.setProperty("auto.offset.reset", "earliest");

        FlinkKafkaConsumer<ConsumerRecord<String, String>> kafkaConsumer =
                new FlinkKafkaConsumer<>("flink_kafka", new MessageDeSerializationSchema(), properties);
        DataStreamSource<ConsumerRecord<String, String>> stream = env
                .addSource(kafkaConsumer).setParallelism(2);

        // Consuming multiple topics:
        // List<String> topics = new ArrayList<>();
        // topics.add("topic1");
        // topics.add("topic2");
        // DataStream<String> stream = env
        //         .addSource(new FlinkKafkaConsumer<>(topics, new SimpleStringSchema(), properties));

        stream.flatMap(new FlatMapFunction<ConsumerRecord<String, String>, Object>() {
            @Override
            public void flatMap(ConsumerRecord<String, String> record, Collector<Object> collector) throws Exception {
                collector.collect("offset:" + record.offset());
                collector.collect("partition:" + record.partition());
                collector.collect("topic:" + record.topic());
                collector.collect("key:" + record.key());
                collector.collect("value:" + record.value());
            }
        }).print();

        env.execute("consumer start");
    }
}
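With checkpointing enabled as above, the Kafka connector commits offsets back to Kafka when a checkpoint completes rather than relying on the Kafka client's auto-commit. The connector also exposes explicit start-position controls. The lines below are a minimal sketch reusing the kafkaConsumer from the program above; they are not part of the original code:
// Commit offsets to Kafka on checkpoint completion (this is the default when checkpointing is enabled)
kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
// The start position can also be set on the consumer itself, overriding auto.offset.reset
kafkaConsumer.setStartFromGroupOffsets(); // or setStartFromEarliest() / setStartFromLatest()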
Message producer program:
import com.alibaba.fastjson.JSON;
import com.tt.flinkdemo.domain.MessageEntity;
import lombok.extern.slf4j.Slf4j;
import org.joda.time.DateTime;
import org.springframework.stereotype.Component;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@Slf4j
@Component
public class MessageProducter {
    private static Properties getProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.9.239:9092");
        props.put("acks", "all");                 // wait for acknowledgement from all in-sync replicas
        props.put("retries", Integer.MAX_VALUE);  // retry count
        props.put("batch.size", 16384);           // batch size in bytes
        props.put("buffer.memory", 102400);       // producer buffer size; tune to the available memory
        props.put("linger.ms", 1000);             // a batch is sent once either linger.ms or batch.size is reached
        props.put("client.id", "producer-syn-1"); // client id, useful for monitoring
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }
    public static void main(String[] args) throws Exception {
        KafkaProducer<String, String> producer = new KafkaProducer<>(getProps());
        for (int i = 20; i <= 30; i++) {
            MessageEntity message = new MessageEntity();
            message.setMessage("message #" + i);
            message.setTotalDate(DateTime.now().toString("yyyy-MM-dd HH:mm:ss"));
            message.setMessageId(System.currentTimeMillis());
            log.info(JSON.toJSONString(message));
            ProducerRecord<String, String> record = new ProducerRecord<>("flink_kafka",
                    System.currentTimeMillis() + "", JSON.toJSONString(message));
            Future<RecordMetadata> metadataFuture = producer.send(record);
            RecordMetadata recordMetadata;
            try {
                recordMetadata = metadataFuture.get();
                log.info("send succeeded!");
                log.info("topic:" + recordMetadata.topic());
                log.info("partition:" + recordMetadata.partition());
                log.info("offset:" + recordMetadata.offset());
            } catch (InterruptedException | ExecutionException e) {
                log.error("send failed!", e);
            }
            Thread.sleep(100);
        }
        // Release producer resources once all records have been sent
        producer.close();
    }
}
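The MessageEntity class imported above is not shown in the original post; a plausible minimal sketch, assuming a Lombok POJO whose fields are inferred from the setters used by the producer (the real class may differ):
import lombok.Data;
// Hypothetical reconstruction of com.tt.flinkdemo.domain.MessageEntity, inferred from the setters above
@Data
public class MessageEntity {
    private Long messageId;
    private String message;
    private String totalDate;
}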
Test results:
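With the producer writing to flink_kafka and the consumer job running, the consumer's print() output shows, for every record read, the offset, partition, topic, key, and value lines emitted by the flatMap above.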