常见的map.flatMap,filter类比spark

KeyBy

DataStream → KeyedStream:输入必须是Tuple类型(一般通过map转换),逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同key的元素,在内部以hash的形式实现的。

Reduce

KeyedStream → DataStream:一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。

//求各个渠道的累计个数
val startUplogDstream: DataStream[StartUpLog] = dstream.map{ JSON.parseObject(_,classOf[StartUpLog])}
val keyedStream: KeyedStream[(String, Int), Tuple] = startUplogDstream.map(startuplog=>(startuplog.ch,1)).keyBy(0)
//reduce //sum
keyedStream.reduce{  (ch1,ch2)=>
  (ch1._1,ch1._2+ch2._2)
} .print().setParallelism(1)

类比可知,spark的reduceByKey == keyBy+Reduce

flink保存累计值原理

flink是一种有状态的流计算框架

  1. operator state : 主要是保存数据在流程中的处理状态,用于确保语义的exactly-once

  2. keyed state : 主要保存数据在计算过程中的累计值

这两种状态都是通过checkpoint机制保存在StateBackend中,StateBackend可以选择保存在内存中(默认使用)或者保存在磁盘文件中。

Split 和Select

split

在这里插入图片描述
DataStream → SplitStream:根据某些特征把一个DataStream拆分成两个或者多个DataStream。

Select

在这里插入图片描述
SplitStream→DataStream:从一个SplitStream中获取一个或者多个DataStream。

需求:将kafka中数据根据某属性分割开,分成两个流
package kafka

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._

object ConsumerApp {
  def main(args: Array[String]): Unit = {
    val environment = StreamExecutionEnvironment.getExecutionEnvironment
    val kafkaConsumer = KafKaUtil.getConsumer("test")

    //import org.apache.flink.api.scala._ 这里要加入隐式转换
    val dstream = environment.addSource(kafkaConsumer)

//    dstream.print()
    val logStream = dstream.split {
      log =>
        var flags: List[String] = null
        if ("Apple".equals(log)) {
          flags = List(log)
        } else {
          flags = List("Android")
        }
        flags
    }
    val apple = logStream.select("Apple")

    apple.print("apple:").setParallelism(1)

    val android = logStream.select("Android")
    android.print("Android:").setParallelism(1)
    environment.execute()
  }
}

Connect和 CoMap

Connect

在这里插入图片描述
DataStream,DataStream → ConnectedStreams:连接两个保持他们类型的数据流,两个数据流被Connect之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。

CoMap,CoFlatMap

这两个不是算子,只是类比 connect + map / flatMap
在这里插入图片描述
ConnectedStreams → DataStream:作用于ConnectedStreams上,功能与map和flatMap一样,对ConnectedStreams中的每一个Stream分别进行map和flatMap处理。

    val connStream = androidStream.connect(appleStream)
    val allStream = connStream.map(
      (log1: String) => log1 + "1",
      (log2: String) => log2 + "2"
    )
    allStream.print("连接流:")

处理完后类型要一致,并且符合返回的泛型

Union

在这里插入图片描述
DataStream → DataStream:对两个或者两个以上的DataStream进行union操作,产生一个包含所有DataStream元素的新DataStream。注意:如果你将一个DataStream跟它自己做union操作,在新的DataStream中,你将看到每一个元素都出现两次。

    val unionStream = appleStream.union(androidStream)
    unionStream.print("union:")

    environment.execute()
Connect与 Union 区别

1 Union之前两个流的类型必须是一样,Connect可以不一样,在之后的coMap中再去调整成为一样的。
2 Connect只能操作两个流,Union可以操作多个

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐