今天来看下sparkstreaming做一个简单的实时数据处理并且保存到mysql中
已经搞定了开发环境,很快需求就要下来.话不多说,我们开始配置一下先研究部分实时部分的代码逻辑,提供将来实时计算逻辑。[root@node1 ~]# cat kafka_output.shfor((i=0;i<=1000;i++));do echo "hello world haha haha hello haha haha kafka_test-"+$i>...
·
已经搞定了开发环境,很快需求就要下来.
话不多说,我们开始配置一下先研究部分实时部分的代码逻辑,提供将来实时计算逻辑。
[root@node1 ~]# cat kafka_output.sh
for((i=0;i<=1000;i++));
do echo "hello world haha haha hello haha haha kafka_test-"+$i>>/usr/local/soft/flume/flume_dir/kafka.log;
done
[root@node1 ~]#
这是我们即将写入的东西。
然后我们执行以下。
看一下我们idea
-------------------------------------------
Time: 1558948170000 ms
-------------------------------------------
(kafka_test-+978,1)
(kafka_test-+909,1)
(kafka_test-+996,1)
(kafka_test-+975,1)
(kafka_test-+990,1)
(kafka_test-+918,1)
(kafka_test-+957,1)
(kafka_test-+960,1)
(kafka_test-+963,1)
(haha,404)
...
到这里看一下代码实现:
package voucher
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* @ author: create by shouzhuang.li
* @ date:2019/5/23
*/
object StreamintTestSaveMysql {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("Kafka2Scala2WC").setMaster("local[3]") //local[3]指的是在本地运行,启动3个进程
val ssc = new StreamingContext(conf, Seconds(5)) //每5秒钟统计一次数据
val kafkaParams = Map[String, Object](
/*kafka的端口号*/
"bootstrap.servers" -> "172.16.1.117:9092,172.16.1.118:9092,172.16.1.119:9092",
/*k-v的反序列化*/
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
/*kafka的组号*/
"group.id" -> "kafka_wc",
/*偏移量重置*/
"auto.offset.reset" -> "latest",
/*是否自动提交*/
"enable.auto.commit" -> (false: java.lang.Boolean)
)
/*kafka的已经创建的主题*/
val topics = Array("realtime") //主题可以有多个“topicA”,“topicB”
/*创建一个离散流*/
val data = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
/*对kafka消费端取出来的数据进行初步处理,获取value值*/
val lines = data.map(_.value())
/*对初步处理的数据进行扁平化处理,并按照空格切分*/
val words = lines.flatMap(_.split(" "))
/*获取的单词作为key值,‘1’作为value值,组成(K,V)对*/
val wordAndOne = words.map((_, 1))
val reduced = wordAndOne.reduceByKey(_ + _)
//下面是新增的语句,把DStream保存到MySQL数据库中
reduced.foreachRDD(rdd => {
//内部函数
def func(records: Iterator[(String,Int)]) {
var conn: Connection = null
var stmt: PreparedStatement = null
try {
val url = "jdbc:mysql://172.16.1.117:3306/test"
val user = "root"
val password = "MyNewPass4!" //笔者设置的数据库密码是hadoop,请改成你自己的mysql数据库密码
conn = DriverManager.getConnection(url, user, password)
records.foreach(p => {
val sql = "insert into wordcount(word,count) values (?,?)"
stmt = conn.prepareStatement(sql);
stmt.setString(1, p._1.trim)
stmt.setInt(2,p._2.toInt)
stmt.executeUpdate()
})
} catch {
case e: Exception => e.printStackTrace()
} finally {
if (stmt != null) {
stmt.close()
}
if (conn != null) {
conn.close()
}
}
}
val repartitionedRDD = rdd.repartition(3)
repartitionedRDD.foreachPartition(func)
})
/*打印结果*/
reduced.print()
/*启动sparkstreaming程序*/
ssc.start()
/*等待程序退出*/
ssc.awaitTermination()
}
}
可以看到,数据库中已经存在了。
这个可以针对天级做一些实时的统计计算。
上述这个demo是个小例子,很快就可以正式做一些对于日志、mysql的实时数据同步。
更多推荐
已为社区贡献2条内容
所有评论(0)