已经搞定了开发环境,很快需求就要下来.

 

 

话不多说,我们开始配置一下先研究部分实时部分的代码逻辑,提供将来实时计算逻辑。

[root@node1 ~]# cat kafka_output.sh 
for((i=0;i<=1000;i++));

do echo "hello world haha haha hello haha haha kafka_test-"+$i>>/usr/local/soft/flume/flume_dir/kafka.log;

done
[root@node1 ~]#

 

这是我们即将写入的东西。

然后我们执行以下。

 

看一下我们idea

 

-------------------------------------------
Time: 1558948170000 ms
-------------------------------------------
(kafka_test-+978,1)
(kafka_test-+909,1)
(kafka_test-+996,1)
(kafka_test-+975,1)
(kafka_test-+990,1)
(kafka_test-+918,1)
(kafka_test-+957,1)
(kafka_test-+960,1)
(kafka_test-+963,1)
(haha,404)
...

 

 

到这里看一下代码实现:


package voucher

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * @ author: create by shouzhuang.li
  * @ date:2019/5/23
  */
object StreamintTestSaveMysql {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Kafka2Scala2WC").setMaster("local[3]") //local[3]指的是在本地运行,启动3个进程
    val ssc = new StreamingContext(conf, Seconds(5)) //每5秒钟统计一次数据
    val kafkaParams = Map[String, Object](
      /*kafka的端口号*/
      "bootstrap.servers" -> "172.16.1.117:9092,172.16.1.118:9092,172.16.1.119:9092",
      /*k-v的反序列化*/
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      /*kafka的组号*/
      "group.id" -> "kafka_wc",
      /*偏移量重置*/
      "auto.offset.reset" -> "latest",
      /*是否自动提交*/
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    /*kafka的已经创建的主题*/
    val topics = Array("realtime") //主题可以有多个“topicA”,“topicB”

    /*创建一个离散流*/
    val data = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )
    /*对kafka消费端取出来的数据进行初步处理,获取value值*/
    val lines = data.map(_.value())

    /*对初步处理的数据进行扁平化处理,并按照空格切分*/
    val words = lines.flatMap(_.split(" "))

    /*获取的单词作为key值,‘1’作为value值,组成(K,V)对*/
    val wordAndOne = words.map((_, 1))
    val reduced = wordAndOne.reduceByKey(_ + _)

    //下面是新增的语句,把DStream保存到MySQL数据库中
    reduced.foreachRDD(rdd => {
      //内部函数
      def func(records: Iterator[(String,Int)]) {
        var conn: Connection = null
        var stmt: PreparedStatement = null
        try {
          val url = "jdbc:mysql://172.16.1.117:3306/test"
          val user = "root"
          val password = "MyNewPass4!"  //笔者设置的数据库密码是hadoop,请改成你自己的mysql数据库密码
          conn = DriverManager.getConnection(url, user, password)
          records.foreach(p => {
            val sql = "insert into wordcount(word,count) values (?,?)"
            stmt = conn.prepareStatement(sql);
            stmt.setString(1, p._1.trim)
            stmt.setInt(2,p._2.toInt)
            stmt.executeUpdate()
          })
        } catch {
          case e: Exception => e.printStackTrace()
        } finally {
          if (stmt != null) {
            stmt.close()
          }
          if (conn != null) {
            conn.close()
          }
        }
      }

      val repartitionedRDD = rdd.repartition(3)
      repartitionedRDD.foreachPartition(func)
    })

    /*打印结果*/
    reduced.print()

    /*启动sparkstreaming程序*/
    ssc.start()

    /*等待程序退出*/
    ssc.awaitTermination()
  }
}

可以看到,数据库中已经存在了。

 

这个可以针对天级做一些实时的统计计算。

 

上述这个demo是个小例子,很快就可以正式做一些对于日志、mysql的实时数据同步。

 

 

 

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐