原文地址 https://blog.csdn.net/xianpanjia4616/article/details/82534369

上一篇中我们在本地跑了一个wordcount,今天我们写一个流式的计算wordcount,读取kafka的数据进行实时的计算,把结果写入redis中;

pom文件如下:

<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.6.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.6.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.6.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
            <version>1.6.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-redis_2.10</artifactId>
            <version>1.1.5</version>
        </dependency>


代码如下:

package flink
 
import java.util.Properties
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.util.serialization.{SimpleStringSchema}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
import org.apache.log4j.Logger
/**
  * flinkstreaming消费kafka的数据实现exactly-once的语义;
  */
object flinkStreaming {
  private val zk = "xxx"
  private val broker = "xxx"
  private val group_id = "xxx"
  private val topic = "xxx"
  def main(args: Array[String]): Unit = {
    lazy val logger = Logger.getLogger(flinkStreaming.getClass)
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.enableCheckpointing(5000)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    val properties = new Properties()
    properties.setProperty("zookeeper.connect", zk)
    properties.setProperty("bootstrap.servers", broker)
    properties.setProperty("group.id", group_id)
    val consumer = new FlinkKafkaConsumer010[String](topic,new SimpleStringSchema, properties)
    val stream = env.addSource(consumer)
    val wordcount = stream
      .flatMap(_.split(" "))
      .filter(x=> x!= null)
      .map((_,1))
      .keyBy(0)
      .sum(1)
      .setParallelism(2)
      .map(x=> (x._1,x._2.toString))
    val conf = new FlinkJedisPoolConfig.Builder().setHost("xxx").setPort(xxx).build()
    val sink = new RedisSink[(String,String)](conf, new RedisExampleMapper)
    wordcount.addSink(sink)
    env.execute("flink streaming")
  }
}
class RedisExampleMapper extends RedisMapper[(String, String)] {
  override def getCommandDescription: RedisCommandDescription = {
    new RedisCommandDescription(RedisCommand.SET, null)
  }
  override def getKeyFromData(data: (String, String)): String = data._1
  override def getValueFromData(data: (String, String)): String = data._2
}


 
从最基本的水平上看,一个Flink程序由下面几部分组成:

数据源:Flink处理的输入的数据。
转化:Flink对数据进行处理的步骤。
接收器:Flink将处理之后的数据发送的地点。

Sources是你的程序读取输入的地方。你可以通过StreamExecutionEnvironment.addSource(sourceFunction)将Source添加到你的程序中。Flink提供了若干已经实现好了的source functions,当然你也可以通过实现SourceFunction来自定义非并行的source或者实现ParallelSourceFunction接口或者扩展RichParallelSourceFunction来自定义并行的source.

数据sinks消费DataStream并将其发往文件、socket、外部系统或进行打印。Flink自带多种内置的输出格式,这些都被封装在对DataStream的操作背后.

我们需要实现一个RedisSink,然后数据就可以直接写入到redis.支持以下几种数据类型:

然后打包上传到服务器: ./flink run -c flink.flinkStreaming /home/jason/bigdata/jar/bj_parse_json-1.0-SNAPSHOT.jar 提交到集群

然后我们就可以在webUI里面看到我们的job,如下:

从上图中可以看到正在运行的job,和已经完成的job,还有运行的状态等信息,然后我们点金正在运行的进去后,可以看到:

中间的是一个DAG图,可以看到数据从source出来后经过了哪些transformation算子,最后输出到sink的一个过程.下面可以看到有source和sink他们发送了多少条记录,接受了多少条记录等信息.

如果想要停止正在运行的job,第一种方式,可以直接点击右上角的cancel就能把job停掉,第二种,可以通过命令行,flink list先查询有哪些正在运行的job,然后flink cancel jobid就可以,今天就先介绍到这里,困了,睡觉了

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐