Integrating Kafka with Spring Boot
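There are two ways to integrate Kafka with Spring. The spring-integration-kafka plugin is the more mature option. Spring Boot's built-in support is still less complete, but it is fine for simple use.
A recent project of mine used Kafka. Installing Kafka and ZooKeeper, and how they work, are covered in another post of mine, so I won't repeat that here and will go straight to integrating Kafka into a Spring Boot project.
This post covers two methods:
Method one uses spring-integration-kafka, the plugin Spring originally provided for Kafka integration. I mainly describe how to pull this framework into a Spring project and use it.
The basic Spring Boot dependencies are listed in my previous two posts; here we only add the dependency for this integration framework.
References: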
http://www.cnblogs.com/yuanermen/p/5453339.html
http://www.cnblogs.com/yujinghui/p/5424706.html
http://blog.csdn.net/molingduzun123/article/details/51785141
1. Add the dependency
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-kafka</artifactId>
<version>1.3.0.RELEASE</version>
</dependency>
2. Add spring-kafka-consumer.xml
Create a new file named spring-kafka-consumer.xml under src/main/resources
with the following content:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
<int:channel id="inputFromKafka">
<int:queue/>
</int:channel>
<!--<int:service-activator auto-startup="true"-->
<!--input-channel="inputFromKafka" ref="kafkaConsumerService" method="receiveMessage">-->
<!--</int:service-activator>-->
<!-- Either of these two approaches works -->
<!-- Use kafkaConsumerService to receive Kafka messages -->
<bean id="kafkaConsumerService" class="com.oscar.kafkaTest.service.impl.KafkaConsumerServiceImpl" />
<int:outbound-channel-adapter channel="inputFromKafka"
ref="kafkaConsumerService" method="processMessage" auto-startup="true"/>
<int:poller default="true" id="default" fixed-rate="5"
time-unit="MILLISECONDS" max-messages-per-poll="5">
</int:poller>
<int-kafka:inbound-channel-adapter
kafka-consumer-context-ref="consumerContext" channel="inputFromKafka">
</int-kafka:inbound-channel-adapter>
<bean id="consumerProperties" class="org.springframework.beans.factory.config.PropertiesFactoryBean">
<property name="properties">
<props>
<prop key="auto.offset.reset">smallest</prop>
<prop key="socket.receive.buffer.bytes">10485760</prop>
<!-- 10M -->
<prop key="fetch.message.max.bytes">5242880</prop>
<prop key="auto.commit.interval.ms">1000</prop>
</props>
</property>
</bean>
<!-- agent_log/msg_log -->
<int-kafka:consumer-context id="consumerContext"
consumer-timeout="4000" zookeeper-connect="zookeeperConnect"
consumer-properties="consumerProperties">
<int-kafka:consumer-configurations>
<int-kafka:consumer-configuration
group-id="log-monitor" max-messages="500">
<int-kafka:topic id="agent-log" streams="4"/>
<int-kafka:topic id="msg-log" streams="4"/>
<int-kafka:topic id="task-log" streams="4"/>
</int-kafka:consumer-configuration>
</int-kafka:consumer-configurations>
</int-kafka:consumer-context>
<int-kafka:zookeeper-connect id="zookeeperConnect"
zk-connect="192.168.XX.XX:3181" zk-connection-timeout="6000"
zk-session-timeout="400" zk-sync-time="200"/>
</beans>
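Adjust the configuration file to your own environment. There are two main places to change. The first is the bean definition: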
<bean id="kafkaConsumerService" class="com.oscar.kafkaTest.service.impl.KafkaConsumerServiceImpl" />
This points to the class you write to handle messages, defined as follows:
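import java.util.List;
import java.util.Map;

import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerServiceImpl {
    // Called by the outbound-channel-adapter (method="processMessage") with each polled batch
    public void processMessage(Map<String, Map<Integer, List<byte[]>>> msgs) {
        System.out.println(msgs);
    }
    // Not referenced by the XML above; accepts a plain String message
    public void process(String message) {
        System.out.println(message);
    }
}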
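Note the parameter type the handler receives.
The second place to change is the IP address, in the zk-connect attribute of the int-kafka:zookeeper-connect element.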
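The nested Map type taken by processMessage is easier to work with once it is unpacked. Below is a minimal sketch that assumes the outer key is the topic name, the inner key is the partition number, and each byte[] is one message encoded as UTF-8 text. That layout is my reading of the spring-integration-kafka 1.x payload, so check it against your own output. PayloadDecoder is a hypothetical helper, not part of the original project.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class PayloadDecoder {

    // Flattens topic -> partition -> raw messages into "topic[partition]: text" lines.
    public static List<String> decode(Map<String, Map<Integer, List<byte[]>>> msgs) {
        List<String> lines = new ArrayList<>();
        for (Map.Entry<String, Map<Integer, List<byte[]>>> topicEntry : msgs.entrySet()) {
            for (Map.Entry<Integer, List<byte[]>> partitionEntry : topicEntry.getValue().entrySet()) {
                for (byte[] raw : partitionEntry.getValue()) {
                    // Assumption: the producer wrote UTF-8 encoded text
                    String text = new String(raw, StandardCharsets.UTF_8);
                    lines.add(topicEntry.getKey() + "[" + partitionEntry.getKey() + "]: " + text);
                }
            }
        }
        return lines;
    }
}
```

Inside processMessage you could then call PayloadDecoder.decode(msgs).forEach(System.out::println) instead of printing the raw map, which would otherwise show only byte array references.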
3. Import the file
Add these annotations to the application class:
@Configuration
@ImportResource(locations={"classpath:spring-kafka-consumer.xml"})
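Method two
Use Spring Boot's default configuration directly. This relies on the spring-kafka dependency being on the classpath.
1. Add the following to application.properties: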
spring.kafka.bootstrap-servers=192.168.101.16:9092
spring.kafka.template.default-topic=test
spring.kafka.consumer.group-id=default3
#spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
#spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
2. Add the handler:
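import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class Receiver {
    // Invoked for every record received on topic "test"
    @KafkaListener(topics = "test")
    public void processMessage(String msgs) {
        System.out.println("aa");
        System.out.println(msgs);
    }
}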
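That's it.
Spring Boot is still evolving, and some plugins are not integrated into it yet. The second method, for example, took me many attempts to get working, mainly because it is strict about which versions work together.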
http://blog.csdn.net/aa3313322122/article/details/70225647
http://www.cnblogs.com/xiaojf/p/6613559.html
The two posts above explain the specific version constraints clearly, and they match what I ran into.
Project download: http://download.csdn.net/download/u013289746/9951632