Kafka学习二
转自:http://blog.csdn.net/desilting/article/details/22873335 。安装完kafka后,做了下简单测试,消息生产端与消息消费端的Java示例代码见下文。
·
转自:http://blog.csdn.net/desilting/article/details/22873335
安装完kafka后,做了下简单测试
消息生产端:
- <span style="font-family:Microsoft YaHei;">import java.util.Properties;
- import kafka.javaapi.producer.Producer;
- import kafka.producer.KeyedMessage;
- import kafka.producer.ProducerConfig;
- /**
- * Created with IntelliJ IDEA.
- * User: pis
- * Date: 14-4-3
- * Time: 上午11:48
- * To change this template use File | Settings | File Templates.
- */
- public class ProducerTest {
- public static void main(String[] args) {
- Properties props = new Properties();
- props.setProperty("metadata.broker.list","192.168.40.134:9092");
- props.setProperty("serializer.class","kafka.serializer.StringEncoder");
- props.put("request.required.acks","1");
- ProducerConfig config = new ProducerConfig(props);
- Producer<String, String> producer = new Producer<String, String>(config);
- KeyedMessage<String, String> data = new KeyedMessage<String, String>("mykafka","test-kafka");
- try {
- producer.send(data);
- } catch (Exception e) {
- e.printStackTrace();
- }
- producer.close();
- }
- }
- </span>
消息消费端:
- <span style="font-family:Microsoft YaHei;">import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- import java.util.Properties;
- import kafka.consumer.ConsumerConfig;
- import kafka.consumer.ConsumerIterator;
- import kafka.consumer.KafkaStream;
- import kafka.javaapi.consumer.ConsumerConnector;
- /**
- * Created with IntelliJ IDEA.
- * User: pis
- * Date: 14-4-3
- * Time: 下午1:19
- * To change this template use File | Settings | File Templates.
- */
- public class ConsumerTest extends Thread{
- private final ConsumerConnector consumer;
- private final String topic;
- public static void main(String[] args) {
- ConsumerTest consumerThread = new ConsumerTest("mykafka");
- consumerThread.start();
- }
- public ConsumerTest(String topic) {
- consumer =kafka.consumer.Consumer
- .createJavaConsumerConnector(createConsumerConfig());
- this.topic =topic;
- }
- private static ConsumerConfig createConsumerConfig() {
- Properties props = new Properties();
- props.put("zookeeper.connect","192.168.40.132:2181");
- props.put("group.id", "0");
- props.put("zookeeper.session.timeout.ms","10000");
- return new ConsumerConfig(props);
- }
- public void run(){
- Map<String,Integer> topickMap = new HashMap<String, Integer>();
- topickMap.put(topic, 1);
- Map<String, List<KafkaStream<byte[],byte[]>>> streamMap =consumer.createMessageStreams(topickMap);
- KafkaStream<byte[],byte[]>stream = streamMap.get(topic).get(0);
- ConsumerIterator<byte[],byte[]> it =stream.iterator();
- System.out.println("*********Results********");
- while(it.hasNext()){
- System.err.println("get data:" +new String(it.next().message()));
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
- </span>
在pom.xml文件中添加:
- <span style="font-family:Microsoft YaHei;"> <build>
- <finalName>testKafka</finalName> <!-- base name of the build output; the assembled jar is testKafka-jar-with-dependencies.jar -->
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>2.0.2</version>
- <configuration>
- <source>1.5</source> <!-- Java language level accepted by the compiler -->
- <target>1.5</target> <!-- class file version to generate -->
- <encoding>UTF-8</encoding> <!-- encoding of the source files -->
- </configuration>
- </plugin>
- <plugin>
- <artifactId>maven-assembly-plugin</artifactId>
- <version>2.4</version>
- <configuration>
- <descriptors>
- <descriptor>src/main/src.xml</descriptor> <!-- custom assembly descriptor file -->
- </descriptors>
- <descriptorRefs>
- <descriptorRef>jar-with-dependencies</descriptorRef> <!-- NOTE(review): predefined descriptor; if src/main/src.xml declares the same id, both assemblies target the same jar name. Confirm which one is intended -->
- </descriptorRefs>
- </configuration>
- <executions>
- <execution>
- <id>make-assembly</id> <!-- this is used for inheritance merges -->
- <phase>package</phase> <!-- bind to the packaging phase -->
- <goals>
- <goal>single</goal> <!-- assembly plugin goal that builds the configured assemblies -->
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build></span>
- <span style="font-family:Microsoft YaHei;"><?xml version="1.0" encoding="UTF-8"?>
- <assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd ">
- <id>jar-with-dependencies</id> <!-- appended to the build's finalName to form the assembled jar's name -->
- <formats>
- <format>jar</format> <!-- package the assembly as a jar archive -->
- </formats>
- <includeBaseDirectory>false</includeBaseDirectory> <!-- put entries at the archive root, not under a top-level directory -->
- <dependencySets>
- <dependencySet>
- <unpack>false</unpack> <!-- NOTE(review): dependencies are added as whole jar files, not unpacked; confirm classes can be loaded from this layout -->
- <scope>runtime</scope> <!-- include runtime-scope dependencies -->
- </dependencySet>
- </dependencySets>
- <fileSets>
- <fileSet>
- <directory>/lib</directory> <!-- NOTE(review): leading slash; confirm whether a project-relative lib directory was meant -->
- </fileSet>
- </fileSets>
- </assembly></span>
如果报如下错,可使用2.9.2版本的scala-library及scala-compiler包:
- <span style="font-family:Microsoft YaHei;">Exception in thread "main" java.lang.NoClassDefFoundError: scala/Tuple2$mcLL$sp
- at kafka.consumer.ConsumerConfig.<init>(ConsumerConfig.scala:77)
- at kafka.ConsumerTest.createConsumerConfig(ConsumerTest.java:44)
- at kafka.ConsumerTest.<init>(ConsumerTest.java:31)
- at kafka.ConsumerTest.main(ConsumerTest.java:27)
- at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
- at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
- at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
- at java.lang.reflect.Method.invoke(Method.java:597)
- at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)
- Caused by: java.lang.ClassNotFoundException: scala.Tuple2$mcLL$sp
- at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
- at java.security.AccessController.doPrivileged(Native Method)
- at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
- at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
- at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
- at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
- ... 9 more</span>
在项目目录下执行mvn package命令,得到依赖包testKafka-jar-with-dependencies.jar
编译:
javac -classpath testKafka-jar-with-dependencies.jar ProducerTest.java
javac -classpath testKafka-jar-with-dependencies.jar ConsumerTest.java
执行:
java -classpath .:testKafka-jar-with-dependencies.jar ProducerTest
java -classpath .:testKafka-jar-with-dependencies.jar ConsumerTest
结果:
*********Results********
get data:test-kafka
get data:test-kafka
get data:test-kafka
更多推荐
已为社区贡献7条内容
所有评论(0)