A real-time project: pull data from Kafka with Spark Streaming and store it in HBase.

Environment (Maven dependencies):

<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.10.4</version>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.6.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.6.4</version>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.6.1</version>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.10</artifactId>
    <version>1.6.1</version>
</dependency>

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>1.0.3</version>
</dependency>
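
Before the job runs, the target HBase table must exist. The streaming code below writes to a table named user with a single column family info1. A minimal standalone sketch that creates it with the HBase 1.0 client API (the ZooKeeper addresses match the ones used in the job and are assumptions for your cluster):

import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory

object CreateUserTable {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    // Assumed ZooKeeper quorum; adjust for your cluster
    conf.set("hbase.zookeeper.quorum", "192.168.199.6,192.168.199.7,192.168.199.8")
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    val conn = ConnectionFactory.createConnection(conf)
    val admin = conn.getAdmin
    try {
      val tableName = TableName.valueOf("user")
      if (!admin.tableExists(tableName)) {
        // One column family, info1, matching the Puts issued by the streaming job
        val desc = new HTableDescriptor(tableName)
        desc.addFamily(new HColumnDescriptor("info1"))
        admin.createTable(desc)
      }
    } finally {
      admin.close()
      conn.close()
    }
  }
}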

The code is as follows:
import java.util.Date

import org.apache.hadoop.hbase.{TableName, HBaseConfiguration}
import org.apache.hadoop.hbase.client.{Put, ConnectionFactory}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Created by Administrator on 2016/9/19.
 */
object KafkaAndHbase {
  // Update function for updateStateByKey: adds the batch's new counts to the
  // running total for each key. Defined but not wired up in this listing; see
  // the sketch after the code.
  val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
    iter.flatMap { case (x, y, z) => Some(y.sum + z.getOrElse(0)).map(i => (x, i)) }
  }


  def main(args: Array[String]) {
    LoggerLevels.setStreamingLogLevels()
    // Program arguments: zkQuorum, group, topic, numThreads, checkpointPath
    // e.g. shizhanmini:2181,mini02:2181,mini03:2181 ty90 db100 2 c://ck2
    val Array(zkQuorum, group, topic, numThreads, cachepath) = args
    // Create the SparkConf; local[2] is for local testing
    val sparkConf= new SparkConf().setAppName("KafkaAndHbase")
      .setMaster("local[2]")
    // Create the streaming context with a 3-second batch interval
    val ssc= new StreamingContext(sparkConf,Seconds(3))
    ssc.checkpoint(cachepath)
    // Map each topic name to its number of receiver threads
    val topicMap= topic.split(",").map((_,numThreads.toInt)).toMap

    // Create the receiver-based Kafka stream and keep only the message value
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap, StorageLevel.MEMORY_AND_DISK).map(_._2)
    // Extract (id, name, lat, lon) from the comma-separated message
    val users = lines.map(line => {
      val fields = line.split(",")
      (fields(1), fields(3), fields(6), fields(7))
    })

    users.print()

    users.foreachRDD(rdd => {
      rdd.foreachPartition(par =>{
        val conf = HBaseConfiguration.create()
        conf.set("hbase.zookeeper.property.clientPort", "2181")
        conf.set("hbase.zookeeper.quorum", "192.168.199.6,192.168.199.7,192.168.199.8")
        // Creating a Connection is heavyweight but thread-safe; it is the
        // entry point for all HBase operations
        val conn = ConnectionFactory.createConnection(conf)
        // The table this example writes to
        val userTable = TableName.valueOf("user")
        // Obtain the user table
        val table = conn.getTable(userTable)
   
        if(!par.isEmpty) {
          par.foreach(pair => {
            val id= pair._1+"_"+new Date().getTime
            val vname = pair._2
            val lat=pair._3
            val lon=pair._4
            println(id+"-----------geo-----------"+lon)

            // Create a Put keyed by the generated row key
            val p = new Put(id.getBytes)
            // Specify column family, qualifier and value for the Put
            p.addColumn("info1".getBytes,"lat".getBytes,lat.getBytes)
            p.addColumn("info1".getBytes,"lon".getBytes,lon.getBytes)
            // Submit the Put
            table.put(p)
          })

        }
        table.close()
        conn.close()
      })

    })

    ssc.start()
    ssc.awaitTermination()



  }
}
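
The job expects each Kafka message to be a comma-separated line with at least 8 fields, of which it reads index 1 (id), 3 (name), 6 (latitude) and 7 (longitude). A minimal producer sketch for feeding test data, assuming the Kafka 0.8.2+ new producer API (org.apache.kafka:kafka-clients); the broker address and the sample line are placeholders, and the topic db100 comes from the example arguments above:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object UserProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    // Hypothetical broker address; replace with your cluster's brokers
    props.put("bootstrap.servers", "shizhanmini:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    // Fields 1, 3, 6 and 7 are what the streaming job extracts:
    // index:  0  1     2  3    4  5  6      7
    val line = "0,u001,-,Tom,-,-,39.90,116.40"
    producer.send(new ProducerRecord[String, String]("db100", line))
    producer.close()
  }
}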


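Note that updateFunc at the top of the object is never actually called, even though the checkpoint directory it would need is already set via ssc.checkpoint. If you wanted a running message count per user id, it could be wired up with updateStateByKey; a sketch of a snippet that would go inside main, before ssc.start() (not part of the original job):

// Running count of messages per user id (field 1), carried across batches;
// relies on the checkpoint directory already set via ssc.checkpoint
val idCounts = lines.map(_.split(",")(1)).map((_, 1))
  .updateStateByKey[Int](updateFunc,
    new org.apache.spark.HashPartitioner(ssc.sparkContext.defaultParallelism),
    true) // remember the partitioner across batches
idCounts.print()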