Developing a Kafka Consumer in Java
Java, Kafka, consumer
- POM file (pom.xml)
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.laodixiao</groupId>
  <artifactId>connectToKafka</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>connectToKafka</name>
  <url>http://maven.apache.org</url>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
  <dependencies>
    <!-- junit start -->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
      <scope>test</scope>
    </dependency>
    <!-- junit end -->
    <!-- kafka start -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.9.1</artifactId>
      <version>0.8.2.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>0.8.2.2</version>
    </dependency>
    <!-- kafka end -->
    <!-- log start -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.7</version>
    </dependency>
    <!-- log end -->
    <!-- json start -->
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.7</version>
    </dependency>
    <!-- json end -->
    <!-- mongodb start -->
    <dependency>
      <groupId>org.springframework.data</groupId>
      <artifactId>spring-data-mongodb</artifactId>
      <version>1.8.4.RELEASE</version>
    </dependency>
    <!-- mongodb end -->
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
        <version>2.6</version>
        <configuration>
          <archive>
            <manifest>
              <addClasspath>true</addClasspath>
              <classpathPrefix>lib/</classpathPrefix>
              <mainClass>com.laodixiao.connectToKafka.ConnectKafka</mainClass>
            </manifest>
          </archive>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-dependency-plugin</artifactId>
        <version>2.10</version>
        <executions>
          <execution>
            <id>copy-dependencies</id>
            <phase>package</phase>
            <goals>
              <goal>copy-dependencies</goal>
            </goals>
            <configuration>
              <!-- copy dependencies next to the jar; must match the manifest classpathPrefix above -->
              <outputDirectory>${project.build.directory}/lib</outputDirectory>
              <overWriteReleases>false</overWriteReleases>
              <overWriteSnapshots>false</overWriteSnapshots>
              <overWriteIfNewer>true</overWriteIfNewer>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.3</version>
        <configuration>
          <source>1.7</source>
          <target>1.7</target>
          <encoding>UTF-8</encoding>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
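With this build configuration, running mvn package should produce target/connectToKafka-0.0.1-SNAPSHOT.jar with a manifest Class-Path pointing at lib/, and copy every dependency into target/lib, so the consumer can be started with java -jar connectToKafka-0.0.1-SNAPSHOT.jar as long as the jar and the lib folder are deployed side by side. Note that spring-data-mongodb is not referenced by the consumer code below; it is presumably used elsewhere in the project.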
- Java Kafka consumer (ConnectKafka.java)
package com.laodixiao.connectToKafka;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSONObject;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
public class ConnectKafka {

    private static Logger log = LoggerFactory.getLogger(ConnectKafka.class);
    private final ConsumerConnector consumer;
    public static String lineSeparator = System.lineSeparator();

    public ConnectKafka() {
        this.consumer = Consumer.createJavaConsumerConnector(createConsumerConfig());
    }
    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        // ZooKeeper ensemble of the Kafka 0.8 cluster, chrooted to /kafka08
        props.put("zookeeper.connect", "127.0.0.1:8993,127.0.0.1:8989,127.0.0.1:8990,127.0.0.1:8991,127.0.0.1:8992/kafka08");
        // consumer group; the high-level consumer tracks this group's offsets in ZooKeeper
        props.put("group.id", "get-chiq-rec-user-history");
        props.put("zookeeper.session.timeout.ms", "4000");
        props.put("zookeeper.sync.time.ms", "4000");
        // auto-commit consumed offsets every 10 seconds
        props.put("auto.commit.interval.ms", "10000");
        return new ConsumerConfig(props);
    }
    /**
     * Reads messages from DS.Input.All.TestTopic, keeps the records whose
     * "method" field equals filterKey, and writes iid,uid,timestamp lines to
     * numbered text files, rolling over to a new file every THRESHOLD records.
     *
     * @param THRESHOLD number of records to write into each output file
     * @param filterKey value the "method" field must match for a record to be kept
     * @throws IOException
     */
    public void fetchMsgFromKafka(long THRESHOLD, String filterKey) throws IOException {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put("DS.Input.All.TestTopic", Integer.valueOf(1)); // one consumer stream for the topic
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = this.consumer.createMessageStreams(topicCountMap);
        // Step 5-2: incrementally update UserActionInfo: cid, mac, time
        // Step 5-2-1: read the data from the Kafka topic DS.Input.All.TestTopic
        // Step 5-2-2: clean the data into ./folder/filterData_n.txt
        KafkaStream<byte[], byte[]> stream = consumerMap.get("DS.Input.All.TestTopic").get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        int txtNum = 1; // index of the output file currently being written
        String filePath = "/data/laodi.xiao/test2/userActionData/" + txtNum + ".txt";
        File fileName = new File(filePath);
        if (!fileName.exists()) {
            fileName.createNewFile();
        }
        OutputStreamWriter osw1 = new OutputStreamWriter(new FileOutputStream(fileName), "utf-8");
        long cnt = 0L;
        String keyHot = filterKey; // which kind of user action feeds the hot-list statistics
        while (it.hasNext()) {
            String userActionData = new String(it.next().message(), "utf-8");
            try {
                JSONObject json = JSONObject.parseObject(userActionData);
                if (keyHot.equals(json.getString("method"))) {
                    if (cnt == THRESHOLD) { // threshold reached: put subsequent records into a new txt file
                        cnt = 0L;
                        osw1.close();
                        if (txtNum == 60000) { // reset the file counter once it reaches 60000, reusing old file names
                            txtNum = 1;
                        }
                        txtNum = txtNum + 1;
                        filePath = "/data/laodi.xiao/test2/userActionData/" + txtNum + ".txt";
                        fileName = new File(filePath);
                        if (!fileName.exists()) {
                            fileName.createNewFile();
                        }
                        osw1 = new OutputStreamWriter(new FileOutputStream(fileName), "utf-8");
                    }
                    String oneRecordUserActionData = json.get("iid").toString() + "," + json.get("uid").toString() + "," + json.get("timestamp").toString();
                    log.info(oneRecordUserActionData);
                    osw1.write(oneRecordUserActionData + "\n");
                    cnt = cnt + 1;
                }
            } catch (Exception e) {
                // skip malformed records (invalid JSON or missing fields)
                continue;
            }
        }
        osw1.close();
    }
    /**
     * Optional arguments: args[0] overrides the per-file record threshold
     * (default 50000), args[1] overrides the filter key (default "PlayMedia").
     *
     * @param args
     * @throws IOException
     */
    public static void main(String[] args) throws IOException {
        long cntThreshold = 50000L;
        String filterKey = "PlayMedia";
        if (args.length == 1) {
            cntThreshold = Long.parseLong(args[0]);
        } else if (args.length == 2) {
            cntThreshold = Long.parseLong(args[0]);
            filterKey = args[1];
        }
        System.out.println("start");
        new ConnectKafka().fetchMsgFromKafka(cntThreshold, filterKey);
        System.out.println("end");
    }
}
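One caveat worth noting: with the 0.8 high-level consumer, it.hasNext() blocks indefinitely by default (consumer.timeout.ms is -1), so the loop above normally only ends when the process is killed, and the ConsumerConnector is never shut down. A minimal sketch of a cleaner exit, assuming the same Kafka 0.8 high-level consumer API; the helper class name and the idea of exposing the consumer field are assumptions, not part of the original code:

package com.laodixiao.connectToKafka;

import kafka.javaapi.consumer.ConsumerConnector;

// Hypothetical helper (not part of the original post): registers a JVM
// shutdown hook that closes the high-level consumer cleanly on exit.
public class ConsumerShutdownHook {
    public static void register(final ConsumerConnector connector) {
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                // shutdown() releases the consumer's ZooKeeper registration and,
                // with auto-commit enabled, commits the last consumed offsets
                connector.shutdown();
            }
        });
    }
}

ConnectKafka would then expose its consumer field (for example through a getter), and main would call ConsumerShutdownHook.register(...) right after constructing the ConnectKafka instance, so that a kill or Ctrl-C still commits offsets before the JVM exits.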