从kafka里面取数据到sparkStreaming里面,然后再把数据保存到数据中
关键点,不是每个数据都需要创建连接,只需要为每个分区创建一个连接就可以了
下面是一个简单的例子

import java.sql.{ Connection, DriverManager }

import com.oracle._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.{ SparkConf, SparkContext }

/**
 * 从kafka上面读取数据,然后保存到数据库上面,虽然一般不建议把数据保存到数据库中,(保存到数据库中的时候可以建立连接池)
 * 如果是保存到hbase上面也可以使用这样的方法,为每个分区创建一个RDD连接,而不是为每个数据建立一个连接
 */
object StreamToOracle {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(10))
    //ssc.checkpoint("checkpoint")
    val topic = "test"
    val topicMap = topic.split(",").map((_, 1)).toMap
    val lines = KafkaUtils.createStream(ssc, "192.168.10.209:2181,192.168.10.219:2181,192.168.10.199:2181", "ssk", topicMap).map(_._2)
    //每个RDD进行操作
    lines.foreachRDD(rdd => {
      rdd.foreachPartition(partitionOfRecords => {
        //重点在这里,在每一个分区里面建立一个连接
        val connection = getConnection()
        partitionOfRecords.foreach(line => {
          val info = line.split(":")
          val ip = info(0)
          val mesType = info(1)
          val data = info(2)
          val timeStamp = info(3)
          val sql = "insert into MONITOR_DATA values(AUTO_INCREMENT.NEXTVAL,'" + ip + "','" + mesType + "','" + data + "'," + timeStamp + ")"
          saveToOracle(connection, sql)

        })
        closeConn(connection)
      })

      val words = rdd.flatMap(_.split(":"))

    })
    ssc.start()
    ssc.awaitTermination()
  }
  //保存数据
  def saveToOracle(con: Connection, sql: String): Int = {
    val ps = con.prepareStatement(sql);
    val res = ps.executeUpdate()
    ps.close()
    res
  }
  //关闭连接
  def closeConn(con: Connection) = {
    con.close()
  }
  //得到连接
  def getConnection(): Connection = {
    val url = "jdbc:oracle:thin:@//192.168.10.100:1521/UCLOUD"
    Class.forName("oracle.jdbc.driver.OracleDriver").newInstance();
    val con = DriverManager.getConnection(url, "scott", "scott");
    con
  }
}
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐