原文链接:https://docs.spring.io/spring-kafka/reference/htmlsingle/#transactions

事务Transactions

       Kafka0.11.0.0版本客户端提供了事务支持。Spring for Apache Kafka通过如下几种方式提供事务支持:

  • KafkaTransactionManager-和普通的Spring事务支持一起使用(@Transactional,TransactionTemplate等等)。
  • 事务性的KafkaMessageListenerContainer
  • 通过KafkaTemplate实现本地事务。

       通过给DefaultKafkaProducerFactory提供一个事务id前缀transactionIdPrefix开启事务。开启事务后,生产者工厂缓存一些事务性的生产者(transactional producers),而不是管理一个共享的生产者Producer。当用户使用close()方法关闭一个生产者时,它并没有真正被关闭,而是被放回缓存中复用。每个生产者的transactional.id属性就是transactionIdPrefix+n,n从0开始,每个生产者自增。

Kafka事务管理器KafkaTransactionManager

       KafkaTransactionManager是Spring框架中的平台事务管理器PlatformTransactionManager接口的实现,在KafkaTransactionManager构造器中需要提供一个生产者工厂引用。如果你提供一个自定义的生产者工厂,它必须支持事务,参考ProducerFactory.transactionCapable()
你可以和Spring事务支持(@Transactional,TransactionTemplate等等)一起使用KafkaTransactionManager。如果一个事务开启了,任何事务内的KafkaTemplate操作都将使用这个事务内的生产者Producer。事务管理器将根据成功或失败来决定提交还是回滚事务。注意KafkaTemplate必须和事务管理器使用同样的生产者工厂ProducerFactory

事务性的监听器容器Transactional Listener Container

       你可以给监听器容器(listener container)提供一个KafkaTransactionManager实例,当这么配置的时候,容器在调用监听器之前会开启事务。如果监听器成功处理一条记录(或者一批记录,当使用BatchMessageListener的时候),容器将在事务管理器提交事务前,使用producer.sendOffsetsToTransaction())给事务发送偏移量(offset(s))。如果监听器抛出异常,事务会回滚,下次拉取(poll)的时候消费者仍可以消费到之前出错的记录。

事务同步Transaction Synchronization

       如果你需要用其他事务来同步Kafka事务,只需要简单地给监听器容器(listener container)配置合适的事务管理器(一个支持同步的事务管理器,比如DataSourceTransactionManager)。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐