Kafka kafka-clients 0.10.0.0 API
Demo structure
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>bj.zm</groupId>
  <artifactId>kafka</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>kafka</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.10</artifactId>
      <version>0.8.2.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>0.10.0.0</version>
    </dependency>
  </dependencies>
</project>
KafkaUtil.java
package com.zybros;

import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;

public class KafkaUtil {
    private static KafkaProducer<String, String> kp;
    private static KafkaConsumer<String, String> kc;

    // Lazily creates a shared producer. KafkaProducer is thread-safe, so a
    // single instance can be shared across threads.
    public static KafkaProducer<String, String> getProducer() {
        if (kp == null) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "10.1.78.23:9091,10.1.78.23:9092,10.1.78.23:9093");
            // acks=0: fire-and-forget, the producer does not wait for any broker acknowledgment
            props.put("acks", "0");
            props.put("retries", 0);
            props.put("batch.size", 16384);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            kp = new KafkaProducer<String, String>(props);
        }
        return kp;
    }

    // Lazily creates a shared consumer. Unlike the producer, KafkaConsumer is
    // NOT thread-safe and must only be used from a single thread.
    public static KafkaConsumer<String, String> getConsumer() {
        if (kc == null) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "10.1.78.23:9091,10.1.78.23:9092,10.1.78.23:9093");
            props.put("group.id", "12");
            // Offsets are committed automatically once per second
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("session.timeout.ms", "30000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            kc = new KafkaConsumer<String, String>(props);
        }
        return kc;
    }
}
MyProducer.java
package com.zybros;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class MyProducer {
    public static void main(String[] s) {
        try {
            Producer<String, String> producer = KafkaUtil.getProducer();
            int i = 0;
            while (true) {
                ProducerRecord<String, String> record = new ProducerRecord<String, String>(
                        "replication-test", String.valueOf(i), "this is message" + i);
                // Asynchronous send; the callback fires when the send completes.
                producer.send(record, new Callback() {
                    public void onCompletion(RecordMetadata metadata, Exception e) {
                        // On failure, metadata may be null, so only read it on success.
                        // With acks=0 the broker never responds, so the offset reported
                        // here will be -1.
                        if (e != null) {
                            e.printStackTrace();
                        } else {
                            System.out.println("message send to partition " + metadata.partition()
                                    + ", offset: " + metadata.offset());
                        }
                    }
                });
                i++;
                Thread.sleep(1000);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
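The demo sends asynchronously, fire-and-forget. If a caller needs to block until the send completes, send() also returns a Future that can be waited on. A minimal sketch under that idea (the class name MySyncProducer is hypothetical; with acks=0 as configured in KafkaUtil the broker sends no acknowledgment and the reported offset is -1, so set acks to "1" or "all" to see real offsets):

package com.zybros;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class MySyncProducer {
    public static void main(String[] s) throws Exception {
        Producer<String, String> producer = KafkaUtil.getProducer();
        ProducerRecord<String, String> record =
                new ProducerRecord<String, String>("replication-test", "key", "sync message");
        // send() returns a Future<RecordMetadata>; get() blocks until the send
        // completes and throws if it failed.
        RecordMetadata metadata = producer.send(record).get();
        System.out.println("sent to partition " + metadata.partition() + ", offset: " + metadata.offset());
        producer.close();
    }
}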
MyConsumer.java
package com.zybros;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class MyConsumer {
    public static void main(String[] s) {
        KafkaConsumer<String, String> consumer = KafkaUtil.getConsumer();
        consumer.subscribe(Arrays.asList("replication-test"));
        // An empty collection means "all currently assigned partitions", but no
        // partitions are assigned until the first poll(), so this call may have
        // no effect here; to reliably start from the beginning, seek inside a
        // ConsumerRebalanceListener's onPartitionsAssigned callback instead.
        consumer.seekToBeginning(new ArrayList<TopicPartition>());
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("fetched from partition " + record.partition()
                        + ", offset: " + record.offset() + ", message: " + record.value());
            }
            // Read records partition by partition:
            // for (TopicPartition partition : records.partitions()) {
            //     List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
            //     for (ConsumerRecord<String, String> record : partitionRecords) {
            //         System.out.println(record.offset() + ": " + record.value());
            //     }
            // }
        }
    }
}
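The consumer above relies on auto-commit, so offsets are committed on a timer regardless of whether records were actually processed. A common at-least-once alternative is to disable auto-commit and commit per partition after processing, building on the commented per-partition loop above. A minimal sketch, assuming a hypothetical consumer configured like KafkaUtil.getConsumer() but with enable.auto.commit=false:

package com.zybros;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class ManualCommitConsumer {
    public static void main(String[] s) {
        // Assumption: a consumer created with the same settings as
        // KafkaUtil.getConsumer() but with enable.auto.commit=false.
        KafkaConsumer<String, String> consumer = KafkaUtil.getConsumer();
        consumer.subscribe(Arrays.asList("replication-test"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                for (ConsumerRecord<String, String> record : partitionRecords) {
                    System.out.println(record.offset() + ": " + record.value());
                }
                // Commit the offset of the next record to read: last processed + 1.
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
            }
        }
    }
}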
Test results
Run the producer (MyProducer).
Run three consumers:
group.id = 6
group.id = 6
group.id = 7
Observe that the first two consumers receive non-overlapping messages, because they share the same group.id=6.
The third consumer receives the combined stream of the first two, because its group.id=7 differs from theirs.
The reason: a message is delivered only once to each consumer group; in other words, consumers within the same group consume different messages.
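Note that KafkaUtil above hardcodes group.id "12", so to run this test the group id has to be configurable. A minimal sketch of a hypothetical overload of KafkaUtil.getConsumer() that takes the group id as a parameter (not part of the original demo):

// Hypothetical variant: same settings as getConsumer(), but with the group id
// passed in, so consumers for groups "6" and "7" can be created.
public static KafkaConsumer<String, String> getConsumer(String groupId) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "10.1.78.23:9091,10.1.78.23:9092,10.1.78.23:9093");
    props.put("group.id", groupId);
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    return new KafkaConsumer<String, String>(props);
}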
Messaging models fall into two categories: queues and publish-subscribe. In the queue model, a group of consumers reads from the server and each message is processed by only one of them. In the publish-subscribe model, messages are broadcast to all consumers, and every consumer that receives a message may process it. Kafka unifies both models behind a single consumer abstraction: the consumer group. Consumers label themselves with a group name, and each message published to a topic is delivered to exactly one consumer within each subscribing group. If all consumers are in the same group, this behaves as the queue model; if every consumer is in its own group, it becomes pure publish-subscribe.
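To make the two models concrete, using the hypothetical getConsumer(String groupId) factory sketched above:

// Two consumers sharing group "A" split the topic's partitions between them
// (queue model); a consumer in group "B" independently receives every message
// (publish-subscribe model).
KafkaConsumer<String, String> a1 = KafkaUtil.getConsumer("A");
KafkaConsumer<String, String> a2 = KafkaUtil.getConsumer("A");
KafkaConsumer<String, String> b = KafkaUtil.getConsumer("B");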
Demo download:
http://download.csdn.net/detail/stonexmx/9613892