目录:

  1. 使用properties文件的好处
    方便修改配置参数
  2. 流程
    1. 项目结构
    2. 生产者代码
    3. 消费者代码
    4. producer.properties
    5. PropertiesUtil.java
    6. 测试
  3. 改进:测试项目中的消费者代码中的参数配置同理可以用properties文件替代

一.详细步骤:

1.创建一个maven项目,项目结构

2.生产者代码

package kafkaTest;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import kafkaTest.util.PropertiesUtil;

public class ProducerSend {
	public static void main(String args[]) {
		
		//1.属性配置:端口、缓冲内存、最大连接数、key序列化、value序列化等等
		 /*
		 Properties props=new Properties();
		 props.put("bootstrap.servers", "localhost:9092");
		 props.put("acks", "all");
		 props.put("retries", 0); 
		 props.put("batch.size", 16384); 
		 props.put("linger.ms", 1);
		 props.put("buffer.memory", 33554432); 
		 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
		 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");*/
		
		
		//2.创建生产者对象,并建立连接,通过我们自己创建的 PropertiesUtil 工具类获得配置文件(根据传入的值,获得相对应的properties文件)
		Producer<String, String> producer = new KafkaProducer<>(PropertiesUtil.getProperties("producer"));
		
		//3.在my-topic主题下,发送消息
		for(int i = 0; i < 100; i++) {
			producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
			System.out.println("消息"+i);
		}
		
		//4.关闭
		producer.close();
		
	}
}

3.消费者代码
 

package kafkaTest;

import java.util.Arrays;
import java.util.Date;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerReceive {
	public static void main(String args[]) {
		
	    //1.参数配置:不是每一非得配置
	    Properties props = new Properties();
	    props.put("bootstrap.servers", "localhost:9092");
	    props.put("auto.commit.interval.ms", "1000");
	    //因为每一个消费者必须属于某一个消费者组,所以必须还设置group.id
	    props.put("group.id", "test");
	    props.put("enable.auto.commit", "true");
	    props.put("session.timeout.ms", "30000");
	    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
	    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
	    
	    //2.创建消费者对象,并建立连接
	    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
	    
	    //3.设置从"my-topic"主题下拿取数据
	    consumer.subscribe(Arrays.asList("my-topic"));
	    
	    //4.消费数据
	    while (true) {
	    	//阻塞时间,从kafka中取出100毫秒的数据,有可能一次性去除0-n条
	        ConsumerRecords<String, String> records = consumer.poll(100);
	        //遍历
	        for (ConsumerRecord<String, String> record : records)
                //打印结果
                //System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
                System.out.println("消费者消费的数据为:"+record.value()+"-"+new Date());
	    }
	}
}

4.producer.properties(键值对的形式)

bootstrap.servers=localhost:9092
acks=all
retries=0
batch.size=16384
linger.ms=1
buffer.memory=33554432
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer

5.PropertiesUtil.java

package kafkaTest.util;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class PropertiesUtil {
	//该方法通过传入的参数来获得一个相对应名称的properties文件
	public static Properties getProperties(String name) {
		InputStream resourceAsStream=PropertiesUtil.class.getClassLoader().getResourceAsStream(name+".properties");
		Properties props=new Properties();
		try {
			props.load(resourceAsStream);
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}finally {
			try {
				resourceAsStream.close();
			} catch (IOException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
		return props;
		
	}
}

5.测试

  1. 开启zookeeper和kafka
  2. 运行消费者代码
  3. 运行生产者代码
    可以在生产者控制台看到


    可以在消费者控制台看到

注意:测试完成后记得关闭消费端的运行

 

二.改进:测试项目中的消费者代码中的参数配置同理可以用properties文件替代

在producer.properties 文件同目录下新建consumer.properties文件
consumer.properties

bootstrap.servers=localhost:9092
auto.commit.interval.ms=1000
#因为每一个消费者必须属于某一个消费者组,所以必须还设置group.id
group.id=test
enable.auto.commit=true
session.timeout.ms=30000
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

修改消费者代码
 

package kafkaTest;

import java.util.Arrays;
import java.util.Date;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import kafkaTest.util.PropertiesUtil;

public class ConsumerReceive {
	public static void main(String args[]) {
		
		//1.参数配置:不是每一非得配置
		/*Properties props = new Properties();
	    props.put("bootstrap.servers", "localhost:9092");
	    props.put("auto.commit.interval.ms", "1000");
	    props.put("group.id", "test");//因为每一个消费者必须属于某一个消费者组,所以必须还设置group.id
	    props.put("enable.auto.commit", "true");
	    props.put("session.timeout.ms", "30000");
	    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
	    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");*/
	    
	    //2.创建消费者对象,并建立连接
	    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(PropertiesUtil.getProperties("consumer"));
	    
	    //3.设置从"my-topic"主题下拿取数据
	    consumer.subscribe(Arrays.asList("my-topic"));
	    
	    //4.消费数据
	    while (true) {
	    	//阻塞时间,从kafka中取出100毫秒的数据,有可能一次性去除0-n条
	        ConsumerRecords<String, String> records = consumer.poll(100);
	        //遍历
	        for (ConsumerRecord<String, String> record : records)
                    //打印结果
	            //System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
                    System.out.println("消费者消费的数据为:"+record.value()+"-"+new Date());
	    }
	}
}

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐