Consuming Kafka with Flink


1. Flink Source & Sink
In Flink, a Source reads data from an external system, a Transformation applies operations to the data, and a Sink writes the internal data out to an external system.
A Flink job is typically composed of a Source, one or more Transformations, and a Sink.
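To make the Source, Transformation, Sink pipeline concrete, here is a minimal self-contained sketch; the element values and the print sink are illustrative only and are not part of the Kafka job built below:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MinimalPipeline {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// Source: a fixed collection of elements (stands in for Kafka here)
		DataStream<String> source = env.fromElements("a", "b", "c");

		// Transformation: map each element to upper case
		DataStream<String> transformed = source.map(s -> s.toUpperCase());

		// Sink: print to stdout (stands in for an external system)
		transformed.print();

		env.execute("minimal-pipeline");
	}
}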

2. Adding the Kafka and Flink Dependencies

		<!-- kafka -->
		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-clients</artifactId>
			<version>${kafka.version}</version>
		</dependency>

		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka_2.11</artifactId>
			<version>${kafka.version}</version>
		</dependency>
		<!-- /kafka -->

		<!-- flink -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-java</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-java_2.11</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-clients_2.11</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-kafka-0.9_2.11</artifactId>
			<version>${flink.version}</version>
		</dependency>
		
		<dependency>
			  <groupId>org.apache.flink</groupId>
			  <artifactId>flink-core</artifactId>
			  <version>${flink.version}</version>
		</dependency>
		<dependency>
			  <groupId>org.apache.flink</groupId>
			  <artifactId>flink-queryable-state-client-java_2.11</artifactId>
			  <version>${flink.version}</version>
		</dependency>		
		<!-- /flink -->
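The version placeholders above (${kafka.version} and ${flink.version}) are not defined in this snippet. A <properties> block along the following lines would supply them; the exact versions shown are assumptions and should be aligned with your broker and Flink cluster:

		<properties>
			<!-- assumed versions, chosen to match the 0.9 connector used here -->
			<kafka.version>0.9.0.1</kafka.version>
			<flink.version>1.7.2</flink.version>
		</properties>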

3. Flink Connection Configuration Class

import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.annotation.EnableTransactionManagement;

/**
 * 
 * <p>
 * description: Flink-related property configuration
 * </p>
 * 
 * @author sunqi
 * @since 2019-08-05
 * @see
 */
@Configuration
@EnableTransactionManagement
public class FlinkConfig {
	Log logger = LogFactory.getLog(this.getClass());

	@Value("${bootstrap.servers}")
	String bootstrapServers;

	@Value("${group.id}")
	String groupId;

	@Bean(name = "kafkaProps")
	public Properties kafkaProps() {
		Properties props = new Properties();
		props.setProperty("bootstrap.servers", bootstrapServers);
		props.setProperty("group.id", groupId);
		props.put("enable.auto.commit", "true");
		props.put("auto.commit.interval.ms", "1000");
		props.put("max.partition.fetch.bytes", "8388608");// 设置分区消费消息最大size 8M
		props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		return props;
	}
}
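The two @Value placeholders above expect bootstrap.servers and group.id entries in the Spring configuration. A minimal application.properties sketch (the broker addresses and group id below are assumptions):

bootstrap.servers=broker1:9092,broker2:9092
group.id=flink-score-group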

Notes on the consumer start position:

setStartFromGroupOffsets: if no group offset exists for a partition, or the group offset is invalid, the initial offset is determined by the auto.offset.reset property, which defaults to latest.
setStartFromSpecificOffsets: if a specified offset is invalid, the offset for that topic partition falls back to the group offset.
If the job is restored from a checkpoint or savepoint, all of these start-offset settings are ignored; the initial offsets are taken from the checkpoint or savepoint instead.
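These options are set on the consumer instance before it is added as a source. A short sketch using the same FlinkKafkaConsumer09 that appears in the job below (the specific offsets shown are illustrative only):

// requires org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition and java.util.HashMap for the last variant
FlinkKafkaConsumer09<String> consumer =
        new FlinkKafkaConsumer09<>("score-topic", new SimpleStringSchema(), kafkaProps);

// default: start from the committed group offsets, falling back to auto.offset.reset
consumer.setStartFromGroupOffsets();

// alternatives: start from the earliest or latest record in each partition
// consumer.setStartFromEarliest();
// consumer.setStartFromLatest();

// or pin explicit offsets per partition; invalid entries fall back to the group offset
// Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
// specificOffsets.put(new KafkaTopicPartition("score-topic", 0), 23L);
// consumer.setStartFromSpecificOffsets(specificOffsets);

DataStream<String> stream = env.addSource(consumer);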

4. Complete Example Walkthrough


import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema; // on older Flink versions: org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

/**
 * @author sunqi
 *
 */
@Component
public class KafkaFlinkJob implements ApplicationRunner {

    @Autowired
    private Properties kafkaProps;
    
    @SuppressWarnings("all")
    @Override
    public void run(ApplicationArguments args) throws Exception {
    	// create a streaming execution environment
    	final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    	// checkpoint interval: 6 seconds
    	env.enableCheckpointing(6000);
    	// filesystem state backend; with a file:/// URI the state is kept on the TaskManager's local disk
        env.setStateBackend(new FsStateBackend("file:///opt/tpapp/flinkdata", true));
        // exactly-once semantics
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // retain externalized checkpoints when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // at least 1 second between the end of one checkpoint and the start of the next
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
        // checkpoint timeout: 60 seconds
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // at most one checkpoint in flight at a time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // default parallelism of the environment is 1
        env.setParallelism(1);
        
        
        // data source
        DataStream<String> sourceStream = readFromKafka(env, kafkaProps);
        // parse the Kafka stream and convert it into a fixed-format stream
        DataStream<Tuple5<String, String, String, String, String>> sourceStreamTra = sourceStream
		        .map(/* parsing operator, analyzed in the next section */)
		        .name("source data parse")
		        .setParallelism(1);
        
        // submit the job
        env.execute("flink-score-job");

    }
    


	public static DataStream<String> readFromKafka(StreamExecutionEnvironment env, Properties kafkaProps) {
		// read data from the Kafka topic "score-topic"
        DataStream<String> stream = env
        		.addSource(new FlinkKafkaConsumer09<>("score-topic", new SimpleStringSchema(), kafkaProps))
                .name("kafka-source")
                .setParallelism(1);
        
        return stream;
    }
    
}
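The map operator itself is deferred to the next section. As a rough sketch only, assuming each Kafka record is a comma-separated line with five fields (that field layout is an assumption, not something stated in this article), the parsing MapFunction could look like this:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple5;

// Hypothetical parser: splits a comma-separated record into a Tuple5.
// The field layout is assumed for illustration; no validation is done here.
public class ScoreParseMap implements MapFunction<String, Tuple5<String, String, String, String, String>> {

	@Override
	public Tuple5<String, String, String, String, String> map(String value) throws Exception {
		String[] fields = value.split(",", -1);
		return new Tuple5<>(fields[0], fields[1], fields[2], fields[3], fields[4]);
	}
}

With a class like this in place, the call in the job above would read .map(new ScoreParseMap()).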
