Consuming Kafka with Flink
1. Flink Source & Sink
In Flink, a Source reads data from an external system, a Transformation applies processing to the data, and a Sink writes the data out to an external system.
A Flink job is generally composed of a Source, one or more Transformations, and a Sink, as sketched below.
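To make the three roles concrete, here is a minimal sketch of a job wired from a source, one transformation, and a sink. The socket host/port and the uppercasing logic are placeholders chosen only for illustration, not part of the example built in this article.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SourceTransformSinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Source: read text lines from a socket (host/port are illustrative placeholders)
        DataStream<String> source = env.socketTextStream("localhost", 9999);

        // Transformation: a trivial map standing in for real business logic
        DataStream<String> transformed = source.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) {
                return value.toUpperCase();
            }
        });

        // Sink: print the transformed stream to stdout
        transformed.print();

        env.execute("source-transform-sink-sketch");
    }
}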
2. Adding the Kafka and Flink dependencies
<!-- kafka -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>${kafka.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>${kafka.version}</version>
</dependency>
<!-- /kafka -->
<!-- flink -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.9_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-queryable-state-client-java_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- /flink -->
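The snippets above reference ${kafka.version} and ${flink.version} placeholders, so the pom also needs a <properties> block. The version numbers below are assumptions picked only for illustration; use the versions that match your cluster, and note that FlinkKafkaConsumer09 requires a Flink release that still ships the 0.9 connector.

<properties>
    <!-- illustrative versions only; align with your environment -->
    <kafka.version>0.9.0.1</kafka.version>
    <flink.version>1.7.2</flink.version>
</properties>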
3. Kafka connection configuration class
import java.util.Properties;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.annotation.EnableTransactionManagement;

/**
 * <p>
 * description: Flink/Kafka connection properties
 * </p>
 *
 * @author sunqi
 * @since 2019-08-05
 */
@Configuration
@EnableTransactionManagement
public class FlinkConfig {

    Log logger = LogFactory.getLog(this.getClass());

    @Value("${bootstrap.servers}")
    String bootstrapServers;

    @Value("${group.id}")
    String groupId;

    @Bean(name = "kafkaProps")
    public Properties kafkaProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        // maximum message size fetched per partition: 8 MB
        props.put("max.partition.fetch.bytes", "8388608");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
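The two @Value placeholders are resolved from the Spring configuration. A minimal application.properties entry might look like the following; the broker addresses and group id are assumptions for illustration only.

bootstrap.servers=kafka-broker1:9092,kafka-broker2:9092
group.id=flink-score-consumer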
Notes:
setStartFromGroupOffsets: if the group offset does not exist or is invalid, the initial offset is determined by the "auto.offset.reset" property, which defaults to "latest" for the new consumer API.
setStartFromSpecificOffsets: if a specified offset is invalid, the offset for that topic partition falls back to the group offset.
If the job is restored from a checkpoint or savepoint, all of the start-position settings above are ignored and the initial offsets are taken from the checkpoint. A sketch of how these start positions are set on the consumer follows.
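The snippet below illustrates setting the start position on the consumer; the topic name, partition number, and offset value are placeholders chosen only for illustration.

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

// inside a method where the kafkaProps Properties bean is already in scope:
FlinkKafkaConsumer09<String> consumer =
        new FlinkKafkaConsumer09<>("score-topic", new SimpleStringSchema(), kafkaProps);

// start from the committed group offsets (this is also the default behaviour)
consumer.setStartFromGroupOffsets();

// or start from explicit per-partition offsets (partition and offset values are placeholders)
Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
specificOffsets.put(new KafkaTopicPartition("score-topic", 0), 23L);
consumer.setStartFromSpecificOffsets(specificOffsets);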
4. Complete example
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

/**
 * @author sunqi
 */
@Component
public class KafkaFlinkJob implements ApplicationRunner {

    @Autowired
    private Properties kafkaProps;

    @SuppressWarnings("all")
    @Override
    public void run(ApplicationArguments args) throws Exception {
        // create a streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // checkpoint interval: 6 seconds
        env.enableCheckpointing(6000);
        // filesystem state backend; with file:/// the state is kept locally on the TaskManager
        env.setStateBackend(new FsStateBackend("file:///opt/tpapp/flinkdata", true));
        // exactly-once semantics
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // keep externalized checkpoints when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // at least 1 second between the end of one checkpoint and the start of the next
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
        // checkpoint timeout: 60 seconds
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // at most one checkpoint in flight at a time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // default parallelism of the environment: 1
        env.setParallelism(1);

        // data source
        DataStream<String> sourceStream = readFromKafka(env, kafkaProps);

        // parse the Kafka stream into a fixed-format stream
        DataStream<Tuple5<String, String, String, String, String>> sourceStreamTra = sourceStream
                .map(/* parsing operator, analyzed in the next section */)
                .name("source data parse")
                .setParallelism(1);

        // start the job
        env.execute("flink-score-job");
    }

    public static DataStream<String> readFromKafka(StreamExecutionEnvironment env, Properties kafkaProps) {
        // consume data from the Kafka topic
        DataStream<String> stream = env
                .addSource(new FlinkKafkaConsumer09<>("score-topic", new SimpleStringSchema(), kafkaProps))
                .name("kafka-source")
                .setParallelism(1);
        return stream;
    }
}
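The parsing map operator itself is analyzed in the next section. Purely as a hypothetical illustration of an operator that produces the Tuple5 stream, a comma-separated record could be split like this; the record layout and the class name ScoreParseMap are assumptions, not the author's actual format.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple5;

// hypothetical parser: splits a comma-separated record into five string fields
public class ScoreParseMap implements MapFunction<String, Tuple5<String, String, String, String, String>> {
    @Override
    public Tuple5<String, String, String, String, String> map(String value) {
        // assumes each record carries at least five comma-separated fields
        String[] fields = value.split(",", -1);
        return Tuple5.of(fields[0], fields[1], fields[2], fields[3], fields[4]);
    }
}

It would be plugged into the pipeline as .map(new ScoreParseMap()).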