Spring整合kafka
1.应用实例1.1 maven配置spring-kafka的版本为2.1.1.RELEASE<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId&g...
1.应用实例
1.1 maven配置
spring-kafka的版本为2.1.1.RELEASE
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.1.1.RELEASE</version>
</dependency>
对应spring的版本是5.0.3.RELEASE
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
<version>5.0.3.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.0.3.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>5.0.3.RELEASE</version>
</dependency>
<dependency>
<artifactId>spring-test</artifactId>
<groupId>org.springframework</groupId>
<version>5.0.3.RELEASE</version>
<scope>test</scope>
</dependency>
spring-kafka和kafak-clients版本映射关系,以及spring-integration和kafka-client的映射关系如下图
1.2 生产者实例
1.2.1 配置文件
定义一个producer.xml文件
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns="http://www.springframework.org/schema/beans" xmlns:aop="http://www.springframework.org/schema/aop"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd">
<context:component-scan base-package="producer" />
<bean id="producerProperties" class="java.util.HashMap">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="localhost:9092"/>
<entry key="acks" value="all"/>
<entry key ="retries" value="0"/>
<entry key="batch.size" value="16384"/>
<entry key="buffer.memory" value="33554432"/>
<entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
<entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
</map>
</constructor-arg>
</bean>
<bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
<constructor-arg>
<ref bean="producerProperties"/>
</constructor-arg>
</bean>
<bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
<constructor-arg ref="producerFactory"/>
<constructor-arg name="autoFlush" value="true"/>
<property name="defaultTopic" value="myTopic"/>
</bean>
</beans>
如果需要支持事务,则需要进行配置一个transactionIdPrefix的参数。
<bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
<constructor-arg>
<ref bean="producerProperties"/>
</constructor-arg>
<property name= "transactionIdPrefix" value="test-demo" />
</bean>
application.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns="http://www.springframework.org/schema/beans" xmlns:aop="http://www.springframework.org/schema/aop"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd">
<import resource="classpath:spring/producer.xml"/>
</beans>
1.2.2 定义一个ProducerClient
@Component
public class ProducerClient {
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
public void sendMessage(String topicName,String message){
kafkaTemplate.send(topicName,message);
}
}
1.2.3 测试
public class Excutor {
public static void main(String[] args) {
ApplicationContext context = new ClassPathXmlApplicationContext("classpath:/applicationContext.xml");
System.out.printf("启动Producer");
ProducerClient producerClient = (ProducerClient) context.getBean("producerClient");
producerClient.sendMessage("test","data:20180329");
}
}
1.3 单个消费者实例
分为两种类型:单个消费实例和多个消费者实例。
1.3.1 配置文件
定义一个listener.xml文件
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns="http://www.springframework.org/schema/beans" xmlns:aop="http://www.springframework.org/schema/aop"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd">
<context:component-scan base-package="listener" />
<!--<context:component-scan base-package="concurrent" />-->
<bean id="consumerProperties" class="java.util.HashMap">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="localhost:9092"/>
<entry key="group.id" value="group1"/>
<entry key="enable.auto.commit" value="true"/>
<entry key="auto.commit.interval.ms" value="1000"/>
<entry key="session.timeout.ms" value="30000"/>
<entry key="key.deserializer" value="org.apache.kafka.common.serialization.IntegerDeserializer"/>
<entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
</map>
</constructor-arg>
</bean>
<bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
<constructor-arg>
<ref bean="consumerProperties"/>
</constructor-arg>
</bean>
<bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
<constructor-arg value="test"/>
<property name="messageListener" ref="kafkaConsumerListener"/>
</bean>
<bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" >
<constructor-arg ref="consumerFactory"/>
<constructor-arg ref="containerProperties"/>
</bean>
</beans>
如果需要配置事务,实现Consume-transform-Prodcue,则如下配置一个kafkaTransactionMaanager
<bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
<constructor-arg value="test"/>
<property name="messageListener" ref="kafkaConsumerListener"/>
<property name="transactionManager" ref="transactionManager">
</bean>
<bean id="kafkaTransactionManager" class="org.springframework.kafka.transaction.KafkaTransactionManager">
<constructor-arg>
<ref bean="producerFactory"/>
</constructor-arg>
</bean>
如果需要配置一个正则表达式的主题,指定一个Pattern对象
<bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
<constructor-arg ref="partern"/>
</bean>
<bean class="java.util.regex.Pattern" factory-method="compile">
<constructor-arg value=".*some pattern.*" />
<constructor-arg type="int" value="#{T(java.util.regex.Pattern).DOTALL | T(java.util.regex.Pattern).CASE_INSENSITIVE}" />
</bean>
1.3.2 定义一个messagListener
在上面配置文件中构建ContainerProperties时的属性messaeListener指定。
@Component
public class KafkaConsumerListener implements MessageListener<String, String> {
public void onMessage(ConsumerRecord<String, String> integerStringConsumerRecord) {
System.out.printf("offset= %d, key= %s, value= %s\n",
integerStringConsumerRecord.offset(),
integerStringConsumerRecord.key(),
integerStringConsumerRecord.value());
}
}
1.3.3 测试
public class Excutor {
public static void main(String[] args) {
ApplicationContext context = new ClassPathXmlApplicationContext("classpath:/applicationContext.xml");
System.out.printf("启动listener");
while (true) {
}
}
}
此时可以向kafka发送消息(发送消息可以直接使用kafka自带的脚本进行),然后就可以看到日志了:
08:38:58.598 [messageListenerContainer-C-1] DEBUG org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer – Received: 2 records
offset= 51, key= null, value= message-1
offset= 52, key= null, value= message-2
1.4 多个消费者实例
通过定义一个ConcurrentMessageListenerContainer来替换上面的KafkaMessageListenerContainer
如上面listener.xml配置中
<bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" >
<constructor-arg ref="consumerFactory"/>
<constructor-arg ref="containerProperties"/>
</bean>
替换为如下,配置效果就是相当于部署了2个消费者。
<bean id="concurrentMessageListenerContainer" class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer" >
<constructor-arg ref="consumerFactory"/>
<constructor-arg ref="containerProperties"/>
<property name="concurrency" value="2"/>
</bean>
2.实现生产者
为了执行生产者发送消息,spring-kafka提供了一个KafkaTemplate,让我们只关心 send 消息不需要再关注创建producer和关闭producer,还提供了事务的相关操作(Kafka从0.11.0.0版本以上)。
2.1发送消息
提供了如下接口,可以支持发送消息
ListenableFuture<SendResult<K, V>> sendDefault(V data);
ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, V data);
ListenableFuture<SendResult<K, V>> send(String topic, K key, V data)
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
LitenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
ListenableFuture<SendResult<K, V>> send(Message<?> message);
除了上面send操作还提供了如下操作,支持如下接口发送消息
<T> T execute(ProducerCallback<K, V, T> callback);
关于ProducerCallback的代码如下:
interface ProducerCallback<K, V, T> {
T doInKafka(Producer<K, V> producer);
}
2.2 事务相关操作
提供了如下两个支持事务的操作:
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets);
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId);
3.实现消费者
如果没有spring-kafak,我们需要自己创建消费者、还需要有个定时任务保证消费者一直运行获取消息、处理消息之后提交偏移量以及事务一致性(Kafka从0.11.0.0版本以上)等操作。spring-kafka封装了这些操作,只需要使用者关注消费消息的逻辑。一个流程图如下:
3.1 创建消费者
spring-kafka可以创建单个消费者,而且还支持创建多个消费者:
1、创建单个消费者
KafkaMessageListenerContainer 创建一个消费者,支持创建两种类型消费者:消费者群组类型和独立消费者类型。
2、创建多个消费者
ConcurrentMessageListenerContainer可以创建多个消费者,可以通过并发数属性来设置创建多少个消费者,ConcurrentMessageListenerContainer作用其实和部署多个消费者服务是一样的效果。支持创建两种类型消费者:消费者群组类型和独立消费者类。
关于ConcurrentMessageListenerContainer的并发数说明
这个并发数是指一个app实例创建多少个消费者。如果我们部署多个服务,比如10台服务,而分区个数时20,此时可以设置并发数是2,如果分区个数小于10,此时设置并发是1就可以了。
注意:消费消息时使用多线程消费消息, 这种多线程处理可以在自己实现消费消息逻辑中自己实现。
3.2 定时任务
KafkaMessageListenerContainer实现了Lifecycle接口的start方法,所以在上下文初始化时会调用这个对象的start方法。
@Override
public final void start() {
....
// 启动
doStart();
....
}
KafkaMessageListenerContainer通过doStart方法定义了处理消息逻辑,所以启动任务的入口可以看成是:
KafkaMessageListenerContainer#doStart()
3.3 处理消息模板
KafkaMessageListnerContailer#ListenerConsumer#run中实现了处理一个消息泛型逻辑:
while(true){
// 处理提交策略,可以概况为两类:根据时间和次数。
// (1)个数策略:消息个数处理超过阈值ackcOunt
// (2)时间策略:执行时间超过阈值ackTime
// 把acks队列中只转移到offesets中值,如果提交策略满足就处理offsets中值。
processCommits();
// 执行seek操作。处理offsets队列中值,依次进行seek操作,保证在提交出现问题时,不重复消费消息。
processSeeks();
// 获取消息
ConsumerRecords<K, V> records = this.consumer.poll(this.containerProperties.getPollTimeout());
// 通过listner处理消息。根据不同listenerType调用相关接口处理消息。如果提交策略AckMode#Record模式,则立刻执行commit,否 则添加便宜量信息到ack队列。
invokeListener(records);
}
3.4 用户只关注处理消息逻辑
在使用spring-kafka,我们只需要关注处理消息逻辑,通过继承实现MessageListener来实现处理消息的逻辑,主要包含四种类型Listener:
public enum ListenerType {
/**
* Acknowledging and consumer aware.
*/
ACKNOWLEDGING_CONSUMER_AWARE,
/**
* Consumer aware.
*/
CONSUMER_AWARE,
/**
* Acknowledging.
*/
ACKNOWLEDGING,
/**
* Simple.
*/
SIMPLE
}
- SIMPLE,就是在处理消息时,不需要考虑提交偏移量和使用Consumer对象。
void onMessage(T data);
- ACKNOWLEDGING, 当需要手动提交时时,而不是自动提交或者spring-kafka自己实现提交的方式时,需要如下接口中acknowledment的acknowlegge()方法来提交偏移量
default void onMessage(T data, Acknowledgment acknowledgment) {
throw new UnsupportedOperationException("Container should never call this");
}
- CONSUMER_AWARE,类似Spring IOC 的ApplicationContextAware功能,如果我们在消费消息时,需要用到consumer对象,则需要使用这个类型。
default void onMessage(T data, Consumer<?, ?> consumer) {
throw new UnsupportedOperationException("Container should never call this");
}
- ACKNOWLEDGING_CONSUMER_AWARE,同时支持ACKNOWLEDGING和CONSUMER_AWARE两种类型。
default void onMessage(T data, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
throw new UnsupportedOperationException("Container should never call this");
}
如何确定ListenerType类型?可以根据继承不同类型MeassgeListener可以确定ListenerType:
public final class ListenerUtils {
private ListenerUtils() {
super();
}
public static ListenerType determineListenerType(Object listener) {
Assert.notNull(listener, "Listener cannot be null");
ListenerType listenerType;
if (listener instanceof AcknowledgingConsumerAwareMessageListener
|| listener instanceof BatchAcknowledgingConsumerAwareMessageListener) {
listenerType = ListenerType.ACKNOWLEDGING_CONSUMER_AWARE;
}
else if (listener instanceof ConsumerAwareMessageListener
|| listener instanceof BatchConsumerAwareMessageListener) {
listenerType = ListenerType.CONSUMER_AWARE;
}
else if (listener instanceof AcknowledgingMessageListener
|| listener instanceof BatchAcknowledgingMessageListener) {
listenerType = ListenerType.ACKNOWLEDGING;
}
else if (listener instanceof GenericMessageListener) {
listenerType = ListenerType.SIMPLE;
}
else {
throw new IllegalArgumentException("Unsupported listener type: " + listener.getClass().getName());
}
return listenerType;
}
如何使用ListenerType?在KafkaMessageLisnerContailer中根据不同listenerType来处调用不同接口
private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> record,
@SuppressWarnings("rawtypes") Producer producer,
Iterator<ConsumerRecord<K, V>> iterator) throws Error {
try {
switch (this.listenerType) {
case ACKNOWLEDGING_CONSUMER_AWARE:
this.listener.onMessage(record,
this.isAnyManualAck
? new ConsumerAcknowledgment(record)
: null, this.consumer);
break;
case CONSUMER_AWARE:
this.listener.onMessage(record, this.consumer);
break;
case ACKNOWLEDGING:
this.listener.onMessage(record,
this.isAnyManualAck
? new ConsumerAcknowledgment(record)
: null);
break;
case SIMPLE:
this.listener.onMessage(record);
break;
}
...
}
3.5 提交策略
spring-kafka负责提交策略和自己手动提交,可以参考AckMode
public enum AckMode {
RECORD, //每处理一个消息,就提交一次
BATCH, //将上一次poll得到消息进行提交
TIME, //达到段时间间隔,就提交
COUNT, //达到次数,就提交
COUNT_TIME, //达到次数和一段时间间隔,就提交
MANUAL // 手动提交
MANUAL_IMMEDIATE //每处理一条消息就提交一次
}
具体可以分为两类:
1、spring-kafak负责提交
(1)RECORD, 每处理一个消息,就提交一次。和MANUAL_IMMEDIATE类似,只是这里是通过spring-kafka自己操作,而MsManualImmediateAck需要我们自己在代码里面调用Acknowledgment#acknowledge
(2)BATCH, 将上一次poll得到消息进行提交。和MANUAL类似,BATH是通过spring-kafka操作,而MANUAL模式是用户自己通过调用Acknowledgment#acknowledge来执行。
(3)TIME, 达到段时间间隔,就提交
(4)COUNT, 达到次数,就提交
(5)COUNT_TIME, 达到次数和时间间隔两个条件,就提交
2、用户自己手动提交,通过在代码里面调用Acknowledgment#acknowledge()进行提交。
(1)MANUAL:调用Acknowledgment#acknowledge可以先放置到offts中,到时候在processCommits中进行处理。
(2) MANUAL_IMMEDIATE 处理完消息,调用Acknowledgment#acknowledge执行立刻提交。
在这种模式下,自己写提交偏移量逻辑,spring-kafka不负责提交而且也不是kafka的自动提交模式。此时可以继承ListenerType为ACKNOWLEDGING或者ACKNOWLEDGING_CONSUMER_AWARE类型的MessageListener,然后使用如下接口参数中acknowledgment#ackknowlege()方法来提交。
default void onMessage(T data, Acknowledgment acknowledgment) {
throw new UnsupportedOperationException("Container should never call this");
}
default void onMessage(T data, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
throw new UnsupportedOperationException("Container should never call this");
}
举例如下:
public class AckDemo implements AcknowledgingMessageListener {
public void onMessage(ConsumerRecord data, Acknowledgment acknowledgment) {
// 1.处理消息
process(data);
// 2.手动提交
acknowledgment.acknowledge();
}
}
3.6 事务性
3.6.1 KafkaTransactionManager
1、doBegine执行初始化事务和开启事务:
在doBegin中通过如下 ProducerFactoryUtils#getTransactionalResourceHolder方法创建KafkaResourceHolder(当成数据库的Connectiion),并执行初始化事务(initTransactions())和开启事务(begineTransactions()),如下:
====================================
类名:ProducerFactoryUtils
====================================
/**
* Obtain a Producer that is synchronized with the current transaction, if any.
* @param producerFactory the ConnectionFactory to obtain a Channel for
* @param <K> the key type.
* @param <V> the value type.
* @return the resource holder.
*/
public static <K, V> KafkaResourceHolder<K, V> getTransactionalResourceHolder(
final ProducerFactory<K, V> producerFactory) {
Assert.notNull(producerFactory, "ProducerFactory must not be null");
// 1.对于每一个线程会生成一个唯一key,然后根据key去查找resourceHolder
@SuppressWarnings("unchecked")
KafkaResourceHolder<K, V> resourceHolder = (KafkaResourceHolder<K, V>) TransactionSynchronizationManager
.getResource(producerFactory);
if (resourceHolder == null) {
// 2.创建一个消费者
Producer<K, V> producer = producerFactory.createProducer();
// 3.开启事务
producer.beginTransaction();
resourceHolder = new KafkaResourceHolder<K, V>(producer);
bindResourceToTransaction(resourceHolder, producerFactory);
}
return resourceHolder;
}
创建生成者,并初始化事务
====================================
类名:DefaultKafkaProducerFactory
====================================
protected Producer<K, V> createTransactionalProducer() {
Producer<K, V> producer = this.cache.poll();
if (producer == null) {
Map<String, Object> configs = new HashMap<>(this.configs);
// 对于每一次生成producer时,都设置一个不同的transactionId
configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
this.transactionIdPrefix + this.transactionIdSuffix.getAndIncrement());
producer = new KafkaProducer<K, V>(configs, this.keySerializer, this.valueSerializer);
// 1.初始化事务。
producer.initTransactions();
return new CloseSafeProducer<K, V>(producer, this.cache);
}
else {
return producer;
}
}
2、doCommit提交事务
protected void doCommit(DefaultTransactionStatus status) {
@SuppressWarnings("unchecked")
KafkaTransactionObject<K, V> txObject = (KafkaTransactionObject<K, V>) status.getTransaction();
KafkaResourceHolder<K, V> resourceHolder = txObject.getResourceHolder();
// 提交事务
resourceHolder.commit();
}
通过KafkaResourceHolder的commit来提交事务。
public void commit() {
this.producer.commitTransaction();
}
3.6.2 实现提交偏移量事务操作
通过TransactionTemplate#excute来执行消费消息的逻辑
public <T> T execute(TransactionCallback<T> action) throws TransactionException {
Assert.state(this.transactionManager != null, "No PlatformTransactionManager set");
if (this.transactionManager instanceof CallbackPreferringPlatformTransactionManager) {
return ((CallbackPreferringPlatformTransactionManager) this.transactionManager).execute(this, action);
}
else {
TransactionStatus status = this.transactionManager.getTransaction(this);
T result;
try {
// 1、处理消息逻辑和发送消息偏移量
result = action.doInTransaction(status);
}
catch (RuntimeException | Error ex) {
// Transactional code threw application exception -> rollback
rollbackOnException(status, ex);
throw ex;
}
catch (Throwable ex) {
// Transactional code threw unexpected exception -> rollback
rollbackOnException(status, ex);
throw new UndeclaredThrowableException(ex, "TransactionCallback threw undeclared checked exception");
}
// 2、提交事务
this.transactionManager.commit(status);
return result;
}
}
在上面的action.doIntransaction中在处理完成消息并调用这个操作发送偏移量
private void sendOffsetsToTransaction(Producer producer) {
handleAcks();
Map<TopicPartition, OffsetAndMetadata> commits = buildCommits();
this.commitLogger.log(() -> "Sending offsets to transaction: " + commits);
producer.sendOffsetsToTransaction(commits, this.consumerGroupId);
}
3.6.3 事务提交和提交策略比较
选择事务提交时,如果此时再设置成其他的提交策略AckMode中值:比如说依赖次数或者时间,那么这些设置也是不生效。
3.7 总结
在使用spring-kafka的消费者功能时,可以考虑ContainerProperties中常用属性有:
(1)设置主题
topics:主题
topicPattern:主题模式
topicPartions:包含topic/partions/initial offesets。
(2)提交策略的相关配置
目前分为三种:处理一条就提交、时间策略、次数策略、自己手动提交策略(spring-kafka不再负责提交)
ackMode:指定提交模式
ackCount:采用次数策略提交模式,需要指定个数阈值。
ackTime: 采用时间策略提交时,需要指定时间阈值
syncCommits 同步或异步提交
(3)事务属性的配置
transactionManager 指定一个KafkaTransactionManager对象,它封装了spring 事务管理器,实现kafka的事务功能。
(4)消费消息的业务逻辑
messageListener 需要实现MessageListener接口,实现处理消息具体业务逻辑
(5)消费者信息
groupId 消息分组
clientId 消费者名字的前缀
更多推荐
所有评论(0)