Spark Streaming + HBase
A real-time project: pull data from Kafka and write it into HBase.

Environment (Maven):
<dependency>
  <groupId>org.scala-lang</groupId>
  <artifactId>scala-library</artifactId>
  <version>2.10.4</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.6.1</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>2.6.4</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.10</artifactId>
  <version>1.6.1</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka_2.10</artifactId>
  <version>1.6.1</version>
</dependency>
<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-server</artifactId>
  <version>1.0.3</version>
</dependency>
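Besides the dependencies, the streaming job below assumes an HBase table named user with a column family info1 already exists. A minimal sketch of creating it with the HBase 1.0 Admin API, reusing the same ZooKeeper settings as the job (the object name CreateUserTable is just an illustration; adjust the quorum to your cluster):

import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory

// Hypothetical one-off setup program: creates the `user` table used by the streaming job
object CreateUserTable {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set("hbase.zookeeper.quorum", "192.168.199.6,192.168.199.7,192.168.199.8")

    val conn = ConnectionFactory.createConnection(conf)
    val admin = conn.getAdmin
    val tableName = TableName.valueOf("user")

    // Only create the table if it does not exist yet
    if (!admin.tableExists(tableName)) {
      val desc = new HTableDescriptor(tableName)
      desc.addFamily(new HColumnDescriptor("info1"))
      admin.createTable(desc)
    }

    admin.close()
    conn.close()
  }
}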
The code is as follows:
import java.util.Date

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Created by Administrator on 2016/9/19.
 */
object KafkaAndHbase {

  // Kept from the original (unused here): a state-update function for updateStateByKey
  val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
    iter.flatMap { case (x, y, z) => Some(y.sum + z.getOrElse(0)).map(i => (x, i)) }
  }

  def main(args: Array[String]) {
    // User-defined helper that quiets Spark's logging (see the sketch below)
    LoggerLevels.setStreamingLogLevels()

    // Program arguments: zkQuorum, group, topic, numThreads, cachepath
    // e.g. shizhanmini:2181,mini02:2181,mini03:2181 ty90 db100 2 c://ck2
    val Array(zkQuorum, group, topic, numThreads, cachepath) = args

    // Create the SparkConf; local mode for testing
    val sparkConf = new SparkConf().setAppName("KafkaAndHbase").setMaster("local[2]")

    // Create the streaming context with a 3-second batch interval
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    ssc.checkpoint(cachepath)

    // Turn the topic list into a Map[topic -> number of receiver threads]
    val topicMap = topic.split(",").map((_, numThreads.toInt)).toMap

    // Create a receiver-based Kafka stream and keep only the message value
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap, StorageLevel.MEMORY_AND_DISK).map(_._2)

    // Each message is a comma-separated record; pick the id, name, lat and lon fields
    val users = lines.map(x => (x.split(",")(1), x.split(",")(3), x.split(",")(6), x.split(",")(7)))

    users.print()

    users.foreachRDD(rdd => {
      rdd.foreachPartition(par => {
        val conf = HBaseConfiguration.create()
        conf.set("hbase.zookeeper.property.clientPort", "2181")
        conf.set("hbase.zookeeper.quorum", "192.168.199.6,192.168.199.7,192.168.199.8")

        // Creating a Connection is heavyweight; it is thread-safe and is the entry point to HBase
        val conn = ConnectionFactory.createConnection(conf)

        // The table this example writes to
        val userTable = TableName.valueOf("user")
        val table = conn.getTable(userTable)

        if (!par.isEmpty) {
          par.foreach(pair => {
            val id = pair._1 + "_" + new Date().getTime
            val vname = pair._2 // name field, currently unused
            val lat = pair._3
            val lon = pair._4
            println(id + "-----------geo-----------" + lon)

            // Prepare a Put for row key `id`
            val p = new Put(id.getBytes)
            // Set column and value on the Put
            p.addColumn("info1".getBytes, "lat".getBytes, lat.getBytes)
            p.addColumn("info1".getBytes, "lon".getBytes, lon.getBytes)
            // Write the row
            table.put(p)
          })
        }

        table.close()
        conn.close()
      })
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
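The first line of main calls LoggerLevels.setStreamingLogLevels(), a helper that is not shown in this post. A minimal sketch of what it might look like, modeled on the StreamingExamples utility that ships with the Spark examples (the exact behavior of the original helper is an assumption):

import org.apache.log4j.{Level, Logger}

// Hypothetical reconstruction of the LoggerLevels helper referenced above
object LoggerLevels {
  def setStreamingLogLevels(): Unit = {
    // Only touch the root logger if log4j has not been configured elsewhere
    val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
    if (!log4jInitialized) {
      // Keep the driver console readable: show only WARN and above
      Logger.getRootLogger.setLevel(Level.WARN)
    }
  }
}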
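With setMaster("local[2]") hard-coded, the job can be launched directly from the IDE or via spark-submit, passing five arguments: the ZooKeeper quorum, the Kafka consumer group, the topic list, the number of receiver threads per topic, and the checkpoint directory, for example shizhanmini:2181,mini02:2181,mini03:2181 ty90 db100 2 c://ck2 (the sample values from the comment in the code).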