A recent project of mine used Kafka. The installation of Kafka and ZooKeeper, and how they work, is covered in another post on my blog, so I won't repeat it here; this post goes straight into integrating Kafka into a Spring Boot project.
This post covers two approaches:
Approach one uses spring-integration-kafka, the plugin Spring originally provided for Kafka integration; I'll mainly explain how to wire this framework into the project.
The basic Spring Boot dependencies are listed in my two earlier posts; here we only add the dependency for this integration framework.

References:
http://www.cnblogs.com/yuanermen/p/5453339.html
http://www.cnblogs.com/yujinghui/p/5424706.html
http://blog.csdn.net/molingduzun123/article/details/51785141
1. Add the dependency

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-kafka</artifactId>
    <version>1.3.0.RELEASE</version>
</dependency>

2. Add spring-kafka-consumer.xml
Create a new file named spring-kafka-consumer.xml under src/main/resources with the following content:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     xmlns:int="http://www.springframework.org/schema/integration"
     xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
     xmlns:task="http://www.springframework.org/schema/task"
     xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
    http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
  <int:channel id="inputFromKafka">
      <int:queue/>
  </int:channel>

  <!--<int:service-activator auto-startup="true"-->
  <!--input-channel="inputFromKafka" ref="kafkaConsumerService" method="receiveMessage">-->
  <!--</int:service-activator>-->
  <!-- Either wiring style works: the service-activator above or the outbound-channel-adapter below -->
  <!-- Use kafkaConsumerService to receive messages from Kafka -->
  <bean id="kafkaConsumerService" class="com.oscar.kafkaTest.service.impl.KafkaConsumerServiceImpl" />
  <int:outbound-channel-adapter channel="inputFromKafka"
                                ref="kafkaConsumerService" method="processMessage" auto-startup="true"/>

  <int:poller default="true" id="default" fixed-rate="5"
              time-unit="MILLISECONDS" max-messages-per-poll="5">
  </int:poller>
  <int-kafka:inbound-channel-adapter
          kafka-consumer-context-ref="consumerContext" channel="inputFromKafka">
  </int-kafka:inbound-channel-adapter>
  <bean id="consumerProperties" class="org.springframework.beans.factory.config.PropertiesFactoryBean">
      <property name="properties">
          <props>
              <prop key="auto.offset.reset">smallest</prop>
              <prop key="socket.receive.buffer.bytes">10485760</prop>
              <!-- 10M -->
              <prop key="fetch.message.max.bytes">5242880</prop>
              <prop key="auto.commit.interval.ms">1000</prop>
          </props>
      </property>
  </bean>
  <!-- agent_log/msg_log -->
  <int-kafka:consumer-context id="consumerContext"
                              consumer-timeout="4000" zookeeper-connect="zookeeperConnect"
                              consumer-properties="consumerProperties">
      <int-kafka:consumer-configurations>
          <int-kafka:consumer-configuration
                  group-id="log-monitor" max-messages="500">
              <int-kafka:topic id="agent-log" streams="4"/>
              <int-kafka:topic id="msg-log" streams="4"/>
              <int-kafka:topic id="task-log" streams="4"/>
          </int-kafka:consumer-configuration>
      </int-kafka:consumer-configurations>
  </int-kafka:consumer-context>
  <int-kafka:zookeeper-connect id="zookeeperConnect"
                               zk-connect="192.168.XX.XX:3181" zk-connection-timeout="6000"
                               zk-session-timeout="400" zk-sync-time="200"/>
</beans>

You can adapt this configuration file to your own setup; there are mainly two places to change.

 <bean id="kafkaConsumerService" class="com.oscar.kafkaTest.service.impl.KafkaConsumerServiceImpl" />

This points to the class you write to handle the messages, which is defined as:

import java.util.List;
import java.util.Map;

import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerServiceImpl {

    // Called by the outbound-channel-adapter; the payload is keyed by topic,
    // then by partition, each partition holding the raw message bytes.
    public void processMessage(Map<String, Map<Integer, List<byte[]>>> msgs) {
        System.out.println(msgs);
    }

    public void process(String message) {
        System.out.println(message);
    }
}

Note the format of the received payload; the other place to change is the IP in zk-connect, which should point to your own ZooKeeper. A sketch of turning that payload into readable strings follows below.
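Since processMessage receives the raw bytes grouped by topic and partition, you will usually want to decode them into strings. Here is a minimal sketch of that, assuming UTF-8 encoded messages; the PayloadDecoder class and decode method are my own names for illustration, not part of the original project:

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

public class PayloadDecoder {

    // Walk topic -> partition -> raw bytes and print each message as a UTF-8 string.
    public void decode(Map<String, Map<Integer, List<byte[]>>> msgs) {
        for (Map.Entry<String, Map<Integer, List<byte[]>>> topicEntry : msgs.entrySet()) {
            String topic = topicEntry.getKey();
            for (Map.Entry<Integer, List<byte[]>> partitionEntry : topicEntry.getValue().entrySet()) {
                Integer partition = partitionEntry.getKey();
                for (byte[] raw : partitionEntry.getValue()) {
                    System.out.println(topic + "/" + partition + ": "
                            + new String(raw, StandardCharsets.UTF_8));
                }
            }
        }
    }
}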
3. Import the configuration file
Add the following annotations to your application class:

@Configuration
@ImportResource(locations={"classpath:spring-kafka-consumer.xml"})
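Put together, the startup class might look like the sketch below; the package-less Application class name and main method are placeholders of mine, not taken from the original project:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.ImportResource;

@SpringBootApplication
@Configuration
// Pull the spring-integration-kafka consumer wiring into the Spring Boot context
@ImportResource(locations = {"classpath:spring-kafka-consumer.xml"})
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}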

Approach two
Use Spring Boot's own auto-configuration directly.
1. Add the following to application.properties:

spring.kafka.bootstrap-servers=192.168.101.16:9092

spring.kafka.template.default-topic=test
spring.kafka.consumer.group-id=default3

#spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
#spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

2. Add the handler:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class Receiver {

    // Invoked for every record that arrives on the "test" topic
    @KafkaListener(topics = "test")
    public void processMessage(String msgs) {
        System.out.println("aa");
        System.out.println(msgs);
    }
}
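To check that the listener actually fires, you can push a test message through KafkaTemplate, which Spring Boot auto-configures from the properties above. This is only a minimal sketch, assuming String key/value serializers (the ones shown commented out earlier); the Sender class and send method are my own naming, not from the original post:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class Sender {

    // Auto-configured by Spring Boot from the spring.kafka.* properties
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // Send a message to the "test" topic so the @KafkaListener above receives it
    public void send(String message) {
        kafkaTemplate.send("test", message);
    }
}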

And that's it.
Spring Boot is still maturing and some plugins have not been fully integrated yet; the second approach, for instance, took me quite a few attempts before it worked, mainly because it is strict about matching versions.
http://blog.csdn.net/aa3313322122/article/details/70225647
http://www.cnblogs.com/xiaojf/p/6613559.html
These two posts explain the specific version constraints very clearly, and my experience bears them out.

Download for this project: http://download.csdn.net/download/u013289746/9951632
