import com.alibaba.fastjson.JSON
import net.icsoc.report.CtiReportRealTime.Message
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.util.serialization.DeserializationSchema
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

/** *****************************************************************************
  * Copyright information:
  *
  * @author xuchang
  *         Copyright: Copyright (c) 2007 北京中通天鸿武汉分公司, Inc. All Rights Reserved.
  *         Description:
  ******************************************************************************/
object CtiReportRealTime {
  def main(args: Array[String]): Unit = {
    // Default to the dev profile; switch to prod when the first program argument is "prod"
    var properties: String = "stream-dev.properties"
    if (args.length > 0 && "prod" == args(0)) {
      properties = "stream-prod.properties"
    }
    val parameterTool = ParameterTool.fromPropertiesFile(
      CtiReportRealTime.getClass.getClassLoader.getResourceAsStream(properties))
    // Obtain the Flink streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)
    // Enable logging of job status updates to stdout
    env.getConfig.enableSysoutLogging()
    //env.setStateBackend(new FsStateBackend("file:///tmp/flink-checkpoints"))
    // Restart strategy: up to 4 restart attempts with a 1-second delay between them
    env.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 1000))
    // Trigger a checkpoint every 5 seconds
    env.enableCheckpointing(5000)
    // Make the parameters globally available to all operators
    env.getConfig.setGlobalJobParameters(parameterTool)
    // Use event time as the stream time characteristic
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val serverNumGroupCache: java.util.Map[String, java.util.Map[String, Object]] =
      new java.util.HashMap[String, java.util.Map[String, Object]]()
    val topics: java.util.List[String] =
      new java.util.ArrayList[String]()
    // Topic carrying middleware data
    topics.add(parameterTool.getRequired("input-topic"))
    // Topic carrying temporary data
    topics.add(parameterTool.getRequired("tmp-input-topic"))
    // Build the source stream by reading from Kafka
    val inputStream: DataStream[Message] = env.addSource(
      new FlinkKafkaConsumer010[Message](
        topics,
        new SimpleMessageSchema,
        parameterTool.getProperties
      )).name("source")
    // Register the stream as a table and count messages per (mainType, extType)
    val table = tEnv.fromDataStream(inputStream, 'mainType, 'extType)
    val result = tEnv.sqlQuery(s"SELECT mainType, extType, COUNT(1) AS amount FROM $table GROUP BY mainType, extType")
    result.toRetractStream[MessageCount].print()
    env.execute("cti report real time stream")
  }

  case class Message(mainType: String, extType: String, amount: Int)

  case class MessageCount(mainType: String, extType: String, amount: Long)

}
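For reference, here is a minimal sketch of the stream-dev.properties file this job assumes. Only the input-topic and tmp-input-topic keys are read explicitly above; the remaining entries are the standard Kafka consumer settings that FlinkKafkaConsumer010 picks up from the same Properties object. Every value shown is a placeholder, not a value from the original deployment:

# Kafka topics consumed by the job (placeholder names)
input-topic=cti-middleware-events
tmp-input-topic=cti-tmp-events

# Standard Kafka consumer settings forwarded to FlinkKafkaConsumer010 (placeholder values)
bootstrap.servers=localhost:9092
group.id=cti-report-real-time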

class SimpleMessageSchema extends DeserializationSchema[Message] {
  // The stream is unbounded; no message marks its end
  override def isEndOfStream(t: Message): Boolean = false

  // Parse a raw Kafka record (UTF-8 encoded JSON) into a Message
  override def deserialize(bytes: Array[Byte]): Message =
    JSON.parseObject(new String(bytes, "UTF-8"), classOf[Message])

  // Tell Flink which type this schema produces
  override def getProducedType: TypeInformation[Message] =
    TypeInformation.of(classOf[Message])
}
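To make the deserialization and the output format concrete, below is a minimal, self-contained sketch; the demo object and the JSON payload are hypothetical and not taken from the original post:

import net.icsoc.report.CtiReportRealTime.Message

object SimpleMessageSchemaDemo {
  def main(args: Array[String]): Unit = {
    val schema = new SimpleMessageSchema
    // A JSON record as it might arrive from Kafka; field names match the Message case class
    val payload = """{"mainType":"call","extType":"inbound","amount":1}""".getBytes("UTF-8")
    val msg: Message = schema.deserialize(payload)
    println(msg) // prints: Message(call,inbound,1)
  }
}

Because the SQL query is an unwindowed GROUP BY, the counts are updated continuously, and toRetractStream emits (Boolean, MessageCount) pairs: true marks a new or updated count for a (mainType, extType) key, while false retracts the previously emitted value for that key.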