Spring集成Kafka中的事务
       原文链接:https://docs.spring.io/spring-kafka/reference/htmlsingle/#transactions事务Transactions&nbsp
原文链接:https://docs.spring.io/spring-kafka/reference/htmlsingle/#transactions
事务Transactions
Kafka0.11.0.0版本客户端提供了事务支持。Spring for Apache Kafka通过如下几种方式提供事务支持:
KafkaTransactionManager
-和普通的Spring事务支持一起使用(@Transactional,TransactionTemplate等等)。- 事务性的
KafkaMessageListenerContainer
。 - 通过
KafkaTemplate
实现本地事务。
通过给DefaultKafkaProducerFactory
提供一个事务id前缀transactionIdPrefix
开启事务。开启事务后,生产者工厂缓存一些事务性的生产者(transactional producers),而不是管理一个共享的生产者Producer
。当用户使用close()
方法关闭一个生产者时,它并没有真正被关闭,而是被放回缓存中复用。每个生产者的transactional.id
属性就是transactionIdPrefix
+n
,n从0开始,每个生产者自增。
Kafka事务管理器KafkaTransactionManager
KafkaTransactionManager
是Spring框架中的平台事务管理器PlatformTransactionManager
接口的实现,在KafkaTransactionManager
构造器中需要提供一个生产者工厂引用。如果你提供一个自定义的生产者工厂,它必须支持事务,参考ProducerFactory.transactionCapable()
。
你可以和Spring事务支持(@Transactional,TransactionTemplate等等)一起使用KafkaTransactionManager
。如果一个事务开启了,任何事务内的KafkaTemplate
操作都将使用这个事务内的生产者Producer
。事务管理器将根据成功或失败来决定提交还是回滚事务。注意KafkaTemplate
必须和事务管理器使用同样的生产者工厂ProducerFactory
。
事务性的监听器容器Transactional Listener Container
你可以给监听器容器(listener container)提供一个KafkaTransactionManager
实例,当这么配置的时候,容器在调用监听器之前会开启事务。如果监听器成功处理一条记录(或者一批记录,当使用BatchMessageListener
的时候),容器将在事务管理器提交事务前,使用producer.sendOffsetsToTransaction())
给事务发送偏移量(offset(s)
)。如果监听器抛出异常,事务会回滚,下次拉取(poll
)的时候消费者仍可以消费到之前出错的记录。
事务同步Transaction Synchronization
如果你需要用其他事务来同步Kafka事务,只需要简单地给监听器容器(listener container)配置合适的事务管理器(一个支持同步的事务管理器,比如DataSourceTransactionManager
)。
更多推荐
所有评论(0)