spring集成kafka配置
一 所需jar包
<!-- Spring framework core modules (web / webmvc / core / context / tx / jdbc / test) -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
<version>4.2.5.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
<version>4.2.5.RELEASE</version>
</dependency>
<!-- Spring core -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>4.2.5.RELEASE</version>
</dependency>
<!-- Spring context -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>4.2.5.RELEASE</version>
</dependency>
<!-- Spring transaction support -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
<version>4.2.5.RELEASE</version>
</dependency>
<!-- Spring JDBC -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
<version>4.2.5.RELEASE</version>
</dependency>
<!-- Spring test support -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>4.2.5.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
<version>4.2.5.RELEASE</version>
</dependency>
<!-- Spring MVC JSON support -->
<!-- NOTE(review): this mixes Jackson 2.x (com.fasterxml) with legacy Jackson 1.x
     (org.codehaus, EOL since 2013); verify jackson-mapper-asl is really needed. -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.5.1</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>1.9.13</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.5.1</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.5.1</version>
</dependency>
<!-- Spring Security -->
<!-- NOTE(review): 3.0.2.RELEASE (2010) is very old next to Spring 4.2.5;
     confirm compatibility or align on a 3.2.x/4.x Security release. -->
<dependency>
<groupId>org.springframework.security</groupId>
<artifactId>spring-security-web</artifactId>
<version>3.0.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.security</groupId>
<artifactId>spring-security-core</artifactId>
<version>3.0.2.RELEASE</version>
<!-- Exclude the transitive aspectjweaver so the explicit 1.8.10 below wins -->
<exclusions>
<exclusion>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
<version>1.8.10</version>
</dependency>
<dependency>
<groupId>org.springframework.security</groupId>
<artifactId>spring-security-config</artifactId>
<version>3.0.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.security</groupId>
<artifactId>spring-security-taglibs</artifactId>
<version>3.0.2.RELEASE</version>
</dependency>
<!-- MyBatis core -->
<dependency>
<groupId>org.mybatis</groupId>
<artifactId>mybatis</artifactId>
<version>3.2.8</version>
</dependency>
<!-- MyBatis / Spring integration -->
<dependency>
<groupId>org.mybatis</groupId>
<artifactId>mybatis-spring</artifactId>
<version>1.2.2</version>
</dependency>
<!-- MySQL JDBC driver -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.37</version>
</dependency>
<!-- Kafka integration -->
<!-- NOTE(review): spring-kafka 2.0.x is built against Spring Framework 5.0 and
     kafka-clients 0.11+; it is unlikely to run on the Spring 4.2.5 jars above.
     Confirm on the spring-kafka compatibility matrix, or drop to a 1.x line. -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.0.0.RELEASE</version>
</dependency>
二 spring-mybatis.xml配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.0.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx.xsd
">
<context:property-placeholder location="classpath:jdbc.properties" />
<!-- NOTE(review): base-package is com.suninfo here, while the MVC config scans
     com.wutongyu.controller and the mapper scanner uses com.wutonyu.dao —
     confirm which root package is correct and make them consistent. -->
<context:component-scan base-package="com.suninfo" />
<!-- 1. Data source: Druid connection pool, configured from jdbc.properties -->
<bean id="dataSource" class="com.alibaba.druid.pool.DruidDataSource" destroy-method="close" lazy-init="false">
<property name="driverClassName" value="${jdbc.driverClassName}" />
<property name="url" value="${jdbc.url}" />
<property name="username" value="${jdbc.username}" />
<property name="password" value="${jdbc.password}" />
</bean>
<!-- 2. MyBatis SqlSessionFactory; dataSource references the pool above,
     mapperLocations loads all mapper XML files on the classpath -->
<bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
<property name="dataSource" ref="dataSource"></property>
<property name="mapperLocations" value="classpath*:mapper/*.xml" />
<!-- PageHelper pagination plugin -->
<property name="plugins">
<array>
<bean class="com.github.pagehelper.PageHelper">
<!-- These options are shown for demonstration; remove any setting you do
     not understand rather than copying it blindly -->
<property name="properties">
<value>
dialect=mysql
<!-- Defaults to false -->
<!-- When true, the first RowBounds argument (offset) is treated as pageNum -->
<!-- i.e. the same meaning as pageNum in startPage() -->
offsetAsPageNum=true
<!-- Defaults to false -->
<!-- When true, RowBounds paging also issues a count query -->
rowBoundsWithCount=true
<!-- When true, pageSize=0 or RowBounds.limit=0 returns the full result set -->
<!-- (no paging is applied, but the result is still a Page) -->
pageSizeZero=true
<!-- Available since 3.3.0 - "reasonable" paging, defaults to false -->
<!-- When enabled: pageNum<1 queries page 1, pageNum>pages queries the last page -->
<!-- When disabled: pageNum<1 or pageNum>pages returns empty data -->
reasonable=false
<!-- Available since 3.5.0 - supports startPage(Object params) -->
<!-- Maps paging parameters out of a Map or ServletRequest -->
<!-- Configurable keys: pageNum,pageSize,count,pageSizeZero,reasonable,orderBy -->
<!-- Do not copy this setting without understanding it -->
params=count=countSql
<!-- Allow passing paging parameters through Mapper interface arguments -->
supportMethodsArguments=true
<!-- always: always return PageInfo; check: return PageInfo only when the
     declared return type is PageInfo; none: return Page -->
returnPageInfo=check
</value>
</property>
</bean>
</array>
</property>
</bean>
<!-- 3. Auto-scan MyBatis mapper interfaces under basePackage and wire them to
     the SqlSessionFactory above -->
<!-- NOTE(review): "com.wutonyu.dao" looks like a typo of "com.wutongyu" used
     elsewhere in this post — verify the package name. -->
<bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">
<property name="basePackage" value="com.wutonyu.dao"></property>
<property name="sqlSessionFactoryBeanName" value="sqlSessionFactory" />
</bean>
<!-- 4. Transaction manager over the data source defined above -->
<bean id="transactionManager"
class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource"></property>
</bean>
<!-- 5. Enable annotation-driven (declarative) transactions -->
<tx:annotation-driven transaction-manager="transactionManager" />
<!-- Transaction attributes, matched by method-name pattern: writes require a
     transaction (REQUIRED), reads join one if present (SUPPORTS) -->
<tx:advice id="transactionAdvice" transaction-manager="transactionManager">
<tx:attributes>
<tx:method name="add*" propagation="REQUIRED" />
<tx:method name="append*" propagation="REQUIRED" />
<tx:method name="insert*" propagation="REQUIRED" />
<tx:method name="save*" propagation="REQUIRED" />
<tx:method name="batchSave*" propagation="REQUIRED" />
<tx:method name="batchDel*" propagation="REQUIRED" />
<tx:method name="refreshUnrecovery*" propagation="REQUIRED" />
<tx:method name="update*" propagation="REQUIRED" />
<tx:method name="modify*" propagation="REQUIRED" />
<tx:method name="edit*" propagation="REQUIRED" />
<tx:method name="delete*" propagation="REQUIRED" />
<tx:method name="remove*" propagation="REQUIRED" />
<tx:method name="repair" propagation="REQUIRED" />
<tx:method name="delAndRepair" propagation="REQUIRED" />
<tx:method name="get*" propagation="SUPPORTS" />
<tx:method name="find*" propagation="SUPPORTS" />
<tx:method name="load*" propagation="SUPPORTS" />
<tx:method name="search*" propagation="SUPPORTS" />
<tx:method name="datagrid*" propagation="SUPPORTS" />
<tx:method name="*" propagation="SUPPORTS" />
</tx:attributes>
</tx:advice>
<aop:aspectj-autoproxy />
<bean id="operationLogAspect" class="com.wutonyu.aop.OperationLogAspect" />
<!-- Optional: bind the transaction advice to a service-layer pointcut.
     NOTE(review): while this <aop:config> stays commented out, transactionAdvice
     above is defined but never applied — only @Transactional annotations work. -->
<!-- <aop:config>
<aop:pointcut id="transactionPointcut"
expression="execution(public * com.wutongyu.serviceimpl.*Impl.*(..))" />
<aop:advisor pointcut-ref="transactionPointcut"
advice-ref="transactionAdvice" />
</aop:config> -->
</beans>
三 jdbc.properties配置
# JDBC settings referenced from spring-mybatis.xml / spring-kafka-context.xml
jdbc.driverClassName=com.mysql.jdbc.Driver
# NOTE(review): credentials are stored in plain text; consider externalizing or
# encrypting them before deploying.
jdbc.username=admin
jdbc.password=admin
# allowMultiQueries=true permits several statements per JDBC call, which widens
# the blast radius of SQL injection - confirm it is actually needed.
jdbc.url=jdbc:mysql://localhost:3306/admin_commondb?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true
四 spring-mvc.xml 配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:mvc="http://www.springframework.org/schema/mvc"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.0.xsd
http://www.springframework.org/schema/mvc
http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd">
<!-- Scan the controller package for annotated components -->
<context:component-scan base-package="com.wutongyu.controller" />
<!--
Static resources are mapped straight to folders and bypass the DispatcherServlet.
NOTE(review): cache-period="3153600" is ~36.5 days; if one year was intended the
value should be 31536000 - confirm.
-->
<mvc:resources mapping="/static/**" location="/static/" cache-period="3153600"/>
<mvc:resources mapping="/*.html" location="/" cache-period="3153600"/>
<mvc:resources mapping="/birt/**" location="/birt/" cache-period="3153600"/>
<mvc:resources mapping="/birt/html/*.html" location="/birt/html/" cache-period="3153600"/>
<!-- File upload -->
<bean id="multipartResolver" class="org.springframework.web.multipart.commons.CommonsMultipartResolver">
<property name="defaultEncoding" value="utf-8"/>
<!-- Max bytes buffered in memory before spilling to disk.
     NOTE(review): 102400000 is ~100 MB held in heap per upload - confirm this
     is intentional; the Commons FileUpload default is 10 KB. -->
<property name="maxInMemorySize" value="102400000"/>
<!-- Max upload size; -1 means unlimited -->
<property name="maxUploadSize" value="-1"/>
</bean>
<!-- View resolver: prefixes/suffixes the string a controller method returns to
     build the JSP path -->
<bean id="viewResolver"
class="org.springframework.web.servlet.view.InternalResourceViewResolver">
<!-- e.g. returning "home" resolves to /home.jsp -->
<property name="prefix" value="/" />
<property name="suffix" value=".jsp" />
</bean>
<bean class="org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerMapping"/>
<!-- byte[] responses -->
<bean id="byteArrayConverter" class="org.springframework.http.converter.ByteArrayHttpMessageConverter"/>
<bean id="stringConverter" class="org.springframework.http.converter.StringHttpMessageConverter">
<property name="supportedMediaTypes">
<list>
<value>text/plain;charset=UTF-8</value>
</list>
</property>
</bean>
<!-- Object-to-JSON conversion for response bodies -->
<bean id="jsonConverter" class="org.springframework.http.converter.json.MappingJackson2HttpMessageConverter"></bean>
<!-- Handler adapter for annotated controllers. Note: must be declared before
     <mvc:annotation-driven/>, otherwise parameter type conversion breaks -->
<bean class="org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter">
<property name="messageConverters">
<list>
<ref bean="byteArrayConverter"/>
<ref bean="stringConverter"/>
<ref bean="jsonConverter" />
</list>
</property>
</bean>
<!-- Socket.IO bootstrap; comment out if Socket.IO is not used -->
<bean id="initSocketIo" class="com.wutongyu.util.socketio.InitSocketIo" init-method="init"/>
<!-- Enable annotation-driven MVC -->
<mvc:annotation-driven />
</beans>
五 消息接收端consumer(spring-kafka-consumer.xml)配置
<?xml version="1.0" encoding="UTF-8"?>
<!-- NOTE(review): this file pins spring-beans-3.0.xsd while the other configs
     use 4.0 - align the schema versions. -->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">
<!-- Kafka consumer configuration properties -->
<bean id="consumerProperties" class="java.util.HashMap">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="localhost:9092"/>
<entry key="group.id" value="0"/>
<!-- NOTE(review): with enable.auto.commit=false the auto.commit.interval.ms
     setting below is ignored; offsets are then committed by the listener
     container according to its AckMode - confirm the intended strategy. -->
<entry key="enable.auto.commit" value="false"/>
<entry key="auto.commit.interval.ms" value="1000"/>
<entry key="session.timeout.ms" value="15000"/>
<entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
<entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
</map>
</constructor-arg>
</bean>
<!-- Consumer factory shared by all listener containers below -->
<bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
<constructor-arg>
<ref bean="consumerProperties"/>
</constructor-arg>
</bean>
<!-- Log queue: loginfoTopic -->
<bean id="kafkaLogInfoConsumer" class="com.wutongyu.kafka.loginfo.KafkaLogInfoConsumer"/>
<bean id="kafkaLogInfoConsumerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
<constructor-arg value="loginfoTopic"/>
<property name="messageListener" ref="kafkaLogInfoConsumer"/>
</bean>
<!-- NOTE(review): concurrency=20 is only useful if loginfoTopic has >= 20
     partitions; extra consumer threads beyond the partition count sit idle. -->
<bean id="logInfoListenerContainer" class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer" init-method="doStart">
<constructor-arg ref="consumerFactory"/>
<constructor-arg ref="kafkaLogInfoConsumerProperties"/>
<property name="concurrency" value="20"/>
</bean>
<!-- Event queue: eventTopic -->
<bean id="kafkaAnalysisLogInfoConsumer" class="com.wutongyu.kafka.analysisloginfo.KafkaAnalysisLogInfoConsumer"/>
<bean id="kafkaAnalysisLogInfoConsumerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
<constructor-arg value="eventTopic"/>
<property name="messageListener" ref="kafkaAnalysisLogInfoConsumer"/>
</bean>
<bean id="analysisLogInfoListenerContainer" class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer" init-method="doStart">
<constructor-arg ref="consumerFactory"/>
<constructor-arg ref="kafkaAnalysisLogInfoConsumerProperties"/>
<property name="concurrency" value="20"/>
</bean>
<!-- Email queue: emailTopic -->
<bean id="kafkaEmailConsumer" class="com.wutongyu.kafka.email.KafkaEmailConsumer"/>
<bean id="kafkaEmailConsumerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
<constructor-arg value="emailTopic"/>
<property name="messageListener" ref="kafkaEmailConsumer"/>
</bean>
<bean id="emailListenerContainer" class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer" init-method="doStart">
<constructor-arg ref="consumerFactory"/>
<constructor-arg ref="kafkaEmailConsumerProperties"/>
<property name="concurrency" value="1"/>
</bean>
<!-- SMS queue: messageTopic -->
<bean id="kafkaMessageConsumer" class="com.wutongyu.kafka.message.KafkaMessageConsumer"/>
<bean id="kafkaMessageConsumerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
<constructor-arg value="messageTopic"/>
<property name="messageListener" ref="kafkaMessageConsumer"/>
</bean>
<bean id="messageListenerContainer" class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer" init-method="doStart">
<constructor-arg ref="consumerFactory"/>
<constructor-arg ref="kafkaMessageConsumerProperties"/>
<property name="concurrency" value="1"/>
</bean>
<!-- Client push queue: socketioTopic -->
<bean id="kafkaSocketioConsumer" class="com.wutongyu.kafka.socketio.KafkaSocketioConsumer"/>
<bean id="kafkaSocketioConsumerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
<constructor-arg value="socketioTopic"/>
<property name="messageListener" ref="kafkaSocketioConsumer"/>
</bean>
<bean id="socketioListenerContainer" class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer" init-method="doStart">
<constructor-arg ref="consumerFactory"/>
<constructor-arg ref="kafkaSocketioConsumerProperties"/>
<property name="concurrency" value="2"/>
</bean>
<!-- WeChat queue: wechatTopic -->
<bean id="kafkaWechatConsumer" class="com.wutongyu.kafka.wechat.KafkaWechatConsumer"/>
<bean id="kafkaWechatConsumerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
<constructor-arg value="wechatTopic"/>
<property name="messageListener" ref="kafkaWechatConsumer"/>
</bean>
<bean id="wechatListenerContainer" class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer" init-method="doStart">
<constructor-arg ref="consumerFactory"/>
<constructor-arg ref="kafkaWechatConsumerProperties"/>
<property name="concurrency" value="1"/>
</bean>
</beans>
六 消息发送端producer(spring-kafka-producer.xml)配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<!-- Kafka producer configuration properties -->
<bean id="producerProperties" class="java.util.HashMap">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="localhost:9092" />
<!-- NOTE(review): group.id is a consumer property and is ignored by
     producers - it can be removed here. -->
<entry key="group.id" value="0" />
<entry key="retries" value="1" />
<entry key="batch.size" value="16384" />
<entry key="linger.ms" value="1" />
<entry key="buffer.memory" value="33554432" />
<entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer" />
<entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer" />
</map>
</constructor-arg>
</bean>
<!-- Producer factory used by every KafkaTemplate below -->
<bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
<constructor-arg>
<ref bean="producerProperties" />
</constructor-arg>
</bean>
<bean id="kafkaLogInfoProducerListener" class="com.wutongyu.kafka.KafkaLogInfoProducerListener" />
<!-- KafkaTemplate beans, one per default topic; inject one of these and call
     its send(...) methods. autoFlush=true flushes after every send (safer but
     slower - it defeats batching). -->
<bean id="eventKafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
<constructor-arg ref="producerFactory" />
<constructor-arg name="autoFlush" value="true" />
<property name="defaultTopic" value="eventTopic" />
<property name="producerListener" ref="kafkaLogInfoProducerListener"/>
</bean>
<bean id="emailKafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
<constructor-arg ref="producerFactory" />
<constructor-arg name="autoFlush" value="true" />
<property name="defaultTopic" value="emailTopic" />
<property name="producerListener" ref="kafkaLogInfoProducerListener"/>
</bean>
<bean id="socketioKafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
<constructor-arg ref="producerFactory" />
<constructor-arg name="autoFlush" value="true" />
<property name="defaultTopic" value="socketioTopic" />
<property name="producerListener" ref="kafkaLogInfoProducerListener"/>
</bean>
<bean id="messageKafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
<constructor-arg ref="producerFactory" />
<constructor-arg name="autoFlush" value="true" />
<property name="defaultTopic" value="messageTopic" />
<property name="producerListener" ref="kafkaLogInfoProducerListener"/>
</bean>
<bean id="wechatKafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
<constructor-arg ref="producerFactory" />
<constructor-arg name="autoFlush" value="true" />
<property name="defaultTopic" value="wechatTopic" />
<property name="producerListener" ref="kafkaLogInfoProducerListener"/>
</bean>
<!-- NOTE(review): the producer beans below mix com.wutongyu and com.suninfo
     package roots - verify which package these classes actually live in. -->
<bean id="kafkaAnalysisLogInfoProducer" class="com.wutongyu.kafka.analysisloginfo.KafkaAnalysisLogInfoProducer" />
<bean id="kafkaEmailProducer" class="com.suninfo.kafka.email.KafkaEmailProducer" />
<bean id="kafkaSocketioProducer" class="com.suninfo.kafka.socketio.KafkaSocketioProducer" />
<bean id="kafkaMessageProducer" class="com.suninfo.kafka.message.KafkaMessageProducer" />
<bean id="kafkaWechatProducer" class="com.suninfo.kafka.wechat.KafkaWechatProducer" />
</beans>
七 spring整合consumer和producer(spring-kafka-context.xml)
<?xml version="1.0" encoding="UTF-8"?>
<!-- NOTE(review): default-lazy-init="true" applies to imported files too; beans
     nobody injects (e.g. the Kafka listener containers) may never be created.
     Confirm the containers still start, or mark them lazy-init="false". -->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd"
default-lazy-init="true">
<!-- Load property files used by the imported configurations -->
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<list>
<value>classpath:jdbc.properties</value>
<!-- NOTE(review): rabbitmq.properties looks like a leftover from a RabbitMQ
     setup - verify the file exists on the classpath or remove the entry. -->
<value>classpath:rabbitmq.properties</value>
</list>
</property>
</bean>
<import resource="spring-mybatis.xml" />
<!--<import resource="spring-memcached.xml" />-->
<import resource="spring-kafka-producer.xml" />
<import resource="spring-kafka-consumer.xml" />
</beans>
八 producer生产监听类 KafkaLogInfoProducerListener 示例
package com.wutongyu.kafka;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.support.ProducerListener;
/**
 * {@link ProducerListener} wired into every KafkaTemplate in
 * spring-kafka-producer.xml; receives send-result callbacks for all topics.
 */
public class KafkaLogInfoProducerListener implements ProducerListener<Object, Object> {
private static final Logger log = LoggerFactory.getLogger( KafkaLogInfoProducerListener.class );
/**
 * Invoked after a record was sent successfully.
 *
 * <p>Success logging is kept at DEBUG so normal traffic does not flood the log;
 * enable DEBUG for this class to trace successful sends.
 */
@Override
public void onSuccess(String topic, Integer partition, Object key,
Object value, RecordMetadata recordMetadata) {
log.debug("kafka send ok: topic={}, partition={}, key={}, value={}, metadata={}",
topic, partition, key, value, recordMetadata);
}
/**
 * Invoked when a send failed.
 *
 * <p>Logged at ERROR (the original logged failures at INFO) with the exception
 * as the last argument so the stack trace goes through the logging framework
 * instead of {@code printStackTrace()}.
 */
@Override
public void onError(String topic, Integer partition, Object key,
Object value, Exception exception) {
log.error("kafka send failed: topic={}, partition={}, key={}, value={}",
topic, partition, key, value, exception);
}
/**
 * Returning {@code true} tells the template to also invoke {@link #onSuccess},
 * not only {@link #onError}.
 */
@Override
public boolean isInterestedInSuccess() {
return true;
}
}
九 consumer消费类 KafkaLogInfoConsumer 示例
package com.wutongyu.kafka.loginfo;
import org.springframework.kafka.listener.MessageListener;
/**
 * Message listener for loginfoTopic, registered on a
 * ConcurrentMessageListenerContainer in spring-kafka-consumer.xml.
 *
 * NOTE(review): this snippet references Logger/LoggerFactory, ConsumerRecord,
 * Map and JSONObject but only imports MessageListener - the missing imports
 * (slf4j, kafka-clients, java.util.Map and the JSON library providing
 * JSONObject, presumably fastjson) must be added for it to compile.
 */
public class KafkaLogInfoConsumer implements MessageListener<String, String> {
private static Logger log = LoggerFactory.getLogger(KafkaLogInfoConsumer.class);
/**
* Invoked automatically by the listener container for each consumed record.
* Offsets are committed by the container (enable.auto.commit=false in the
* consumer properties); business logic goes here.
* (The high-level API provides no offset management - consumption cannot be
* started from an arbitrary offset.)
*
* NOTE(review): the parsed map is currently discarded - placeholder for real
* business handling of the record.
*/
@Override
public void onMessage(ConsumerRecord<String, String> record) {
// Record value is a JSON string (StringDeserializer configured above).
String json = record.value();
Map<String, Object> jsonObject = (Map) JSONObject.parse(json);
}
}
十 通过配置文件启动
/**
 * Standalone entry point: boots the Spring context from XML and resolves the
 * beans the task needs.
 */
public class ElasticsearchTaskMain {
private static final Logger log = LoggerFactory.getLogger( ElasticsearchTaskMain.class );
private static LogSourceService logSourceService;
public static void main(String[] args) {
// NOTE(review): the aggregate config in this post is spring-kafka-context.xml;
// confirm spring-kafka-log.xml actually exists on the classpath.
ClassPathXmlApplicationContext context =
new ClassPathXmlApplicationContext( "classpath:spring-kafka-log.xml" );
// Close the context (stopping Kafka listener containers and the connection
// pool) cleanly on JVM shutdown - the original leaked it.
context.registerShutdownHook();
// Resolve the beans the consumers need up front.
logSourceService = (LogSourceService) context.getBean( "logSourceService" );
}
}