Spark Streaming: processing data from two different Kafka clusters at the same time
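As the title says: things are never quite perfect, and the data we need to process lives in two different Kafka clusters. Life goes on, and the problem still has to be solved. We create two DStreams, each connected to a different topic on a different Kafka cluster, then union the two DStreams and process them together. The code is as follows: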
package com.kingnet

import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Implicit conversions so that java.util.List can be used with map/foreach.
import scala.collection.JavaConversions._

/**
 * Reads from several Kafka sources (which may sit on different clusters)
 * and processes them as a single unioned DStream.
 */
object IOSChannelNewActiveDids {

  def createContext(params: KafkaStreamingParams): StreamingContext = {
    // Example params:
    // {"batchTime":5,"sources":[{"zookeeper":"name85:2181,name86:2181,name87:2181","group":"group1","topics":"test1","numThreads":"1"},{"zookeeper":"name85:2181,name86:2181,name87:2181","group":"group1","topics":"test2","numThreads":"1"}]}
    val sparkConf = new SparkConf().setAppName("IOSChannelNewActiveDids")
    val ssc = new StreamingContext(sparkConf, Seconds(params.getBatchTime.toInt))
    // ssc.checkpoint(checkpointDirectory)

    // One receiver-based DStream per source; each source has its own ZooKeeper quorum.
    val rawdata = params.getSources.map(p => {
      // topic name -> number of consumer threads
      val topicMap = p.getTopics.split(",").map((_, p.getNumThreads.toInt)).toMap
      // createStream yields (key, message) pairs; keep only the message.
      KafkaUtils.createStream(ssc, p.getZookeeper, p.getGroup, topicMap).map(_._2)
    }).toSeq

    // Union the DStreams so they can be processed together.
    val union_rawdata = ssc.union(rawdata)
    union_rawdata.print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    if (args.length < 1) {
      System.err.println("Usage: com.kingnet.IOSChannelNewActiveDids {\"batchTime\":5,\"sources\":[{\"zookeeper\":\"name85:2181,name86:2181,name87:2181\",\"group\":\"group1\",\"topics\":\"test1\",\"numThreads\":1},{\"zookeeper\":\"name85:2181,name86:2181,name87:2181\",\"group\":\"group1\",\"topics\":\"test2\",\"numThreads\":1}]}")
      System.exit(1)
    }

    // Parse the JSON argument into the parameter object.
    val params = GsonObject.getInstance().fromJson(args(0), classOf[KafkaStreamingParams])
    params.getSources.foreach(p => {
      println(p.getTopics)
    })

    val ssc = createContext(params)
    ssc.start()
    ssc.awaitTermination()
  }
}
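The per-source topic map built inside createContext can be looked at in isolation. The following is only a minimal sketch, written in plain Java so it runs without Spark on the classpath. The class name TopicMapSketch and the method topicMap are hypothetical, and the trimming and skipping of empty names are additions that the Scala one-liner above does not perform.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of the original job.
final class TopicMapSketch {

    // Turns a comma-separated topic list plus a thread count into
    // the topic -> consumer-thread-count map that createStream expects.
    static Map<String, Integer> topicMap(String topics, int numThreads) {
        Map<String, Integer> result = new LinkedHashMap<>();
        for (String topic : topics.split(",")) {
            String name = topic.trim();   // assumption: tolerate stray spaces
            if (!name.isEmpty()) {        // assumption: ignore empty entries
                result.put(name, numThreads);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // prints {test1=1, test2=1}
        System.out.println(topicMap("test1,test2", 1));
    }
}
```

Every topic listed in one source shares that source's numThreads value, which is why a single integer per source is enough in the JSON parameters.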
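We pass a JSON string to the program through args. The JSON contains a sources list that specifies two sets of connection settings (this is only a test, so both entries use the same ZooKeeper list). I then parse this JSON into a Java object: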
package com.kingnet;

import java.util.List;

/**
 * Created by xiaoj on 2016/7/13.
 */
public class KafkaStreamingParams {

    // Batch interval in seconds.
    private String batchTime;
    // One entry per Kafka source (cluster/topic combination).
    private List<KafkaParams> sources;

    public String getBatchTime() {
        return batchTime;
    }

    public void setBatchTime(String batchTime) {
        this.batchTime = batchTime;
    }

    public List<KafkaParams> getSources() {
        return sources;
    }

    public void setSources(List<KafkaParams> sources) {
        this.sources = sources;
    }

    @Override
    public String toString() {
        return "KafkaStreamingParams{" +
                "batchTime='" + batchTime + '\'' +
                ", sources=" + sources +
                '}';
    }

    // Static nested class, so Gson can instantiate it without an enclosing instance.
    public static class KafkaParams {

        private String zookeeper;
        private String group;
        private String topics;
        private String numThreads;

        public String getZookeeper() {
            return zookeeper;
        }

        public void setZookeeper(String zookeeper) {
            this.zookeeper = zookeeper;
        }

        public String getGroup() {
            return group;
        }

        public void setGroup(String group) {
            this.group = group;
        }

        public String getTopics() {
            return topics;
        }

        public void setTopics(String topics) {
            this.topics = topics;
        }

        public String getNumThreads() {
            return numThreads;
        }

        public void setNumThreads(String numThreads) {
            this.numThreads = numThreads;
        }

        @Override
        public String toString() {
            return "KafkaParams{" +
                    "zookeeper='" + zookeeper + '\'' +
                    ", group='" + group + '\'' +
                    ", topics='" + topics + '\'' +
                    ", numThreads='" + numThreads + '\'' +
                    '}';
        }
    }
}
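Yes, I do this a lot: creating Java classes inside a Scala project, which the powerful IntelliJ IDEA makes easy. The GsonObject helper used in main looks like this: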
package com.kingnet

import java.util

import com.google.gson.{Gson, GsonBuilder}

/**
 * Created by xiaoj on 2016/5/5.
 */
object GsonObject {

  // Lazily created, shared Gson instance (double-checked locking).
  @volatile private var instance: Gson = null

  def getInstance(): Gson = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = new GsonBuilder().create()
        }
      }
    }
    instance
  }

  // Parses a JSON string into a generic map; returns None if parsing fails.
  def fromJson(s: String): Option[util.HashMap[String, Any]] = {
    try {
      Some(getInstance().fromJson(s, classOf[util.HashMap[String, Any]]))
    } catch {
      case e: Exception =>
        e.printStackTrace()
        None
    }
  }

  def toJson(src: Any): String = {
    getInstance().toJson(src)
  }
}
Run the program, passing one JSON argument: {\"batchTime\":\"10\",\"sources\":[{\"zookeeper\":\"name85:2181,name86:2181,name87:2181\",\"group\":\"group1\",\"topics\":\"test1\",\"numThreads\":\"1\"},{\"zookeeper\":\"name85:2181,name86:2181,name87:2181\",\"group\":\"group1\",\"topics\":\"test2\",\"numThreads\":\"1\"}]}
Open two Kafka console producers and write data to the topics test1 and test2 respectively. The streaming program's console will then print the messages it receives.
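The test argument above points both sources at the same ZooKeeper quorum. For two genuinely separate clusters, each source would carry its own quorum. The sketch below assembles such an argument in plain Java to avoid escaping quotes by hand. It is only an illustration: the class SourcesJsonSketch, its methods, and the host names zk-a1, zk-a2, zk-b1 and zk-b2 are all hypothetical, and the values are assumed not to contain quotes or backslashes, since nothing is escaped.

```java
// Hypothetical helper for building the JSON argument; not part of the original job.
final class SourcesJsonSketch {

    // Renders one entry of the "sources" array.
    // Assumption: values contain no characters that need JSON escaping.
    static String source(String zookeeper, String group, String topics, int numThreads) {
        return "{\"zookeeper\":\"" + zookeeper + "\","
             + "\"group\":\"" + group + "\","
             + "\"topics\":\"" + topics + "\","
             + "\"numThreads\":\"" + numThreads + "\"}";
    }

    // Wraps the rendered sources together with the batch interval in seconds.
    static String params(int batchTimeSeconds, String... sources) {
        return "{\"batchTime\":\"" + batchTimeSeconds + "\",\"sources\":["
             + String.join(",", sources) + "]}";
    }

    public static void main(String[] args) {
        String json = params(10,
            source("zk-a1:2181,zk-a2:2181", "group1", "test1", 1),  // cluster A (made-up hosts)
            source("zk-b1:2181,zk-b2:2181", "group1", "test2", 1)); // cluster B (made-up hosts)
        System.out.println(json);
    }
}
```

The printed string has the same shape as the argument shown above, so it can be handed to the job unchanged. When it goes through a shell, wrapping it in single quotes is usually simpler than escaping every double quote.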