(11) Sink data output in Flink
Kafka-based sink
In the KafKaUtil utility object:
def getProducer(topic: String): FlinkKafkaProducer011[String] = {
  // Broker list, target topic, and a simple string serialization schema
  new FlinkKafkaProducer011[String]("note01:9092,note02:9092,note03:9092", topic, new SimpleStringSchema())
}
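The Kafka connector dependency is not shown in this section; a sketch of the Maven coordinate it would normally need is below (the version here is an assumption, chosen to match the 1.7.0 used for the Elasticsearch connector later in this post):
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>1.7.0</version>
</dependency>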
In the application code:
val kafkaSink = KafKaUtil.getProducer("time")
unionStream.map(x => x+"sink").addSink(kafkaSink)
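unionStream itself is not defined in this snippet; a minimal sketch of a complete job, with a socket source standing in for the real input (the object name, host, and port below are placeholders, and it assumes the KafKaUtil object from above), might look like this:

import org.apache.flink.streaming.api.scala._

object KafkaSinkApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Placeholder source; in the real job unionStream is built by unioning several input streams
    val unionStream: DataStream[String] = env.socketTextStream("note01", 9999)

    val kafkaSink = KafKaUtil.getProducer("time")
    unionStream.map(x => x + "sink").addSink(kafkaSink)

    // Nothing runs until execute() is called
    env.execute("kafka sink")
  }
}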
Redis-based sink
Add the dependency:
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>
The Redis utility class:
package kafka

import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

object RedisUtils {
  val conf: FlinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder().setDatabase(2).setHost("note01").setPort(6379).build()

  def getRedisSink(): RedisSink[(String, String)] = {
    new RedisSink[(String, String)](conf, new MyRedisMapper)
  }
}
class MyRedisMapper extends RedisMapper[(String, String)] {
  // The Redis command the sink will issue
  override def getCommandDescription: RedisCommandDescription = {
    /* HSET takes three arguments: hset key field value.
       The key used here is the additional key name we chose: "count". */
    new RedisCommandDescription(RedisCommand.HSET, "count")
  }
  // Hash field
  override def getKeyFromData(t: (String, String)): String = t._1
  // Hash value
  override def getValueFromData(t: (String, String)): String = t._2
}
Invoke it in the main class:
unionStream.map(x => ("key:"+x,"value"+x)).addSink(RedisUtils.getRedisSink())
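As a concrete illustration of the mapper above: a hypothetical element "a" becomes the tuple ("key:a", "valuea"), which the sink writes to database 2 as HSET count key:a valuea, so all results accumulate as field/value pairs inside the single hash named count.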
Elasticsearch-based sink
pom.xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
    <version>1.7.0</version>
</dependency>
<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>4.5.3</version>
</dependency>
MyEsUtil
import java.util

import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests

object MyEsUtil {
  val httpHosts = new util.ArrayList[HttpHost]
  httpHosts.add(new HttpHost("hadoop1", 9200, "http"))
  httpHosts.add(new HttpHost("hadoop2", 9200, "http"))
  httpHosts.add(new HttpHost("hadoop3", 9200, "http"))

  def getElasticSearchSink(indexName: String): ElasticsearchSink[String] = {
    val esFunc = new ElasticsearchSinkFunction[String] {
      override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
        println("Trying to save: " + element)
        // Parse the JSON string and wrap it in an index request for the target index
        val jsonObj: JSONObject = JSON.parseObject(element)
        val indexRequest: IndexRequest = Requests.indexRequest().index(indexName).`type`("_doc").source(jsonObj)
        indexer.add(indexRequest)
        println("Saved one record")
      }
    }
    val sinkBuilder = new ElasticsearchSink.Builder[String](httpHosts, esFunc)
    // Maximum number of actions to buffer before a bulk flush
    sinkBuilder.setBulkFlushMaxActions(10)
    sinkBuilder.build()
  }
}
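Note that setBulkFlushMaxActions(10) means nothing reaches Elasticsearch until 10 actions have been buffered, which can look like data loss when testing with only a trickle of records. A sketch of more eager settings, assuming they replace the existing call inside getElasticSearchSink before build():

// Flush after every single action -- convenient while debugging
sinkBuilder.setBulkFlushMaxActions(1)
// Or flush on a time interval, in milliseconds
sinkBuilder.setBulkFlushInterval(1000)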
Invoke it in the main method:
// Send the detail records to ES
val esSink: ElasticsearchSink[String] = MyEsUtil.getElasticSearchSink("gmall0503_startup")
dstream.addSink(esSink)
JDBC-based sink
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.44</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid</artifactId>
    <version>1.1.10</version>
</dependency>
Add MyJdbcSink:
import java.sql.{Connection, PreparedStatement}
import java.util.Properties

import com.alibaba.druid.pool.DruidDataSourceFactory
import javax.sql.DataSource
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

class MyJdbcSink(sql: String) extends RichSinkFunction[Array[Any]] {
  val driver = "com.mysql.jdbc.Driver"
  val url = "jdbc:mysql://hadoop2:3306/gmall2019?useSSL=false"
  val username = "root"
  val password = "123123"
  val maxActive = "20"
  var connection: Connection = null

  // Create the connection from a Druid connection pool
  override def open(parameters: Configuration): Unit = {
    val properties = new Properties()
    properties.put("driverClassName", driver)
    properties.put("url", url)
    properties.put("username", username)
    properties.put("password", password)
    properties.put("maxActive", maxActive)
    val dataSource: DataSource = DruidDataSourceFactory.createDataSource(properties)
    connection = dataSource.getConnection()
  }

  // Called once per record
  override def invoke(values: Array[Any]): Unit = {
    val ps: PreparedStatement = connection.prepareStatement(sql)
    println(values.mkString(","))
    for (i <- 0 until values.length) {
      ps.setObject(i + 1, values(i))
    }
    ps.executeUpdate()
    ps.close()
  }

  override def close(): Unit = {
    if (connection != null) {
      connection.close()
    }
  }
}
In the main method, add the following to save the detail data to MySQL:
val startUplogDstream: DataStream[StartUpLog] = dstream.map{ JSON.parseObject(_,classOf[StartUpLog])}
val jdbcSink = new MyJdbcSink("insert into z_startup values(?,?,?,?,?)")
startUplogDstream.map(startuplog=>Array(startuplog.mid,startuplog.uid,startuplog.ch,startuplog.area, startuplog.ts)).addSink(jdbcSink)
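The StartUpLog class itself is not shown in this post; a hypothetical sketch is below, with the field names taken from the Array(...) mapping above and the types assumed. Bean-style accessors and a no-arg constructor keep fastjson's JSON.parseObject deserialization straightforward.

import scala.beans.BeanProperty

// Hypothetical sketch: field names mirror the mapping above; types are assumptions
class StartUpLog {
  @BeanProperty var mid: String = _
  @BeanProperty var uid: String = _
  @BeanProperty var area: String = _
  @BeanProperty var ch: String = _
  @BeanProperty var ts: Long = 0L
}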