Getting started with Spark the hard way: the full Kafka → Spark → Elasticsearch pipeline in Java
1. Create a Maven project in MyEclipse.
2. Environment: Hadoop 2.6.3, Spark built for Scala 2.10 (the `_2.10` artifacts below), Elasticsearch 5.4.3, JDK 1.8.
3. Add the Maven dependencies:
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>${spark-version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>${spark-version}</version>
        <scope>provided</scope>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka_2.10</artifactId>
        <version>${spark-version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.5</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.10.5</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-compiler</artifactId>
        <version>2.10.5</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-reflect</artifactId>
        <version>2.10.5</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-spark-13_2.10</artifactId>
        <version>${es-version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-to-slf4j</artifactId>
        <version>2.8.2</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.24</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.24</version>
    </dependency>
</dependencies>
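The ${spark-version} and ${es-version} placeholders above need matching entries in the POM's <properties> block. A minimal sketch; the Spark version here is an assumption, since the elasticsearch-spark-13_2.10 artifact targets Spark 1.3–1.6 on Scala 2.10:

<properties>
    <!-- assumed value: any Spark 1.3-1.6 release built for Scala 2.10 should work -->
    <spark-version>1.6.3</spark-version>
    <!-- matches the ES 5.4.3 cluster listed above -->
    <es-version>5.4.3</es-version>
</properties>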
4. Straight to the Java code:
import java.io.Serializable;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

import scala.Tuple2;

public class SparkToES implements Serializable {
    public static void main(String[] args) {
        // Split pattern, space-delimited (kept from the original; unused below)
        final Pattern SPACE = Pattern.compile(" ");
        // ZooKeeper quorum the Kafka receiver connects to
        String zkQuorum = "t-kafka1.hdp.mobike.cn:2181";
        // Consumer group for the topics
        String group = "1";
        // Topic names, comma-separated
        String topics = "api_newlog";
        // Number of receiver threads per topic
        int numThreads = 1;

        SparkConf sparkConf = new SparkConf().setAppName("KafkaWordCount");
        sparkConf.set("es.index.auto.create", "true");
        sparkConf.set("es.nodes", "10.3.3.83");
        // Must be set when connecting to a remote ES node
        sparkConf.set("es.port", "9200");

        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(4000));
        // The original referenced undefined yyyyerr/mmerr/dderr/hherr variables;
        // assume they are the current date and hour and derive them here
        String datePath = new SimpleDateFormat("yyyy/MM/dd/HH").format(new Date());
        jssc.checkpoint("hdfs://mycluster/tmp/" + datePath + "/"); // set the checkpoint directory

        // Map each topic to its number of receiver threads
        Map<String, Integer> topicmap = new HashMap<String, Integer>();
        String[] topicsArr = topics.split(",");
        int n = topicsArr.length;
        for (int i = 0; i < n; i++) {
            topicmap.put(topicsArr[i], numThreads);
        }

        // Pull data from Kafka as a (key, value) input DStream
        JavaPairReceiverInputDStream<String, String> lines =
                KafkaUtils.createStream(jssc, zkQuorum, group, topicmap);

        // Keep only the message value from each (key, value) pair
        JavaDStream<String> nn = lines.map(new Function<Tuple2<String, String>, String>() {
            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        });

        // Wrap each message in a Map carrying the text and an ISO-style timestamp
        JavaDStream<Map<String, String>> a = nn.flatMap(
                new FlatMapFunction<String, Map<String, String>>() {
            public Iterable<Map<String, String>> call(String arg0) throws Exception {
                Map<String, String> map = new HashMap<String, String>();
                map.put("comments", arg0);
                SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZ");
                map.put("commentDate", dateFormat.format(new Date()));
                return Arrays.asList(map);
            }
        });

        // Write each micro-batch to the logs/docs index
        a.foreachRDD(new VoidFunction<JavaRDD<Map<String, String>>>() {
            public void call(JavaRDD<Map<String, String>> t) throws Exception {
                JavaEsSpark.saveToEs(t, "logs/docs");
            }
        });

        a.print();
        jssc.start();
        jssc.awaitTermination();
    }
}
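Once the stream has run for a few batches, you can check that documents actually landed in the logs/docs index by reading it back with the same connector. A minimal sketch using JavaEsSpark.esRDD; the class name and local master here are assumptions for illustration, not part of the original job:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

public class CheckEsIndex {
    public static void main(String[] args) {
        // Assumed local run; point es.nodes/es.port at the same cluster as the job
        SparkConf conf = new SparkConf().setAppName("CheckEsIndex").setMaster("local[2]");
        conf.set("es.nodes", "10.3.3.83");
        conf.set("es.port", "9200");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        // esRDD returns (document id, document fields) pairs for the index
        long count = JavaEsSpark.esRDD(jsc, "logs/docs").count();
        System.out.println("documents in logs/docs: " + count);
        jsc.stop();
    }
}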