springboot2整合kafka实践
本文主要简述实践部分,不涉及kafka原理等。kafka安装http://kafka.apache.org/downloads在官网下载这个版本的kafka(其它版本没试过,可能会有与springboot版本出现冲突的问题)上传到linux服务器,然后解压到一个目录下,进入kafka的config目录修改配置文件server.properties,将其中的一个已...
本文主要简述实践部分,不涉及kafka原理等。
kafka安装
http://kafka.apache.org/downloads
在官网下载这个版本的kafka(其它版本没试过,可能会有与springboot版本出现冲突的问题)
上传到linux服务器,然后解压到一个目录下,进入kafka的config目录修改配置文件server.properties,
将其中的一个已经被注释的配置 listeners=PLAINTEXT://localhost:9092去掉注释
然后将localhost修改成linux服务器的ip地址
这里我是改成listeners=PLAINTEXT://192.168.45.128:9092
我的linux服务器是在本地VM虚拟机里的
其他的配置文件都不需要改,因为我们使用的是单机版的kafka,而且这个版本的kafka已经内置了一个zookeeper环境,所以可以直接使用。
kafka启动
需要root权限,输入指令su然后输入root的密码切换成root权限,否则启动会报错
1.启动zookeeper
cd进入kafka解压目录,输入指令启动zookeeper
./bin/zookeeper-server-start.sh ./config/zookeeper.properties
2.启动kafka
./bin/kafka-server-start.sh ./config/server.properties
一般没报错的话就说明启动成功了,实践到这里只出现了无root权限启动失败问题,其它问题没碰到。
springboot整合kafka
环境:
springboot 2.1.7.RELEASE
添加spring-kafka 2.2.8.RELEASE依赖
以下的实践用的是配置类的形式去配置kafka,不会用到配置文件,因为用配置类这种方式比较灵活,可以配置不同参数值的主题、生产者、消费者等。
1.创建主题配置文件
@Configuration
public class TopicConfigure {
@Bean
public KafkaAdmin admin(){
Map<String,Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.45.128:9092");
return new KafkaAdmin(configs);
}
@Bean
public NewTopic zhubo(){
return new NewTopic("zhubo",10, (short) 1);
}
}
@Configuration的作用是说明这个类是配置类
@Bean的作用是将类注入到容器
admin方法是配置kafka地址
zhubo方法是配置主题,其中 "zhubo"是主题名,10是分区数量,1是副本数量。这里我们用的是单机版,所以副本用1个就可以了,分区数量按实际所需来配置。
分区的作用是:假设主题有10个分区,那么kafka会尽可能将这些分区均匀分布在各个服务器中,那么多个消费者消费主题的时候,就可以进行轮询消费,例如消费者1从服务器1分区消费,消费者2从服务器2分区消费,减轻了单个服务器的压力。
副本的作用是:备份数据,以分区为单位进行备份,假设一个分区有n个副本,那么n肯定不能大于服务器数量,因为副本肯定是存在多个服务器中以保证数据不会丢失的,kafka一般默认每个服务器存一份。
这样我们主题就配置好了,启动springboot入口类,就创建好了一个名叫"zhubo",分区数为10,副本数为1的主题
如果还需要创建其它主题,只需要在配置类中添加多几个方法就可以了,例如:
@Bean
public NewTopic zhubo2(){
return new NewTopic("zhubo2",10, (short) 1);
}
2.创建生产者配置类
@Configuration
public class ProducerConfigure {
public ProducerFactory<Integer,String> producerFactory(){
return new DefaultKafkaProducerFactory<Integer, String>(producerConfigs());
}
private Map<String,Object> producerConfigs() {
Map<String,Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.45.128:9092");
props.put(ProducerConfig.ACKS_CONFIG,"all");
props.put(ProducerConfig.RETRIES_CONFIG,0);
props.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
props.put(ProducerConfig.LINGER_MS_CONFIG,1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.IntegerSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
return props;
}
@Bean
public KafkaTemplate<Integer,String> kafkaTemplate(){
return new KafkaTemplate<Integer, String>(producerFactory());
}
}
参数说明:
ACKS_CONFIG
这个参数关于高可用,涉及数据的安全。 有三个值:0,1,all
一般kafka为了高可用需要用集群的方式部署,其中有一个leader,多个follower。
下面是各个参数值的说明
0:生产者只要把消息发出去了,就认为消息发送成功了。
1:生产者要把消息发出去,leader也把消息写入本地磁盘了,才认为成功了。
all:生产者要把消息发出去,leader和fllower都把消息写入磁盘了,才认为成功了。
为了数据安全,一般都是用all,当然,只有在集群的情况下才有效,单机是无效的,一般用all+集群,可以保证高可用和数据安全。
RETRIES_CONFIG
重试次数设置。重试时producer会重新发送之前由于瞬时原因出现失败的消息。瞬时失败的原因可能包括:元数据信息失效、副本数量不足、超时、位移越界或未知分区等。倘若设置了retries>0,那么这些情况下producer会尝试重试。
Producer还有个参数max.in.flight.request.per.connection,这个参数是控制producer的io线程在单个socket连接上能够发送未应答producer请求的最大数量,增加此值可以增加io线程的吞吐量,如果该参数大于1,那么设置retries就有可能造成发送消息的乱序。版本为0.11.1.0的kafka支持消息的重试不会造成消息的重复发送。
BATCH_SIZE_CONFIG
当多个消息发往同一个分区,生产者会将它们放在同一个批次,该参数指定了一个批次可以使用的内存大小,按照字节数进行计算,当批次满了会被发送,没满的情况下也有可能被发送,所以即使设置得再大也不会消息延迟,设置太大会占用内存,设置太小会频繁发送消息,增加额外开销。
LINGER_MS_CONFIG
该参数指定了生产者在发送批次之前等待更多消息加入批次的时间。批次会在满的时候或者这个参数设置的时间到了的时候发送,这个参数设置得大的话会导致消息延迟,可以设置小一点,虽然会导致微小的延迟,但是批次可以多加入一些消息一起发送,减少开销。
BUFFER_MEMORY_CONFIG
设置生产者内存缓存区的大小,缓存生产者发往服务器的消息,如果生产者发送速率大于服务器接收速率,那么会导致生产者内存空间不足,此时send方法要么阻塞,要么抛出异常,具体行为依赖于max.block.ms参数,这个参数设置的是阻塞时间,超过这个时间就会抛出异常。默认值是33554432字节,合计32M。
KEY_SERIALIZER_CLASS_CONFIG
实现了DESERIALIZER的key的序列化类
VALUE_SERIALIZER_CLASS_CONFIG
实现了DESERIALIZER的value的序列化类
如果需要多个不同参数值的生产者,也是只要新建多几个方法就可以了。
3.新建消费者配置类
@Configuration
public class ConsumerConfigure {
@Bean
public ConcurrentKafkaListenerContainerFactory<Integer,String> kafkaListenerContainerFactory(){
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
private ConsumerFactory<Integer,String> consumerFactory() {
return new DefaultKafkaConsumerFactory<Integer, String>(consumerConfigs());
}
private Map<String,Object> consumerConfigs() {
HashMap<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.45.128:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.IntegerDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
return props;
}
}
参数说明:
GROUP_ID_CONFIG
消费者组,如果消费者通过订阅主题来实现组管理功能,或者使用基于kafka的偏移量管理策略,这个配置是必须的。
ENABLE_AUTO_COMMIT_CONFIG
如果设为true,消费者的偏移量会定期在后台提交,即定期地往zookeeper写入每个分区的offset
AUTO_COMMIT_INTERVAL_MS_CONFIG
往zookeeper上写offset的频率
KEY_DESERIALIZER_CLASS_CONFIG
实现了DESERIALIZER的key的反序列化类
VALUE_DESERIALIZER_CLASS_CONFIG
实现了DESERIALIZER的value的反序列化类
如果需要配置多个不同参数的消费者,只要添加多几个方法就可以了,例如:
@Bean
public ConcurrentKafkaListenerContainerFactory<Integer,String> kafkaListenerContainerFactory2(){
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
private ConsumerFactory<Integer,String> consumerFactory2() {
return new DefaultKafkaConsumerFactory<Integer, String>(consumerConfigs());
}
private Map<String,Object> consumerConfigs2() {
HashMap<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.45.128:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG,"test2");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.IntegerDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
return props;
}
新建消费者类
public class SimpleConsumerListener {
@KafkaListener(id = "test1",topics = "zhubo",containerFactory = "kafkaListenerContainerFactory")
public void listen1(String str){
System.out.println("listen1接收:"+str);
}
@KafkaListener(id = "test2",topics = "zhubo",containerFactory = "kafkaListenerContainerFactory2")
public void listen2(String str){
System.out.println("listen2接收:"+str);
}
}
其中的id保证其唯一性就可以了,即各个方法添加的注释的id不同,topics是主题名,containerFactory是指向消费者配置类配置的方法,指向之后消费者的参数就是使用的我们配置的参数,例如消费者配置类中的kafkaListenerContainerFactory方法我们配置的groupid是test,而kafkaListenerContainerFactory2方法我们配置的groupid是test2,那么消费者listen1就是test组的,listen2就是test2组的。
消费者类需要在消费者配置类中注入到容器,在消费者配置类中添加以下方法注入:
@Bean
public SimpleConsumerListener simpleConsumerListener(){
return new SimpleConsumerListener();
}
至于生产者类,这里我们不创建,而是在test类中直接注入template来发送消息
@RunWith(SpringRunner.class)
@SpringBootTest
public class Kafkatest2ApplicationTests {
@Autowired
private KafkaTemplate kafkaTemplate;
@Autowired
private KafkaTemplate kafkaTemplate2;
@Test
public void contextLoads() {
kafkaTemplate.send("zhubo","heihei");
}
@Test
public void contextLoads2() {
kafkaTemplate2.send("zhubo","heihei");
}
}
其中的kafkaTemplate和kafkaTemplate2参数名是对应的生产者配置类中的方法名,这样子才能正常注入
启动springboot入口类,然后运行测试类的测试方法,就可以在控制台中看到
listen1接收:heihei
listen2接收:heihei
说明消费者接收到了生产者发送的消息
更多推荐
所有评论(0)