Flink: Reading Data from Kafka and Writing to Redis with Exactly-Once Semantics
pom.xml
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.myorg.quickstart</groupId>
<artifactId>quickstart</artifactId>
<version>0.1</version>
<packaging>jar</packaging>
<name>Flink Quickstart Job</name>
<url>http://www.myorganization.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.9.1</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
</properties>
<repositories>
<repository>
<id>apache.snapshots</id>
<name>Apache Development Snapshot Repository</name>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<dependencies>
<!-- Apache Flink dependencies -->
<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- Add connector dependencies here. They must be in the default scope (compile). -->
<!-- Example:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
-->
<!-- Add logging framework, to produce console output when running in the IDE. -->
<!-- These dependencies are excluded from the application JAR by default. -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>runtime</scope>
</dependency>
<!-- Efficient asynchronous HttpClient -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpasyncclient</artifactId>
<version>4.1.4</version>
</dependency>
<!-- Alibaba fastjson for JSON processing -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.51</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-redis_${scala.binary.version}</artifactId>
<version>1.1.5</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Java Compiler -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
<!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
<!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.myorg.quickstart.StreamingJob</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
<pluginManagement>
<plugins>
<!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
<plugin>
<groupId>org.eclipse.m2e</groupId>
<artifactId>lifecycle-mapping</artifactId>
<version>1.0.0</version>
<configuration>
<lifecycleMappingMetadata>
<pluginExecutions>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<versionRange>[3.0.0,)</versionRange>
<goals>
<goal>shade</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<versionRange>[3.1,)</versionRange>
<goals>
<goal>testCompile</goal>
<goal>compile</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
</pluginExecutions>
</lifecycleMappingMetadata>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
<!-- This profile helps to make things run out of the box in IntelliJ -->
<!-- It adds Flink's core classes to the runtime class path. -->
<!-- Otherwise they are missing in IntelliJ, because the dependency is 'provided' -->
<profiles>
<profile>
<id>add-dependencies-for-IDEA</id>
<activation>
<property>
<name>idea.version</name>
</property>
</activation>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
</dependencies>
</profile>
</profiles>
</project>
flink-config.properties (it is best not to keep this file inside the project; place it in a separate directory on the server so it can be modified easily)
topics=test
group.id=lzc
bootstrap.servers=bigdata1:9092,bigdata2:9092
auto.offset.reset=earliest
enable.auto.commit=false
checkpoint.interval=10000
redis.host=localhost
redis.pwd=123456
redis.db=0
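For a quick sanity check before submitting the job, the sketch below loads the file through the same ParameterTool API the job uses (the path /opt/flink-conf/flink-config.properties is a hypothetical example; inside the job the path arrives as args[0]):
import org.apache.flink.api.java.utils.ParameterTool;
public class ConfigCheck {
    public static void main(String[] args) throws Exception {
        // Hypothetical path; point it at wherever the file lives on your server.
        ParameterTool parameters = ParameterTool.fromPropertiesFile("/opt/flink-conf/flink-config.properties");
        // getRequired fails fast if a key is missing; the two-argument getters fall back to a default.
        System.out.println("topics              = " + parameters.getRequired("topics"));
        System.out.println("bootstrap.servers   = " + parameters.getRequired("bootstrap.servers"));
        System.out.println("checkpoint.interval = " + parameters.getLong("checkpoint.interval", 5000L));
        System.out.println("redis.host          = " + parameters.get("redis.host", "localhost"));
    }
}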
FlinkKafkaToRedis
package org.myorg.quickstart.kafka;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.util.Collector;
public class FlinkKafkaToRedis {
public static void main(String[] args) throws Exception{
//Pass the path of the configuration file as the first program argument
ParameterTool parameters = ParameterTool.fromPropertiesFile(args[0]);
DataStream<String> lines = FlinkUtil.createKafkaStream(parameters, SimpleStringSchema.class);
SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String words, Collector<String> collector) throws Exception {
for (String word : words.split(" ")) {
collector.collect(word);
}
}
});
SingleOutputStreamOperator<Tuple2<String, Integer>> word = words.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String word) throws Exception {
return new Tuple2<>(word, 1);
}
});
//Keyed state lets the counts keep accumulating after a failure and restart
SingleOutputStreamOperator<Tuple2<String, Integer>> sum = word.keyBy(0).sum(1);
sum.map(new MapFunction<Tuple2<String, Integer>, Tuple3<String,String,String>>() {
@Override
public Tuple3<String, String, String> map(Tuple2<String, Integer> tp) throws Exception {
return Tuple3.of("word_count",tp.f0,tp.f1.toString());
}
}).addSink(new MyRedisSink());
FlinkUtil.getEnv().execute("kafkaSource");
}
}
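A side note on the aggregation: keyBy(0).sum(1) uses the positional Tuple API, which works on Flink 1.9 but is deprecated in later releases. A minimal sketch of the same step with an explicit KeySelector (requires import org.apache.flink.api.java.functions.KeySelector), in case you prefer the non-positional form:
// Same keyed word count, keying on the word field explicitly instead of by index.
SingleOutputStreamOperator<Tuple2<String, Integer>> sum = word
        .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> tp) {
                return tp.f0; // the word is the key
            }
        })
        .sum(1); // still sums the count field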
Custom utility class: FlinkUtil
package org.myorg.quickstart.kafka;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
public class FlinkUtil {
private static StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
/**
* createKafkaStream
* @param parameters
* @param clazz
* @param <T>
* @return
* @throws Exception
*/
public static <T> DataStream<T> createKafkaStream(ParameterTool parameters, Class<? extends DeserializationSchema<T>> clazz) throws Exception{
env.getConfig().setGlobalJobParameters(parameters);
//Needed when reading from HDFS in a local environment; not needed on the cluster
//System.setProperty("HADOOP_USER_NAME","root");
//Checkpointing is disabled by default; enable it here
env.enableCheckpointing(parameters.getLong("checkpoint.interval",5000L),CheckpointingMode.EXACTLY_ONCE);
//Set the restart strategy (the default restarts indefinitely)
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(3,2000));
//Set the state backend (better configured in the Flink configuration file)
//env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
//Retain checkpoint data when the job fails or is manually cancelled (by default it is deleted)
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", parameters.getRequired("bootstrap.servers"));
properties.setProperty("group.id", parameters.getRequired("group.id"));
//If no offset has been committed yet, start consuming from the earliest offset
properties.setProperty("auto.offset.reset",parameters.get("auto.offset.reset","earliest"));
//The Kafka consumer does not auto-commit offsets; Flink manages them through checkpoints
properties.setProperty("enable.auto.commit",parameters.get("enable.auto.commit","false"));
String topics = parameters.getRequired("topics");
List<String> topicList = Arrays.asList(topics.split(","));
//Source: read data from Kafka
FlinkKafkaConsumer<T> kafkaConsumer = new FlinkKafkaConsumer<T>(
topicList,
clazz.newInstance(),
properties);
//After a successful Flink checkpoint, also write the offsets back to Kafka's internal offsets topic (the default is true)
kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
return env.addSource(kafkaConsumer);
}
/**
* Get the execution environment
* @return
*/
public static StreamExecutionEnvironment getEnv(){
return env;
}
}
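Note that createKafkaStream instantiates the schema reflectively via clazz.newInstance(), so any DeserializationSchema passed to it needs a public no-argument constructor (SimpleStringSchema satisfies this). As a hypothetical illustration, a custom schema that trims and lower-cases every record before it enters the stream could look like the sketch below and be plugged in as FlinkUtil.createKafkaStream(parameters, LowerCaseStringSchema.class):
package org.myorg.quickstart.kafka;
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
// Hypothetical example schema (not part of the original project). It keeps a public
// no-arg constructor because FlinkUtil creates it via clazz.newInstance().
public class LowerCaseStringSchema extends AbstractDeserializationSchema<String> {
    @Override
    public String deserialize(byte[] message) throws IOException {
        return new String(message, StandardCharsets.UTF_8).trim().toLowerCase();
    }
}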
Custom Redis sink: MyRedisSink
package org.myorg.quickstart.kafka;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import redis.clients.jedis.Jedis;
public class MyRedisSink extends RichSinkFunction<Tuple3<String,String,String>> {
private transient Jedis jedis;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
//Get the global job parameters
ParameterTool params = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
String host = params.getRequired("redis.host");
String pwd = params.getRequired("redis.pwd");
int db = params.getInt("redis.db", 0);
jedis = new Jedis(host, 6379, 5000);
jedis.auth(pwd);
jedis.select(db);
}
@Override
public void invoke(Tuple3<String, String, String> value, Context context) throws Exception {
if (!jedis.isConnected()){
jedis.connect();
}
jedis.hset(value.f0,value.f1,value.f2);
}
@Override
public void close() throws Exception {
super.close();
jedis.close();
}
}
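Why this yields exactly-once results in Redis: the Kafka offsets and the keyed word-count state are snapshotted together in each checkpoint, and after a restart the sink simply replays HSET with the restored, correct counts, so the idempotent overwrite cannot double-count. To inspect the result, here is a small standalone Jedis sketch (the connection settings are assumptions; align them with redis.host / redis.pwd / redis.db in the config file):
import java.util.Map;
import redis.clients.jedis.Jedis;
public class RedisCheck {
    public static void main(String[] args) {
        // Assumed connection settings; match them to the values in flink-config.properties.
        Jedis jedis = new Jedis("localhost", 6379);
        jedis.auth("123456");
        jedis.select(0);
        // The sink issues HSET word_count <word> <count>, so the whole result lives in one hash.
        Map<String, String> counts = jedis.hgetAll("word_count");
        counts.forEach((word, count) -> System.out.println(word + " -> " + count));
        jedis.close();
    }
}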