基于flink1.7+scala+kafka

直接上代码吧

第一个是自定义的 ProcessFunction,继承自 KeyedProcessFunction:

/**
  * Collects the records of one key into a single merged record and emits it either when the
  * expected number of records has arrived or when a 25 s timeout timer fires, whichever is first.
  *
  * Records are tuples `(groupInfo, key, eventTimeString, payload)`. Merging keeps the first
  * record's first three fields and concatenates the payloads (`_4`) in arrival order. The expected
  * number of records is `groupInfo.size.length` of the first record of the group.
  *
  * After a group has been emitted (by count or by timeout) all keyed state is released and the
  * pending timer is cancelled, so the next record for the same key starts a fresh group with its
  * own timeout instead of being appended to the already emitted one.
  */
class SensorCountTimeOutProcess extends KeyedProcessFunction[Tuple,(GroupInfo,String, String, String),(GroupInfo,String, String, String)]{

  /** Merged record of the group currently being collected; empty when no group is open. */
  var state:ValueState[(GroupInfo,String, String, String)] = _

  /** Number of records merged into `state` so far. */
  var count:ValueState[Long] = _

  /** Timestamp of the timeout timer registered for the open group, kept so it can be cancelled. */
  @transient private var pendingTimer: ValueState[java.lang.Long] = _

  /** Delay between the first record of a group and its timeout, in milliseconds. */
  private val TimeoutMillis = 25000L

  /**
    * Merges `value` into the open group of the current key, opening a new group (and registering
    * its timeout timer) if none exists, and emits the merged record once enough records arrived.
    *
    * @param value incoming record
    * @param ctx   context giving access to the timer service
    * @param out   collector receiving the merged record when the group is complete
    */
  override def processElement(value: (GroupInfo, String, String, String), ctx: KeyedProcessFunction[Tuple, (GroupInfo, String, String, String), (GroupInfo, String, String, String)]#Context, out: Collector[(GroupInfo, String, String, String)]): Unit = {

    val merged: (GroupInfo, String, String, String) = Option(state.value()) match {
      case None =>
        // First record of a new group: start counting and arm the timeout.
        count.update(1)
        // NOTE(review): this is a wall-clock timestamp registered as an *event-time* timer, so it
        // only fires if the watermark is also derived from wall-clock time — confirm this is the
        // intended pairing with the watermark assigner used by the job.
        val time = System.currentTimeMillis() + TimeoutMillis
        ctx.timerService().registerEventTimeTimer(time)
        pendingTimer.update(time)
        println("----定时时间为25s-----")
        println("-----当前的定时timestamp为:"+ time)
        value
      case Some(previous) =>
        count.update(count.value() + 1)
        (previous._1, previous._2, previous._3, previous._4 + value._4)
    }

    if (count.value() >= merged._1.size.length) {
      println("---------个数达到 输出state了 -------------")
      out.collect(merged)
      // The group is complete: cancel its timeout so it is not emitted a second time, and drop
      // the state so the next record of this key opens a new group.
      Option(pendingTimer.value()).foreach(ts => ctx.timerService().deleteEventTimeTimer(ts.longValue()))
      clearGroup()
    } else {
      state.update(merged)
    }
  }

  /** Creates the keyed state handles used by this function. */
  override def open(parameters: Configuration): Unit = {
    state = getRuntimeContext
      .getState[(GroupInfo,String,String,String)](
      new ValueStateDescriptor[(GroupInfo, String, String, String)]("sensorState",classOf[(GroupInfo, String, String, String)]))

    count = getRuntimeContext.getState[Long](
      new ValueStateDescriptor[Long]("countNum",classOf[(Long)])
    )

    pendingTimer = getRuntimeContext.getState[java.lang.Long](
      new ValueStateDescriptor[java.lang.Long]("timeoutTimer",classOf[java.lang.Long])
    )
  }

  /**
    * Timeout reached before the group was complete: emits whatever has been merged so far (if
    * anything) and releases the state of the current key.
    *
    * @param timestamp timestamp the firing timer was registered with
    * @param ctx       timer context
    * @param out       collector receiving the partially filled group
    */
  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Tuple, (GroupInfo, String, String, String), (GroupInfo, String, String, String)]#OnTimerContext, out: Collector[(GroupInfo, String, String, String)]): Unit = {
    println("-----时间到,执行定时任务了-----")
    println("-----此时的timestamp为:" +timestamp)
    // Guard against an empty state so a null record is never sent downstream.
    Option(state.value()).foreach(record => out.collect(record))
    clearGroup()
  }

  /** Releases all keyed state belonging to the group of the current key. */
  private def clearGroup(): Unit = {
    state.clear()
    count.clear()
    pendingTimer.clear()
  }
}

第二个是主流程处理类


  /**
    * Job entry point: reads tab-separated lines from Kafka, parses each line into a
    * `(groupInfo, key, eventTimeString, payload)` tuple, assigns timestamps and watermarks,
    * keys the stream by the tuple's second field and runs `SensorCountTimeOutProcess` on it,
    * printing the results.
    *
    * @param args command-line arguments (not used)
    */
  def main(args: Array[String]): Unit = {
    import org.apache.flink.api.scala._

    // Kafka consumer configuration and source
    val kafkaProps: Properties = new Properties()
    kafkaProps.setProperty("bootstrap.servers","填自己的kafka集群就行")
    kafkaProps.setProperty("group.id","pk-test-log")
    val topic = "pktest"
    val kafkaSource = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), kafkaProps)

    // Streaming environment running on event time
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val rawLines = env.addSource(kafkaSource).setParallelism(1)

    // Example of one input line, fields separated by tabs:
    //   {"gId":"34733770-f66c-443b-b96e-d0a6691860e3","size":[1,2,3,4,5,6,7],"tId":1}
    //   1
    //   2019-10-15 11:12:00
    //   B
    //   aaaaaaaaaaaaaaaaaaa
    val records = rawLines.map { line =>
      val fields = line.split("\t")
      val groupInfo: GroupInfo = JsonHelper.Json2Object[GroupInfo](fields(0))._1.get
      // The grouping key is gId immediately followed by tId.
      val key = groupInfo.gId + "" + groupInfo.tId
      (groupInfo, key, fields(2), fields(4))
    }

    val timestamped = records.assignTimestampsAndWatermarks(
      new AssignerWithPeriodicWatermarks[(GroupInfo,String, String, String)] {
        val maxOutOfOrderness = 3500L // 3.5 seconds

        // Not read by the watermark logic below.
        var currentMaxTimestamp: Long = 0

        // The watermark follows the system clock, lagging it by maxOutOfOrderness.
        override def getCurrentWatermark: Watermark =
          new Watermark(System.currentTimeMillis() - maxOutOfOrderness)

        // The event timestamp is parsed from the record's third field.
        override def extractTimestamp(element: (GroupInfo,String, String, String), previousElementTimestamp: Long): Long =
          new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(element._3).getTime
      })

    val res = timestamped
      .keyBy(1)
      .process(new SensorCountTimeOutProcess)
      .print()

    env.execute("DataAnalysisApp")
  }

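有一点需要注意:水印(watermark)的生成方式。
我之前使用的是 currentMaxTimestamp - maxOutOfOrderness,但是随着程序的运行,水印并不会更新,这会导致 ProcessFunction 里面的 onTimer 方法根本无法触发。
onTimer 方法是基于注册的时间戳触发的:只有当水印超过这个时间戳时,定时方法才会被调用。
由于这里注册定时器用的是系统时间(System.currentTimeMillis() + 25000),所以水印也改成了基于系统时间生成(System.currentTimeMillis() - maxOutOfOrderness),这样水印才能越过定时器的时间戳。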

下面给出运行结果

----定时时间为25s-----
-----当前的定时timestamp为:1571206010805
-----时间到,执行定时任务了-----
-----此时的timestamp为:1571206010805
2> (GroupInfo(9b8144da-f42a-461d-b71e-aa6aeb867782,[I@595eb4e2,3),9b8144da-f42a-461d-b71e-aa6aeb8677823,2019-10-16 14:06:16,cccccccccccccccccccccccc)


Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐