错误分析:

报错信息如下:
The transaction timeout is larger than the maximum value allowed by the broker (as configured by transaction.max.timeout.ms).

官网解释如下:
flink官网解释

简而言之就是:sinkProducer的超时时间默认为1个小时,
但是kafka broker的超时时间默认是15分钟, kafka broker不允许producer的超时时间比他大.
所以有两种解决办法:
		1.生产者的超时时间调小
		2.将broker的超时时间调大

默认情况下,Kafka broker 将 transaction.max.timeout.ms 设置为 15 分钟。此属性不允许为大于其值的 producer 设置事务超时时间。 默认情况下,FlinkKafkaProducer 将 producer config 中的 transaction.timeout.ms 属性设置为 1 小时,因此在使用 Semantic.EXACTLY_ONCE 模式之前应该增加 transaction.max.timeout.ms 的值。

在这里插入图片描述

解决办法:

//将kafka producer的超时时间调至和broker一致即可
sinkConfig.put("transaction.timeout.ms",15*60*1000);
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐