1、创建myeclipse的 maven项目

2、环境:Hadoop 2.6.3、Spark(基于 Scala 2.10 构建,依赖后缀为 _2.10)、Elasticsearch 5.4.3、JDK 1.8

3、创建maven依赖

<dependencies>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>${spark-version}</version>
    <exclusions>
      <exclusion>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
      </exclusion>
      <exclusion>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
      </exclusion>
    </exclusions>
  </dependency>

  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>${spark-version}</version>
    <scope>provided</scope>
    <exclusions>
      <exclusion>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
      </exclusion>
      <exclusion>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
      </exclusion>
    </exclusions>
  </dependency>

  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.10</artifactId>
    <version>${spark-version}</version>
  </dependency>

  <dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.5</version>
    <exclusions>
      <exclusion>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
      </exclusion>
      <exclusion>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
      </exclusion>
    </exclusions>
  </dependency>

  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.10.5</version>
  </dependency>
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-compiler</artifactId>
    <version>2.10.5</version>
  </dependency>
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-reflect</artifactId>
    <version>2.10.5</version>
  </dependency>

  <dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-spark-13_2.10</artifactId>
    <version>${es-version}</version>
  </dependency>

  <dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-to-slf4j</artifactId>
    <version>2.8.2</version>
  </dependency>
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.24</version>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.24</version>
  </dependency>
</dependencies>

4、直接上手java 代码

public class SparkToES  implements Serializable{

public static void main(String[] args) {

//设置匹配模式,以空格分隔

final Pattern SPACE = Pattern.compile(" ");

//接收数据的地址和端口

String zkQuorum = "t-kafka1.hdp.mobike.cn:2181";

//话题所在的组

String group = "1";

//话题名称以“,”分隔

String topics = "api_newlog";

//每个话题的分片数

int numThreads = 1;

SparkConf sparkConf = new SparkConf().setAppName("KafkaWordCount");

sparkConf.set("es.index.auto.create", "true");

sparkConf.set("es.nodes", "10.3.3.83");

//---->如果是连接的远程es节点,该项必须要设置

sparkConf.set("es.port", "9200");

JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(4000));

jssc.checkpoint("hdfs://mycluster/tmp/"+yyyyerr+"/"+mmerr+"/"+dderr+"/"+hherr+"/"); //设置检查点

//存放话题跟分片的映射关系

Map topicmap = new HashMap();

String[] topicsArr = topics.split(",");

int n = topicsArr.length;

for(int i=0;i

topicmap.put(topicsArr[i], numThreads);

}

//从Kafka中获取数据转换成RDD

JavaPairReceiverInputDStream lines = KafkaUtils.createStream(jssc,                zkQuorum, group, topicmap);

//从话题中过滤所需数据

JavaDStream nn = lines.map(new Function, String>() {

public String call(Tuple2 tuple2) {

return tuple2._2();

}

});

JavaDStream a=nn.flatMap(new FlatMapFunction() {

public Iterable call(String arg0) throws Exception {

// TODO Auto-generated method stub

Map map =new HashMap();

map.put("comments", arg0);

Date date=new  Date();

SimpleDateFormat dateFormat=new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZ");

String datef=dateFormat.format(date);

map.put("commentDate",  datef);

return  Arrays.asList(map);

}

});

a.foreachRDD(new VoidFunction>() {

public void call(JavaRDD t) throws Exception {

JavaEsSpark.saveToEs(t, "logs/docs");

}

});

a.print();

jssc.start();

jssc.awaitTermination();

}

}

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐