Kafka学习(6)- kafka使用Java开发-2-使用properties文件
目录:使用properties文件的好处方便修改配置参数流程项目结构生产者代码消费者代码producer.propertiesPropertiesUtil.java测试改进:测试项目中的消费者代码中的参数配置同理可以用properties文件替代一.详细步骤:1.创建一个maven项目,项目结构2.生产者代码package ka...
·
目录:
- 使用properties文件的好处
方便修改配置参数 - 流程
- 项目结构
- 生产者代码
- 消费者代码
- producer.properties
- PropertiesUtil.java
- 测试
- 改进:测试项目中的消费者代码中的参数配置同理可以用properties文件替代
一.详细步骤:
1.创建一个maven项目,项目结构
2.生产者代码
package kafkaTest;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import kafkaTest.util.PropertiesUtil;
public class ProducerSend {
public static void main(String args[]) {
//1.属性配置:端口、缓冲内存、最大连接数、key序列化、value序列化等等
/*
Properties props=new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");*/
//2.创建生产者对象,并建立连接,通过我们自己创建的 PropertiesUtil 工具类获得配置文件(根据传入的值,获得相对应的properties文件)
Producer<String, String> producer = new KafkaProducer<>(PropertiesUtil.getProperties("producer"));
//3.在my-topic主题下,发送消息
for(int i = 0; i < 100; i++) {
producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
System.out.println("消息"+i);
}
//4.关闭
producer.close();
}
}
3.消费者代码
package kafkaTest;
import java.util.Arrays;
import java.util.Date;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class ConsumerReceive {
public static void main(String args[]) {
//1.参数配置:不是每一非得配置
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("auto.commit.interval.ms", "1000");
//因为每一个消费者必须属于某一个消费者组,所以必须还设置group.id
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//2.创建消费者对象,并建立连接
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//3.设置从"my-topic"主题下拿取数据
consumer.subscribe(Arrays.asList("my-topic"));
//4.消费数据
while (true) {
//阻塞时间,从kafka中取出100毫秒的数据,有可能一次性去除0-n条
ConsumerRecords<String, String> records = consumer.poll(100);
//遍历
for (ConsumerRecord<String, String> record : records)
//打印结果
//System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
System.out.println("消费者消费的数据为:"+record.value()+"-"+new Date());
}
}
}
4.producer.properties(键值对的形式)
bootstrap.servers=localhost:9092
acks=all
retries=0
batch.size=16384
linger.ms=1
buffer.memory=33554432
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
5.PropertiesUtil.java
package kafkaTest.util;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
public class PropertiesUtil {
//该方法通过传入的参数来获得一个相对应名称的properties文件
public static Properties getProperties(String name) {
InputStream resourceAsStream=PropertiesUtil.class.getClassLoader().getResourceAsStream(name+".properties");
Properties props=new Properties();
try {
props.load(resourceAsStream);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}finally {
try {
resourceAsStream.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
return props;
}
}
5.测试
- 开启zookeeper和kafka
- 运行消费者代码
- 运行生产者代码
可以在生产者控制台看到
可以在消费者控制台看到
注意:测试完成后记得关闭消费端的运行
二.改进:测试项目中的消费者代码中的参数配置同理可以用properties文件替代
在producer.properties 文件同目录下新建consumer.properties文件
consumer.properties
bootstrap.servers=localhost:9092
auto.commit.interval.ms=1000
#因为每一个消费者必须属于某一个消费者组,所以必须还设置group.id
group.id=test
enable.auto.commit=true
session.timeout.ms=30000
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
修改消费者代码
package kafkaTest;
import java.util.Arrays;
import java.util.Date;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import kafkaTest.util.PropertiesUtil;
public class ConsumerReceive {
public static void main(String args[]) {
//1.参数配置:不是每一非得配置
/*Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("auto.commit.interval.ms", "1000");
props.put("group.id", "test");//因为每一个消费者必须属于某一个消费者组,所以必须还设置group.id
props.put("enable.auto.commit", "true");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");*/
//2.创建消费者对象,并建立连接
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(PropertiesUtil.getProperties("consumer"));
//3.设置从"my-topic"主题下拿取数据
consumer.subscribe(Arrays.asList("my-topic"));
//4.消费数据
while (true) {
//阻塞时间,从kafka中取出100毫秒的数据,有可能一次性去除0-n条
ConsumerRecords<String, String> records = consumer.poll(100);
//遍历
for (ConsumerRecord<String, String> record : records)
//打印结果
//System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
System.out.println("消费者消费的数据为:"+record.value()+"-"+new Date());
}
}
}
更多推荐
已为社区贡献4条内容
所有评论(0)