Developing a Spark Streaming Program in Scala to Consume Data from Kafka -- a WordCount Example
Prerequisite: a ZooKeeper and Kafka cluster is already set up.
Create a topic named test2 in Kafka:
./kafka-topics.sh --create --zookeeper hadoop1:2181 --replication-factor 1 --partitions 3 --topic test2
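To confirm the topic was created with the expected replication factor and partition count, you can describe it (this verification step is not from the original post; it assumes the same Kafka bin directory and ZooKeeper address):
./kafka-topics.sh --describe --zookeeper hadoop1:2181 --topic test2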
Produce data from the shell with the console producer:
./kafka-console-producer.sh --broker-list hadoop1:9092 --topic test2
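Each line typed into the producer is sent as one message to test2. For example, typing a few lines such as the following (sample input for illustration only) gives the streaming job some words to count:
hello spark
hello kafka
hello spark streaming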
The Scala program
pom.xml
<properties>
    <maven.compiler.source>1.7</maven.compiler.source>
    <maven.compiler.target>1.7</maven.compiler.target>
    <encoding>UTF-8</encoding>
    <scala.version>2.10.6</scala.version>
    <scala.compat.version>2.10</scala.compat.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.5.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>1.5.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka_2.10</artifactId>
        <version>1.5.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>1.5.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.6.2</version>
    </dependency>
</dependencies>
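The snippet above lists only the dependencies. To have Maven actually compile the Scala sources, a Scala compiler plugin is also needed in the build section. A minimal sketch, assuming the commonly used scala-maven-plugin (the plugin version here is an assumption, not taken from the original project):
<build>
    <plugins>
        <!-- scala-maven-plugin compiles src/main/scala; version chosen here is illustrative -->
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.2</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>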
WordCount.scala
import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
object WordCount {

  // State update function: for each key, add the counts from the current batch
  // (y.sum) to the previously accumulated count (z, defaulting to 0).
  val updateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])]) => {
    iterator.flatMap { case (x, y, z) => Some(y.sum + z.getOrElse(0)).map(n => (x, n)) }
  }

  def main(args: Array[String]) {
    // Parameters: ZooKeeper quorum, consumer group, topics, threads per topic, checkpoint dir.
    // Hardcoded here for local testing; in production they would normally come from args.
    val Array(zkQuorum, groupId, topics, numThreads, hdfs) = Array("hadoop1:2181", "streaming", "test2", "3", "file:///C:\\Users\\XT\\Desktop\\test")
    // Create the SparkConf and set the application name
    // (the master URL is expected from spark-submit, or add setMaster("local[2]") when running in the IDE)
    val conf = new SparkConf().setAppName("WordCount")
    // Create the StreamingContext with a 2-second batch interval
    val ssc = new StreamingContext(conf, Seconds(2))
    // Set the checkpoint directory (required by updateStateByKey)
    ssc.checkpoint(hdfs)
    // Map each topic to the number of consumer threads
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    // Pull data from Kafka as a DStream and keep only the message value
    val lines = KafkaUtils.createStream(ssc, zkQuorum, groupId, topicMap, StorageLevel.MEMORY_AND_DISK).map(_._2)
    // Split each line into words and map each word to a count of 1
    val wc = lines.flatMap(_.split(" ")).map((_, 1))
    // Maintain a running count per word across batches
    val result = wc.updateStateByKey(updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
    // Print the results to the console
    result.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
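To run the job, package the project (for example with mvn package) and submit it with spark-submit. A minimal sketch; the jar name and the local master URL below are illustrative, not from the original post, and since spark-streaming-kafka is not bundled with the Spark distribution, the jar should be a fat jar or the Kafka dependency should be supplied via --jars:
spark-submit --class WordCount --master local[2] target/wordcount-1.0-SNAPSHOT.jar
With the console producer running, each 2-second batch prints cumulative word counts such as (hello,3) to the console.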