SparkStreaming读Kafka写入HBase

本教程基于CDH5.8.0,其它组件版本为:Spark 2.1.0、Kafka 0.10.2、HDFS 2.6.0、hbase-client 1.2.0

pom

  <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.1.0</version>
        <scope>compile</scope>
  </dependency>
  <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.1.0</version>
        <scope>compile</scope>
  </dependency>
  <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-client</artifactId>
       <version>1.2.0</version>
       <scope>compile</scope>
  </dependency>

访问Kerberos环境下的HBase代码

package utils
import java.security.PrivilegedAction
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory}
import org.apache.hadoop.security.UserGroupInformation

/**
  * Helper for reaching HBase on a Kerberos-secured cluster.
  *
  * Created by LHX on 2018/7/12 15:13.
  */

object HBaseUtil extends Serializable{

    /**
      * Logs in from the keytab and opens a new HBase connection as that user.
      *
      * Every call performs a keytab login and creates a fresh connection, so the
      * caller owns the returned connection and must close it.
      *
      * @return a newly created HBase connection
      */
    def getHBaseConn(): Connection = {
        System.setProperty("java.security.krb5.conf","krb5.conf")
        val conf = HBaseConfiguration.create()
        val user = "asmp"
        val keyPath = "asmp.keytab"
        // Kerberos authentication settings
        conf.set("hbase.zookeeper.quorum","www.test.com")
        conf.set("hbase.zookeeper.property.clientPort", "2181")
        conf.set("hbase.rootdir", "hdfs://www.test.com:8020/hbase")
        conf.set("hadoop.security.authentication","Kerberos")
        conf.set("hbase.security.authentication","Kerberos")
        conf.set("hbase.zookeeper.client.keytab.file", keyPath)
        conf.set("hbase.master.kerberos.principal", "hbase/_HOST")
        conf.set("hbase.master.keytab.file", keyPath)
        // Same principal pattern as the master entry above and as the driver-side
        // configuration in Kafka2Spark2HbaseTest (it was the bare "hbase" here).
        conf.set("hbase.regionserver.kerberos.principal", "hbase/_HOST")
        conf.set("hbase.regionserver.keytab.file", keyPath)

        UserGroupInformation.setConfiguration(conf)
        UserGroupInformation.loginUserFromKeytab(user, keyPath)
        // Create the connection inside doAs so that it runs as the logged-in user.
        UserGroupInformation.getLoginUser.doAs(new PrivilegedAction[Connection] {
            override def run(): Connection = ConnectionFactory.createConnection(conf)
        })
    }

}

Spark2Streaming应用实时读取Kafka代码

package com.egridcloud.kafka

import org.apache.commons.lang.StringUtils
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.security.UserGroupInformation
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import utils.HBaseUtil

import scala.util.Try
import scala.util.parsing.json.JSON

/**
  * Created by LHX on 2018/7/12 15:07.
  * Spark2 Streaming application for a Kerberos environment: reads Kafka messages
  * in real time, parses them as JSON and stores them in HBase.
  * Submit the job with spark2-submit.
  */
object Kafka2Spark2HbaseTest {

    /** Column family that receives every parsed field. */
    private val ColumnFamily = "f1"

    /** JSON keys that are copied one-to-one into HBase column qualifiers. */
    private val ColumnNames = Seq("name", "sex", "city", "occupation", "mobile_phone_num",
        "fix_phone_num", "bank_name", "address", "marriage", "child_num")

    /**
      * Converts one Kafka message into an HBase Put.
      *
      * @param json the raw message value; may be null or malformed
      * @return the Put keyed by the "id" field, or None when the message is not a
      *         JSON object, "id" is missing/empty, or any required field is missing
      *         or is not a string
      */
    private def toPut(json: String): Option[Put] = {
        Option(json).flatMap(text => JSON.parseFull(text)) match {
            case Some(parsed: Map[_, _]) =>
                val fields = parsed.asInstanceOf[Map[String, Any]]
                def stringField(key: String): Option[String] =
                    fields.get(key).collect { case s: String => s }

                val columns = ColumnNames.flatMap(name => stringField(name).map(value => name -> value))
                stringField("id")
                    .filter(rowkey => rowkey.nonEmpty && columns.size == ColumnNames.size)
                    .map { rowkey =>
                        val put = new Put(Bytes.toBytes(rowkey))
                        columns.foreach { case (qualifier, value) =>
                            put.addColumn(Bytes.toBytes(ColumnFamily), Bytes.toBytes(qualifier), Bytes.toBytes(value))
                        }
                        put
                    }
            case _ => None
        }
    }

    /**
      * Entry point: performs the Kerberos login, subscribes to the Kafka topic and
      * writes every micro-batch to the HBase table "ASMP:DAC_test01".
      *
      * @param args unused
      */
    def main(args: Array[String]): Unit = {
        System.setProperty("java.security.krb5.conf","krb5.conf")
        val conf = HBaseConfiguration.create()
        val user = "asmp"
        val keyPath = "asmp.keytab"
        // Kerberos authentication settings
        conf.set("hbase.zookeeper.quorum","www.test.com")
        conf.set("hbase.zookeeper.property.clientPort", "2181")
        conf.set("hbase.rootdir", "hdfs://www.test.com:8020/hbase")
        conf.set("hadoop.security.authentication","Kerberos")
        conf.set("hbase.security.authentication","Kerberos")
        conf.set("hbase.zookeeper.client.keytab.file", keyPath)
        conf.set("hbase.master.kerberos.principal", "hbase/_HOST")
        conf.set("hbase.master.keytab.file", keyPath)
        conf.set("hbase.regionserver.kerberos.principal", "hbase/_HOST")
        conf.set("hbase.regionserver.keytab.file", keyPath)

        UserGroupInformation.setConfiguration(conf)
        UserGroupInformation.loginUserFromKeytab(user, keyPath)

        // Kafka settings
        val brokers = "10.122.17.129:9095,10.122.17.130:9095,10.122.17.131:9095"
        val topics = "stream_test01"
        println("kafka.brokers:" + brokers)
        println("kafka.topics:" + topics)

        if(StringUtils.isEmpty(brokers)|| StringUtils.isEmpty(topics) ) {
            println("未配置Kafka和Kerberos信息")
            System.exit(0)
        }
        val topicsSet = topics.split(",").toSet

        val sparkConf = new SparkConf().setAppName("Kafka2Spark2HBase").setMaster("local[2]")
        val sc =new SparkContext(sparkConf)
        // Batch interval: one micro-batch every 5 seconds
        val ssc = new StreamingContext(sc, Seconds(5))
        val kafkaParams = Map[String, Object]("bootstrap.servers" -> brokers
            , "auto.offset.reset" -> "latest"
            , "sasl.kerberos.service.name" -> "kafka"
            , "key.deserializer" -> classOf[StringDeserializer]
            , "value.deserializer" -> classOf[StringDeserializer]
            , "group.id" -> "testgroup"
        )

        val dStream = KafkaUtils.createDirectStream[String, String](ssc,
            LocationStrategies.PreferConsistent,
            ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))

        dStream.foreachRDD(rdd => {
            rdd.foreachPartition(partitionRecords => {
                // Skip the keytab login and connection setup when there is nothing to write.
                if (partitionRecords.nonEmpty) {
                    val connection = HBaseUtil.getHBaseConn()
                    try {
                        // One table handle per partition instead of one per record.
                        val table = connection.getTable(TableName.valueOf("ASMP:DAC_test01"))
                        try {
                            partitionRecords.foreach(line => {
                                println(line.value())
                                toPut(line.value()) match {
                                    case Some(put) =>
                                        // Best effort, as before: a failed write does not stop the
                                        // partition, but the error is now reported instead of hidden.
                                        Try(table.put(put)).failed.foreach(e =>
                                            println(s"HBase put failed: ${e.getMessage}"))
                                    case None =>
                                        println("skipped record: not a JSON object with all required string fields")
                                }
                            })
                        } finally {
                            table.close()
                        }
                    } finally {
                        // Always release the connection, even when a record throws.
                        connection.close()
                    }
                }
            })
        })
        ssc.start()
        ssc.awaitTermination()
    }

}

写入数据到kafka代码

package com.##.kafka

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import scala.util.parsing.json._
/**
  * Created by LHX on 2018/7/10 11:00 AM.
  * Thread that sends JSON test messages to Kafka on the test cluster.
  *
  * @param topic the Kafka topic every message is sent to
  */
class TestProducer(val topic: String) extends Thread {
    /** Underlying Kafka producer; stays null until [[init]] has been called. */
    var producer: KafkaProducer[String, String] = _

    /**
      * Creates the Kafka producer with String key/value serializers.
      *
      * @return this instance, so the call can be chained
      */
    def init: TestProducer = {
        val props = new Properties()
        props.put("bootstrap.servers", "10.122.17.129:9095,10.122.17.130:9095,10.122.17.131:9095")
        props.put("key.serializer", classOf[StringSerializer].getName)
        props.put("value.serializer", classOf[StringSerializer].getName)
        this.producer = new KafkaProducer[String, String](props)
        this
    }

    /**
      * Sends one JSON record every 3 seconds until the thread is interrupted,
      * then closes the producer.
      *
      * @throws IllegalStateException if [[init]] was not called first
      */
    override def run(): Unit = {
        if (producer == null) {
            throw new IllegalStateException("TestProducer.init must be called before start()")
        }

        var num = 1
        try {
            while (!Thread.currentThread().isInterrupted) {
                val colors:Map[String,Object]  = Map("id" -> s"65005${num}", "name" -> s"仲淑${num}", "sex" -> "1",
                    "city" -> "长治", "occupation" -> "生产工作", "mobile_phone_num" -> "13607268580", "fix_phone_num" -> "15004170180",
                    "bank_name" -> "广州银行", "address" -> "台东二路16号", "marriage" -> "1", "child_num" -> "1")
                // The message to send
                val messageStr = JSONObject(colors).toString()
                println(s"send:${messageStr}")
                producer.send(new ProducerRecord[String, String](this.topic, messageStr))
                num += 1
                // Wraps to 0 rather than the initial 1, so later cycles also emit suffix 0.
                if (num > 10) num = 0
                Thread.sleep(3000)
            }
        } catch {
            // Interruption is the stop signal; restore the flag for whoever inspects it next.
            case _: InterruptedException => Thread.currentThread().interrupt()
        } finally {
            // Release the producer's resources once the loop ends.
            producer.close()
        }
    }
}
// Companion object
object TestProducer {
    /**
      * Builds a producer thread for the given topic with its Kafka producer already initialised.
      *
      * @param topic the Kafka topic to send to
      * @return an initialised, not yet started, TestProducer
      */
    def apply(topic: String): TestProducer = new TestProducer(topic).init
}

object SparkWrite2Kafka {
    /**
      * Entry point: starts a producer thread that writes test messages to the
      * "stream_test01" topic.
      *
      * @param args unused
      */
    def main(args: Array[String]): Unit =
        TestProducer("stream_test01").start()
}

注意事项:
先执行Kafka2Spark2HbaseTest.scala接收kafka数据(消费端配置了auto.offset.reset=latest,只会读取启动之后写入的消息),
再执行SparkWrite2Kafka.scala写入数据到kafka

 写入数据到kafka

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐