Flink+kafka+redis实时计算wordcount
原文地址 https://blog.csdn.net/xianpanjia4616/article/details/82534369上一篇中我们在本地跑了一个wordcount,今天我们写一个流式的计算wordcount,读取kafka的数据进行实时的计算,把结果写入redis中;pom文件如下:<dependency> <group
原文地址 https://blog.csdn.net/xianpanjia4616/article/details/82534369
上一篇中我们在本地跑了一个wordcount,今天我们写一个流式的计算wordcount,读取kafka的数据进行实时的计算,把结果写入redis中;
pom文件如下:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>1.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_2.11</artifactId>
<version>1.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-redis_2.10</artifactId>
<version>1.1.5</version>
</dependency>
代码如下:
package flink
import java.util.Properties
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.util.serialization.{SimpleStringSchema}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
import org.apache.log4j.Logger
/**
* flinkstreaming消费kafka的数据实现exactly-once的语义;
*/
object flinkStreaming {
private val zk = "xxx"
private val broker = "xxx"
private val group_id = "xxx"
private val topic = "xxx"
def main(args: Array[String]): Unit = {
lazy val logger = Logger.getLogger(flinkStreaming.getClass)
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.enableCheckpointing(5000)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
val properties = new Properties()
properties.setProperty("zookeeper.connect", zk)
properties.setProperty("bootstrap.servers", broker)
properties.setProperty("group.id", group_id)
val consumer = new FlinkKafkaConsumer010[String](topic,new SimpleStringSchema, properties)
val stream = env.addSource(consumer)
val wordcount = stream
.flatMap(_.split(" "))
.filter(x=> x!= null)
.map((_,1))
.keyBy(0)
.sum(1)
.setParallelism(2)
.map(x=> (x._1,x._2.toString))
val conf = new FlinkJedisPoolConfig.Builder().setHost("xxx").setPort(xxx).build()
val sink = new RedisSink[(String,String)](conf, new RedisExampleMapper)
wordcount.addSink(sink)
env.execute("flink streaming")
}
}
class RedisExampleMapper extends RedisMapper[(String, String)] {
override def getCommandDescription: RedisCommandDescription = {
new RedisCommandDescription(RedisCommand.SET, null)
}
override def getKeyFromData(data: (String, String)): String = data._1
override def getValueFromData(data: (String, String)): String = data._2
}
从最基本的水平上看,一个Flink程序由下面几部分组成:
数据源:Flink处理的输入的数据。
转化:Flink对数据进行处理的步骤。
接收器:Flink将处理之后的数据发送的地点。
Sources是你的程序读取输入的地方。你可以通过StreamExecutionEnvironment.addSource(sourceFunction)将Source添加到你的程序中。Flink提供了若干已经实现好了的source functions,当然你也可以通过实现SourceFunction来自定义非并行的source或者实现ParallelSourceFunction接口或者扩展RichParallelSourceFunction来自定义并行的source.
数据sinks消费DataStream并将其发往文件、socket、外部系统或进行打印。Flink自带多种内置的输出格式,这些都被封装在对DataStream的操作背后.
我们需要实现一个RedisSink,然后数据就可以直接写入到redis.支持以下几种数据类型:
然后打包上传到服务器: ./flink run -c flink.flinkStreaming /home/jason/bigdata/jar/bj_parse_json-1.0-SNAPSHOT.jar 提交到集群
然后我们就可以在webUI里面看到我们的job,如下:
从上图中可以看到正在运行的job,和已经完成的job,还有运行的状态等信息,然后我们点金正在运行的进去后,可以看到:
中间的是一个DAG图,可以看到数据从source出来后经过了哪些transformation算子,最后输出到sink的一个过程.下面可以看到有source和sink他们发送了多少条记录,接受了多少条记录等信息.
如果想要停止正在运行的job,第一种方式,可以直接点击右上角的cancel就能把job停掉,第二种,可以通过命令行,flink list先查询有哪些正在运行的job,然后flink cancel jobid就可以,今天就先介绍到这里,困了,睡觉了
更多推荐
所有评论(0)