demo 结构



pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>bj.zm</groupId>
  <artifactId>kafka</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>kafka</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    
     <dependency>
		<groupId>org.apache.kafka</groupId>
		<artifactId>kafka_2.10</artifactId>
		<version>0.8.2.0</version>
	</dependency>
	
	
	
	<dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.0.0</version>
    </dependency>
	
	
  </dependencies>
</project>

kafkaUtil

package com.zybros;

import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;

public class KafkaUtil {
	private static KafkaProducer<String, String> kp;
	private static KafkaConsumer<String, String> kc;  

	public static KafkaProducer<String, String> getProducer() {
		if (kp == null) {
			Properties props = new Properties();
			props.put("bootstrap.servers", "10.1.78.23:9091,10.1.78.23:9092,10.1.78.23:9093");
			props.put("acks", "0");
			props.put("retries", 0);
			props.put("batch.size", 16384);
			props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
			props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
			kp = new KafkaProducer<String, String>(props);
		}
		return kp;
	}
	
	
	public static KafkaConsumer<String, String> getConsumer() {  
        if(kc == null) {  
            Properties props = new Properties();  
            
            props.put("bootstrap.servers", "10.1.78.23:9091,10.1.78.23:9092,10.1.78.23:9093");  
            props.put("group.id", "12");  
            props.put("enable.auto.commit", "true");  
            props.put("auto.commit.interval.ms", "1000");  
            props.put("session.timeout.ms", "30000");  
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  
            kc = new KafkaConsumer<String, String>(props);  
        }  
        
        return kc;  
	}
}
provider

package com.zybros;


import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class MyProducer {

	public static void main(String[] s) {

		try {

			 Producer<String, String> producer = KafkaUtil.getProducer();  
		        int i = 0;  
		        while(true) {  
		            ProducerRecord<String, String> record = new ProducerRecord<String, String>("replication-test", String.valueOf(i), "this is message"+i);  
		            producer.send(record, new Callback() {  
		                public void onCompletion(RecordMetadata metadata, Exception e) {  
		                    if (e != null)  
		                        e.printStackTrace();  
		                    System.out.println("message send to partition " + metadata.partition() + ", offset: " + metadata.offset());  
		                }  
		            });  
		            i++;  
		            Thread.sleep(1000);  
		        }  

		} catch (Exception e) {
			e.printStackTrace();
		}

	}
}


consumer


package com.zybros;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class MyConsumer {
	
	
	public static void main(String[] s){
		
		 KafkaConsumer<String, String> consumer = KafkaUtil.getConsumer();  
	        consumer.subscribe(Arrays.asList("replication-test"));  
	        consumer.seekToBeginning(new ArrayList<TopicPartition>());
	        
	        while(true) {  
	            ConsumerRecords<String, String> records = consumer.poll(1000);  
	            for(ConsumerRecord<String, String> record : records) {  
	                System.out.println("fetched from partition " + record.partition() + ", offset: " + record.offset() + ", message: " + record.value());  
	            }  
	            //按分区读取数据
//	            for (TopicPartition partition : records.partitions()) {
//	                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
//	                for (ConsumerRecord<String, String> record : partitionRecords) {
//	                    System.out.println(record.offset() + ": " + record.value());
//	                }
//	            }
	            
	        }  
		
	}

}

测试结果

运行 provider


运行 3个consumer

group.id = 6



group.id = 6



group.id = 7




可以看到前两个consumer的message是不重复的,因为前两个的group.id=6都 一样。

第3个consumer收到的信息是前两个的和,因为它的group.id=7,与前两个不一样。

这是因为:一个message 只能被一个consumer group消费一次,也就是说一个group下的consumer消费的消息是不同的。


消息模型可以分为两种, 队列发布-订阅式。 队列的处理方式是 一组消费者从服务器读取消息,一条消息只有其中的一个消费者来处理。在发布-订阅模型中,消息被广播给所有的消费者,接收到消息的消费者都可以处理此消息。Kafka为这两种模型提供了单一的消费者抽象模型: 消费者组 (consumer group)。 消费者用一个消费者组名标记自己。 一个发布在Topic上消息被分发给此消费者组中的一个消费者。 假如所有的消费者都在一个组中,那么这就变成了queue模型。 假如所有的消费者都在不同的组中,那么就完全变成了发布-订阅模型。


demo 下载地址

http://download.csdn.net/detail/stonexmx/9613892


Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐