Kafka开源项目指南
Kafka2MongoDB
Kafka2MongoDB
实现了将Kafka中的数据推送给MongoDB,然后再将MongoDB中的数据推送给ES的过程。数据来源是txt文档中的180万条数据。准备工作:1)在MongoDB集群上创建一个数据库mydb,并创建一个空的Collection,命名为netflows。
实现了将Kafka中的数据推送给MongoDB,然后再将MongoDB中的数据推送给ES的过程。数据来源是txt文档中的180万条数据。(注:在下文的代码中,消费者线程从Kafka读取每条消息后,在同一个循环内先写入MongoDB,再写入ES。)
准备工作:
1)在MongoDB集群上创建一个数据库mydb,并创建一个空的Collection,命名为netflows
- @SuppressWarnings("deprecation")
- Mongo mongo = new Mongo("10.10.16.251", 10111); // host and port of the MongoDB instance
- DB db = mongo.getDB("mydb"); // handle to the "mydb" database
- // Create the collection without adding any data
- db.createCollection("netflows", null); // null = no creation options
- DBCollection dbColl = db.getCollection("netflows");
2)在kafka的集群上创建一个主题flume1
- bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flume1
3)在ES集群上创建一个索引myindex,该索引的类型是netflows
- IndexResponse res = client.prepareIndex().setIndex("myindex").setType("netflows").execute().actionGet();
下面是代码实现:
1.从文件中读取测试数据,并推送给Kafka
- package com.test;
- import java.io.BufferedReader;
- import java.io.FileNotFoundException;
- import java.io.FileReader;
- import java.io.IOException;
- import java.text.DateFormat;
- import java.text.ParseException;
- import java.text.SimpleDateFormat;
- import java.util.Calendar;
- import java.util.Date;
- import java.util.Properties;
- import kafka.javaapi.producer.Producer;
- import kafka.producer.KeyedMessage;
- import kafka.producer.ProducerConfig;
- public class Kafka_Producer {
- public static void main(String[] args) {
- /*
- * Properties props = new Properties();
- * props.setProperty("metadata.broker.list","10.10.16.253:9092");
- * props.setProperty
- * ("serializer.class","kafka.serializer.StringEncoder");
- * props.put("request.required.acks","-1");
- *
- * ProducerConfig config = new ProducerConfig(props); Producer<String,
- * String> producer = new Producer<String, String>(config);
- * KeyedMessage<String, String> data = new KeyedMessage<String,
- * String>("flume","test-kafka");
- *
- * producer.send(data);
- *
- * producer.close();
- */
- MessageSender messageSender = new MessageSender();
- FileReader fr;
- try {
- fr = new FileReader("C:\\TxtData\\NetFlowAttackDDOS\\4test.txt");
- BufferedReader br = new BufferedReader(fr);
- String line = "";
- String[] arrs = null;
- DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- Date date = null;
- long num = 0;
- while ((line = br.readLine()) != null) {
- arrs = line.split(",");
- date = format.parse(format.format(new Date()));
- Calendar cla = Calendar.getInstance();
- cla.setTime(date);
- cla.add(Calendar.HOUR_OF_DAY,8);
- messageSender.sendToKafkaTopic((num+10001) + ","+arrs[3] + " ," + arrs[4] + " ," + arrs[5]+ ","+arrs[6]);
- num ++;
- }
- br.close();
- fr.close();
- } catch (FileNotFoundException e) {
- e.printStackTrace();
- } catch (IOException e) {
- e.printStackTrace();
- } catch (ParseException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- messageSender.close();
- }
- }
- package com.test;
- import java.util.Properties;
- import com.test.utils.ConstanUtil;
- import com.test.utils.ObjectFormatUtil;
- import kafka.javaapi.producer.Producer;
- import kafka.producer.KeyedMessage;
- import kafka.producer.ProducerConfig;
- public class MessageSender {
- private long windowStart = 0;
- private long windowCurrent = 0;
- private long sendSize = 0;
- Producer<String, String> producer = null;
- private StringBuilder records = new StringBuilder();
- public MessageSender() {
- initKafkaProperties();
- }
- private void initKafkaProperties(){
- Properties props = new Properties();
- props.setProperty("metadata.broker.list","10.10.16.253:9092,10.10.16.252:9092,10.10.16.249:9092");
- props.setProperty("serializer.class","kafka.serializer.StringEncoder");
- props.put("request.required.acks","1");
- props.put("zookeeper.session.timeout.ms", "400000");
- ProducerConfig config = new ProducerConfig(props);
- producer = new Producer<String, String>(config);
- }
- public void sendToKafkaTopic(String message){
- KeyedMessage<String, String> data = new KeyedMessage<String, String>("flume1",message);
- producer.send(data);
- /*long date = ObjectFormatUtil.formatDateToMinute(System.currentTimeMillis());
- windowCurrent = date;
- if(windowStart == 0){//初始化开始的时间窗口
- windowStart = date;
- }
- if ((windowCurrent - windowStart) >= (ConstanUtil.TIME_WINDOW_LEN)) {
- if(records.length() != 0){
- String sendContent = records.substring(0,records.lastIndexOf("@"));
- KeyedMessage<String, String> data = new KeyedMessage<String, String>("flume1",sendContent);
- producer.send(data);
- sendContent = null;
- records = new StringBuilder();
- sendSize = 0;
- }
- }
- records.append(message + "@");
- sendSize++;
- if ((sendSize >= 100)) {
- String sendContent = records.substring(0,records.lastIndexOf("@"));//去掉最后面的@符号
- KeyedMessage<String, String> data = new KeyedMessage<String, String>("flume1",sendContent);
- producer.send(data);
- sendContent = null;
- records = new StringBuilder();
- sendSize = 0;//1000条记录向Kafka发送一次,发送完后将记录数据包的个数清零.
- }*/
- }
- public void close(){
- try {
- if(null != producer){
- producer.close();
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- package com.test.thread;
- import java.net.UnknownHostException;
- import java.text.DateFormat;
- import java.text.ParseException;
- import java.text.SimpleDateFormat;
- import java.util.Calendar;
- import java.util.Date;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- import java.util.Properties;
- import org.elasticsearch.action.get.GetResponse;
- import org.elasticsearch.action.index.IndexResponse;
- import org.elasticsearch.client.Client;
- import org.elasticsearch.client.transport.TransportClient;
- import org.elasticsearch.common.transport.InetSocketTransportAddress;
- import com.mongodb.BasicDBObject;
- import com.mongodb.DB;
- import com.mongodb.DBCollection;
- import com.mongodb.DBObject;
- import com.mongodb.Mongo;
- import com.test.model.NetFlow;
- import com.test.utils.ESUtils;
- import kafka.consumer.Consumer;
- import kafka.consumer.ConsumerConfig;
- import kafka.consumer.ConsumerIterator;
- import kafka.consumer.KafkaStream;
- import kafka.javaapi.consumer.ConsumerConnector;
- public class ConsumerThread extends Thread {
- <span style="white-space:pre"> </span>private String topic;
- <span style="white-space:pre"> </span>private String mongodbIp;
- <span style="white-space:pre"> </span>private int mongodbPort;
- <span style="white-space:pre"> </span>private String indexName;
- <span style="white-space:pre"> </span>private String indexType;
- <span style="white-space:pre"> </span>private String dbName;
- <span style="white-space:pre"> </span>private String collName;
- <span style="white-space:pre"> </span>public ConsumerThread(String topic,String mongodbIp,int mongodbPort,String indexName,String indexType,String dbName,String collName) {
- <span style="white-space:pre"> </span>
- <span style="white-space:pre"> </span>this.topic = topic;
- <span style="white-space:pre"> </span>this.mongodbIp = mongodbIp;
- <span style="white-space:pre"> </span>this.mongodbPort = mongodbPort;
- <span style="white-space:pre"> </span>this.indexName = indexName;
- <span style="white-space:pre"> </span>this.indexType = indexType;
- <span style="white-space:pre"> </span>this.dbName = dbName;
- <span style="white-space:pre"> </span>this.collName = collName;
- <span style="white-space:pre"> </span>
- <span style="white-space:pre"> </span>}
- <span style="white-space:pre"> </span>@SuppressWarnings({ "deprecation", "resource" })
- <span style="white-space:pre"> </span>@Override
- <span style="white-space:pre"> </span>public void run() {
- <span style="white-space:pre"> </span>
- <span style="white-space:pre"> </span>ConsumerConnector consumer = createConsumer();
- <span style="white-space:pre"> </span>Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
- <span style="white-space:pre"> </span>topicCountMap.put(this.topic, 1); // 一次从主题中获取一个数据
- <span style="white-space:pre"> </span>Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);
- <span style="white-space:pre"> </span>KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0);// 获取每次接收到的这个数据
- <span style="white-space:pre"> </span>ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
- <span style="white-space:pre"> </span>
- <span style="white-space:pre"> </span>Mongo mongo;
- <span style="white-space:pre"> </span>@SuppressWarnings("unused")
- <span style="white-space:pre"> </span>Client client;
- <span style="white-space:pre"> </span>
- <span style="white-space:pre"> </span>try {
- <span style="white-space:pre"> </span>
- <span style="white-space:pre"> </span>mongo = new Mongo(this.mongodbIp, this.mongodbPort);
- <span style="white-space:pre"> </span>DB db = mongo.getDB(this.dbName);
- <span style="white-space:pre"> </span>DBCollection dbColl = db.getCollection(this.collName);
- <span style="white-space:pre"> </span>
- <span style="white-space:pre"> </span>client = new TransportClient().addTransportAddress(new InetSocketTransportAddress("10.10.16.253", 9300));<span style="white-space:pre"> </span>
- <span style="white-space:pre"> </span>
- <span style="white-space:pre"> </span>while (iterator.hasNext()) {
- <span style="white-space:pre"> </span>
- <span style="white-space:pre"> </span>String message = new String(iterator.next().message());
- <span style="white-space:pre"> </span>
- <span style="white-space:pre"> </span>String[] recMessage = message.split(",");
- <span style="white-space:pre"> </span>
- <span style="white-space:pre"> </span>if(recMessage.length !=9){
- <span style="white-space:pre"> </span>
- <span style="white-space:pre"> </span>continue;
- <span style="white-space:pre"> </span>}
- <span style="white-space:pre"> </span>
- <span style="white-space:pre"> </span>else{
- <span style="white-space:pre"> </span>
- <span style="white-space:pre"> </span>for (int j = 0;j < recMessage.length; j++) {
- <span style="white-space:pre"> </span> DBObject data4 = new BasicDBObject();
- <span style="white-space:pre"> </span> data4.put("_id", 1);
- <span style="white-space:pre"> </span> data4.put("sourceIp", recMessage[3]);
- <span style="white-space:pre"> </span> data4.put("sourcePort", recMessage[4]);
- <span style="white-space:pre"> </span> data4.put("destIp", recMessage[5]);
- <span style="white-space:pre"> </span> data4.put("destPort", recMessage[6]);
- <span style="white-space:pre"> </span>
- <span style="white-space:pre"> </span> DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- <span style="white-space:pre"> </span> Date date = null;
- <span style="white-space:pre"> </span> date = format.parse(format.format(new Date()));
- <span style="white-space:pre"> </span> Calendar cla = Calendar.getInstance();
- <span style="white-space:pre"> </span> cla.setTime(date);
- <span style="white-space:pre"> </span> cla.add(Calendar.HOUR_OF_DAY, 8);
- <span style="white-space:pre"> </span> data4.put("update_time", cla.getTime().getTime());<span style="white-space:pre"> </span> <span style="white-space:pre"> </span>
- <span style="white-space:pre"> </span> dbColl.insert(data4);
- <span style="white-space:pre"> </span> <span style="white-space:pre"> </span>
- <span style="white-space:pre"> </span> GetResponse getResponse = client.prepareGet().setIndex(this.indexName).setType(this.indexType).setId(recMessage[0]).execute().actionGet();
- <span style="white-space:pre"> </span>
- <span style="white-space:pre"> </span> String searchResult = getResponse.getSourceAsString();
- <span style="white-space:pre"> </span>
- <span style="white-space:pre"> </span> if(searchResult==null){
- <span style="white-space:pre"> </span>
- <span style="white-space:pre"> </span> //向es中存入
- <span style="white-space:pre"> </span> <span style="white-space:pre"> </span>NetFlow netFlow = new NetFlow();
- <span style="white-space:pre"> </span>netFlow.setSourceIp(recMessage[1]);
- <span style="white-space:pre"> </span>netFlow.setSourcePort(Integer.parseInt(recMessage[2].trim()));<span style="white-space:pre"> </span>
- <span style="white-space:pre"> </span>netFlow.setDestIp(recMessage[3]);<span style="white-space:pre"> </span>
- <span style="white-space:pre"> </span>netFlow.setDestPort(Integer.parseInt(recMessage[4].trim()));<span style="white-space:pre"> </span>
- <span style="white-space:pre"> </span>String jsondata = ESUtils.toJson(netFlow);
- <span style="white-space:pre"> </span> <span style="white-space:pre"> </span>
- <span style="white-space:pre"> </span>IndexResponse indexResponse = client.prepareIndex().setIndex(this.indexName).setType(this.indexType).setId(recMessage[0]).setSource(jsondata).execute().actionGet();
- <span style="white-space:pre"> </span>
- <span style="white-space:pre"> </span> }
- <span style="white-space:pre"> </span> if(searchResult!=null){
- <span style="white-space:pre"> </span>
- <span style="white-space:pre"> </span> //什么操作都不做
- <span style="white-space:pre"> </span> }
- <span style="white-space:pre"> </span>}
- <span style="white-space:pre"> </span>}<span style="white-space:pre"> </span>
- <span style="white-space:pre"> </span>}
- <span style="white-space:pre"> </span>} catch (UnknownHostException e) {
- <span style="white-space:pre"> </span>
- <span style="white-space:pre"> </span>e.printStackTrace();<span style="white-space:pre"> </span>
- <span style="white-space:pre"> </span>consumer.shutdown();
- <span style="white-space:pre"> </span>
- <span style="white-space:pre"> </span>} catch (ParseException e) {
- <span style="white-space:pre"> </span>e.printStackTrace();
- <span style="white-space:pre"> </span>consumer.shutdown();
- <span style="white-space:pre"> </span>}
- <span style="white-space:pre"> </span>
- <span style="white-space:pre"> </span>
- <span style="white-space:pre"> </span>}
- <span style="white-space:pre"> </span>private ConsumerConnector createConsumer() {
- <span style="white-space:pre"> </span>
- <span style="white-space:pre"> </span>Properties properties = new Properties();
- <span style="white-space:pre"> </span>properties.put("zookeeper.connect","10.10.16.252:2181,10.10.16.253:2181,10.10.16.249:2181");// 声明zk
- <span style="white-space:pre"> </span>properties.put("group.id", "test-consummer-group");
- <span style="white-space:pre"> </span>properties.put("serializer.class", "kafka.serializer.StringEncoder");
- <span style="white-space:pre"> </span>return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
- <span style="white-space:pre"> </span>}
- <span style="white-space:pre"> </span>public static void main(String[] args) {
- <span style="white-space:pre"> </span>new ConsumerThread("flume1","10.10.16.251",10111,"myindex","netflows","mydb","netflows").start();// 使用kafka集群中创建好的主题 test
- <span style="white-space:pre"> </span>}
- }
- package com.test.model;
/**
 * Mutable bean holding the source and destination endpoints of one network flow.
 *
 * <p>All properties start at their Java defaults ({@code null} for the addresses,
 * {@code 0} for the ports) until set.
 */
public class NetFlow {

    private String sourceIp;
    private int sourcePort;
    private String destIp;
    private int destPort;

    /** @return the source IP address, or {@code null} if not set */
    public String getSourceIp() {
        return sourceIp;
    }

    /** @param sourceIp the source IP address */
    public void setSourceIp(String sourceIp) {
        this.sourceIp = sourceIp;
    }

    /** @return the source port, {@code 0} if not set */
    public int getSourcePort() {
        return sourcePort;
    }

    /** @param sourcePort the source port */
    public void setSourcePort(int sourcePort) {
        this.sourcePort = sourcePort;
    }

    /** @return the destination IP address, or {@code null} if not set */
    public String getDestIp() {
        return destIp;
    }

    /** @param destIp the destination IP address */
    public void setDestIp(String destIp) {
        this.destIp = destIp;
    }

    /** @return the destination port, {@code 0} if not set */
    public int getDestPort() {
        return destPort;
    }

    /** @param destPort the destination port */
    public void setDestPort(int destPort) {
        this.destPort = destPort;
    }
}
- package com.test.utils;
- import java.io.IOException;
- import org.codehaus.jackson.JsonProcessingException;
- import org.codehaus.jackson.map.ObjectMapper;
- /**
- * 使用jackson定义了一个将对象转化成json的工具类
- */
- public class ESUtils {
- private static ObjectMapper objectMapper = new ObjectMapper();
- public static String toJson(Object o){
- try {
- return objectMapper.writeValueAsString(o);
- } catch (JsonProcessingException e) {
- e.printStackTrace();
- } catch (IOException e) {
- e.printStackTrace();
- }
- return "";
- }
- }
2486
0
0
- 0
扫一扫分享内容
分享
回到
顶部
顶部
所有评论(0)