Flink SQL: Reading Messages from Kafka and Counting Them
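The job consumes JSON records from two Kafka topics and counts them per (mainType, extType) with a Flink SQL GROUP BY. Each record is expected to be a JSON object that fastjson can map onto the Message case class defined below; a record might look like this (values are illustrative):

{"mainType": "call", "extType": "in", "amount": 1}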
import java.nio.charset.StandardCharsets

import com.alibaba.fastjson.JSON
import net.icsoc.report.CtiReportRealTime.Message
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.util.serialization.DeserializationSchema
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
/** *****************************************************************************
 * Copyright information:
 *
 * @author xuchang
 * Copyright: Copyright (c) 2007 北京中通天鸿武汉分公司, Inc. All Rights Reserved.
 * Description: Counts Kafka messages in real time with Flink SQL, grouped by
 * mainType and extType.
 ******************************************************************************/
object CtiReportRealTime {
  def main(args: Array[String]): Unit = {
    // Pick the properties file by environment (string equality, not reference equality)
    var properties: String = "stream-dev.properties"
    if (args.length > 0 && "prod" == args(0)) {
      properties = "stream-prod.properties"
    }
    val parameterTool = ParameterTool.fromPropertiesFile(
      CtiReportRealTime.getClass.getClassLoader.getResourceAsStream(properties))
    // Obtain the Flink stream execution environment and a table environment on top of it
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)
    // Enable logging of job status updates to stdout
    env.getConfig.enableSysoutLogging()
    //env.setStateBackend(new FsStateBackend("file:///tmp/flink-checkpoints"))
    // Restart strategy: retry up to 4 times with a 1 s delay between attempts
    env.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 1000))
    // Trigger a checkpoint every 5 seconds
    env.enableCheckpointing(5000)
    // Make the parameters available to all operators via the global job configuration
    env.getConfig.setGlobalJobParameters(parameterTool)
    // Use event time as the stream time characteristic
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // Cache keyed by server number (declared here but not yet used below)
    val serverNumGroupCache: java.util.Map[String, java.util.Map[String, Object]] =
      new java.util.HashMap[String, java.util.Map[String, Object]]()
    val topics: java.util.List[String] = new java.util.ArrayList[String]()
    // Middleware data topic
    topics.add(parameterTool.getRequired("input-topic"))
    // Temporary data topic
    topics.add(parameterTool.getRequired("tmp-input-topic"))
    // Build the input stream from Kafka
    val inputStream: DataStream[Message] = env.addSource(
      new FlinkKafkaConsumer010[Message](
        topics,
        new SimpleMessageSchema,
        parameterTool.getProperties
      )).name("source")
    // Expose the stream as a table and count messages per (mainType, extType)
    val table = tEnv.fromDataStream(inputStream, 'mainType, 'extType)
    val result = tEnv.sqlQuery(
      s"SELECT mainType, extType, COUNT(1) AS amount FROM $table GROUP BY mainType, extType")
    // GROUP BY on an unbounded stream produces updates, so convert to a retract stream
    result.toRetractStream[MessageCount].print()
    env.execute("cti report real time stream")
  }
  case class Message(mainType: String, extType: String, amount: Int)
  case class MessageCount(mainType: String, extType: String, amount: Long)
}
class SimpleMessageSchema extends DeserializationSchema[Message] {
  // The stream never ends from the schema's point of view
  override def isEndOfStream(t: Message): Boolean = false

  // Parse each Kafka record as UTF-8 JSON into a Message
  override def deserialize(bytes: Array[Byte]): Message =
    JSON.parseObject(new String(bytes, StandardCharsets.UTF_8), classOf[Message])

  override def getProducedType: TypeInformation[Message] =
    TypeInformation.of(classOf[Message])
}
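The job reads its configuration from a properties file on the classpath. A minimal stream-dev.properties consistent with the lookups above might look like the sketch below; the topic names and addresses are placeholders, while bootstrap.servers and group.id are the standard Kafka consumer settings that parameterTool.getProperties forwards to FlinkKafkaConsumer010:

# Kafka topics consumed by the job (names are illustrative)
input-topic=cti-report
tmp-input-topic=cti-report-tmp
# Standard Kafka consumer settings, passed through parameterTool.getProperties
bootstrap.servers=localhost:9092
group.id=cti-report-group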
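To sanity-check the counting logic without a Kafka cluster, the same query can be run over a bounded in-memory stream. The sketch below is hypothetical (the CountSmokeTest object and its sample values are not part of the original job) and assumes it sits in the same project as the classes above:

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object CountSmokeTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)
    // Bounded input that stands in for the Kafka source
    val input = env.fromElements(
      CtiReportRealTime.Message("call", "in", 1),
      CtiReportRealTime.Message("call", "in", 1),
      CtiReportRealTime.Message("call", "out", 1))
    val table = tEnv.fromDataStream(input, 'mainType, 'extType)
    val result = tEnv.sqlQuery(
      s"SELECT mainType, extType, COUNT(1) AS amount FROM $table GROUP BY mainType, extType")
    // Prints (true, row) for additions and (false, row) for retractions, e.g.
    // (true,MessageCount(call,in,1)), then (false,MessageCount(call,in,1)), (true,MessageCount(call,in,2))
    result.toRetractStream[CtiReportRealTime.MessageCount].print()
    env.execute("count smoke test")
  }
}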