基于kafka的sink

KafKaUtil 工具类中

  /**
   * Builds a Kafka sink that writes each String record to `topic`.
   *
   * @param topic      target Kafka topic
   * @param brokerList comma-separated `host:port` list of Kafka brokers; defaults to the
   *                   three brokers that were previously hard-coded here, so existing
   *                   single-argument callers behave exactly as before
   * @return a producer that serializes records with `SimpleStringSchema`
   */
  def getProducer(
      topic: String,
      brokerList: String = "note01:9092,note02:9092,note03:9092"
  ): FlinkKafkaProducer011[String] =
    new FlinkKafkaProducer011[String](brokerList, topic, new SimpleStringSchema())

具体apps代码中

   // Kafka sink bound to the "time" topic.
   val kafkaSink = KafKaUtil.getProducer("time")

    // Append the "sink" suffix to every record, then hand it to Kafka.
    unionStream
      .map(_ + "sink")
      .addSink(kafkaSink)

基于redis的sink

添加依赖

<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>

redis工具类

package kafka

import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

/** Factory for the Redis sink used by the streaming job. */
object RedisUtils {

  // Jedis pool settings: host "note01", port 6379, database index 2.
  val conf: FlinkJedisPoolConfig =
    new FlinkJedisPoolConfig.Builder()
      .setHost("note01")
      .setPort(6379)
      .setDatabase(2)
      .build()

  /** Creates a sink that writes `(String, String)` pairs to Redis via `MyRedisMapper`. */
  def getRedisSink(): RedisSink[(String, String)] =
    new RedisSink[(String, String)](conf, new MyRedisMapper)
}

/**
 * Maps `(String, String)` tuples onto a Redis `HSET` against the hash named "count".
 */
class MyRedisMapper extends RedisMapper[(String, String)] {

  /**
   * Redis command issued for every record.
   *
   * `HSET` takes three arguments (`HSET key field value`). The key is a name
   * chosen here ("count"); the field and value are taken from the record.
   */
  override def getCommandDescription: RedisCommandDescription =
    new RedisCommandDescription(RedisCommand.HSET, "count")

  /** Hash field: the first element of the tuple. */
  override def getKeyFromData(t: (String, String)): String =
    t._1

  /** Hash value: the second element of the tuple. */
  override def getValueFromData(t: (String, String)): String =
    t._2
}

主类中调用

  // Turn each record x into the pair ("key:" + x, "value" + x) and write it to Redis.
  unionStream
    .map(x => ("key:" + x, "value" + x))
    .addSink(RedisUtils.getRedisSink())

基于es的sink

pom.xml

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
    <version>1.7.0</version>
</dependency>

<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>4.5.3</version>
</dependency>

MyEsUtil

import java.util

import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests

/** Helper that builds Elasticsearch sinks for JSON string records. */
object MyEsUtil {

  // Elasticsearch HTTP endpoints handed to every sink built by this object.
  val httpHosts = new util.ArrayList[HttpHost]
  Seq("hadoop1", "hadoop2", "hadoop3").foreach { host =>
    httpHosts.add(new HttpHost(host, 9200, "http"))
  }

  /**
   * Creates a sink that parses each incoming string as JSON and indexes it
   * as a `_doc` document.
   *
   * @param indexName name of the Elasticsearch index to write to
   * @return the configured sink
   */
  def getElasticSearchSink(indexName: String): ElasticsearchSink[String] = {
    val indexingFunction = new ElasticsearchSinkFunction[String] {
      override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
        println("试图保存:"+element)
        val document: JSONObject = JSON.parseObject(element)
        val request: IndexRequest =
          Requests.indexRequest()
            .index(indexName)
            .`type`("_doc")
            .source(document)
        indexer.add(request)
        println("保存1条")
      }
    }

    val builder = new ElasticsearchSink.Builder[String](httpHosts, indexingFunction)
    // Maximum number of buffered actions before a bulk flush.
    builder.setBulkFlushMaxActions(10)
    builder.build()
  }

}

main方法中调用

// Send the detail records to Elasticsearch.
 val esSink: ElasticsearchSink[String] = MyEsUtil.getElasticSearchSink("gmall0503_startup")


  // Attach the sink to the stream.
  dstream.addSink(esSink)

JDBC SINK

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.44</version>
</dependency>

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid</artifactId>
    <version>1.1.10</version>
</dependency>

添加MyJdbcSink

/**
 * Flink sink that executes one parameterized SQL statement per incoming record.
 *
 * The elements of each record array are bound, in order, to the statement's
 * positional parameters (JDBC indices are 1-based).
 *
 * @param sql SQL text passed to `Connection.prepareStatement` for every record
 */
class MyJdbcSink(sql: String) extends RichSinkFunction[Array[Any]] {

  val driver = "com.mysql.jdbc.Driver"

  val url = "jdbc:mysql://hadoop2:3306/gmall2019?useSSL=false"

  val username = "root"

  val password = "123123"

  val maxActive = "20"

  // Assigned in open() and released in close(); null outside that window.
  // Marked transient because a live JDBC handle must never be serialized with the function.
  @transient var connection: Connection = null

  // Pool that handed out `connection`; retained so close() can shut it down as well.
  @transient private var dataSource: DataSource = null

  /** Creates the Druid connection pool and borrows the connection used by `invoke`. */
  override def open(parameters: Configuration): Unit = {
    val properties = new Properties()
    properties.put("driverClassName", driver)
    properties.put("url", url)
    properties.put("username", username)
    properties.put("password", password)
    properties.put("maxActive", maxActive)

    dataSource = DruidDataSourceFactory.createDataSource(properties)
    connection = dataSource.getConnection()
  }

  /**
   * Called once per record: binds `values` to the statement and executes it.
   *
   * @param values parameter values, bound to positions 1..values.length
   */
  override def invoke(values: Array[Any]): Unit = {
    val ps: PreparedStatement = connection.prepareStatement(sql)
    try {
      println(values.mkString(","))
      for (i <- values.indices) {
        ps.setObject(i + 1, values(i))
      }
      ps.executeUpdate()
    } finally {
      // Always release the statement; otherwise one statement handle leaks per record.
      ps.close()
    }
  }

  /** Releases the connection and then the pool that was created in `open`. */
  override def close(): Unit = {
    try {
      if (connection != null) {
        connection.close()
      }
    } finally {
      // javax.sql.DataSource has no close(); close the pool only if the
      // implementation exposes one. A null dataSource falls through to the default case.
      dataSource match {
        case closeable: AutoCloseable => closeable.close()
        case _ =>
      }
    }
  }

}

在main方法中增加,把明细保存到mysql中

// Parse every JSON record into a StartUpLog.
val startUplogDstream: DataStream[StartUpLog] =
  dstream.map(json => JSON.parseObject(json, classOf[StartUpLog]))

// Persist the detail rows to MySQL: the array elements fill the five placeholders in order.
val jdbcSink = new MyJdbcSink("insert into z_startup values(?,?,?,?,?)")
startUplogDstream
  .map(log => Array(log.mid, log.uid, log.ch, log.area, log.ts))
  .addSink(jdbcSink)
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐