问题背景:第三方主动推送接口,把数据推到我们的消息队列中,然后我们再消费,测试的时候挺正常,结果上线后就消费不掉了

显示找是不是哪里出错了,我们用的阿里的k8s,开始没开日志服务,每个容器只能看到最新200条的日志,最开始发现一个feign超时的错误,想是不是超时然后负载均衡调用其他service,这样我们有三个service,超时设置的1分钟,那岂不是每条最多要执行三分钟,然后我把超时时间设置成了一小时试一下,结果还不行,超时错误没了,一切看起来都没问题

然后又想到是不是feign调用报错了,然后在feign配置了抓到服务端的responseBody,再试,我草我们用的docker打包贼慢,打的我脑浆子疼

打完后发到线上,啥错误啥日志也没有,有点怀疑人生,然后又把feign调用的服务里打了log,又发了好几个包

突然有一次我看到了项目刚启动时的日志,里面有这么个错误

Auto offset commit failed for group order_group:
 Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member.
 This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms,
 which typically implies that the poll loop is spending too much time message processing.
 You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

操了,这个错误只在程序刚运行的时候才有,报完错再也不报了,然后就百度,大概意思就是消费速度跟不上生产速度了

先是禁止自动提交和最大心跳和每次取多少条,配置完再次打包发布,还是不行

日了狗了,我寻思怎么我的错误就别人的不一样呢,咋就解决不了呢

完了看配置文件

原来我的配置文件是这样的

spring:
  application:
    name: message-service
  cloud:
    config:
      discovery:
        enabled: false
        service-id: eureka-server
      profile: ${ENV:local}
      uri: http://${CONFIGSERVERHOST:localhost}:${CONFIGSERVERPORT:9999}/config
      label: master

    stream:
      default:
        producer:
          headerMode: embeddedHeaders
        consumer:
          headerMode: embeddedHeaders
      kafka:
        binder:
          brokers: ${KAFKABROKERS:192.168.0.199:9092}
      bindings:
        trace-input:
          destination: ${TRACEDESTINATION:trace-test}
          contentType: application/json
          group: ${TRACEGROUP:trace-group-test}
        trace-output:
          destination: ${TRACEDESTINATION:trace-test}
          contentType: application/json

然后网上查的配置文件都是这样的

这一看肯定就是spring-stream-kafka的配置类不一样呗,我就直接ctrl+左键,点进bindings的这个配置,完了进源码里是这样的

这还是个map,我寻思这个他是怎么知道我配的字段跟kafka的属性对的上?

然后就这三个配置,我试了驼峰命名,又试了-连接,又试了直接冒号,都不行,本来挺简单个东西,就死活不好使

怀疑人生了,因为启动时的log很快就过了200行,根本看不到了,然后还没有日志服务,真特么难受,k8s日志插件是花钱的,而且我还不会配置,而且我们买的这个版本的kafka,不支持外网访问

没办法硬着头皮,开了日志服务,然后研究了半天把插件装上了,然后这个日志是类似elk的,log是一条一行的,但是kafka应该是直接打印+换行的,搜出来的日志,全是一半的,真是醉了,然后复杂的语法还不会用

没办法,我直接本地搭了一套kafka环境,启动发现配置根本没生效,全是默认配置

继续点进源码,连了本地就好办了,直接打断点,跟了发现上面的map里根本取不到我的配置,肯定不是在这嘎达配置的

然后试了default下的consumer添加这个配置

依旧不行,然后又是试了驼峰命名,又试了-连接,又试了直接冒号三种命名,都不行

我特么就纳闷了到底在哪配置的啊?国内各种搜,没有,国外各种搜也没有,我寻思怎么全世界就我自己用springcloud-stream-kafka吗,完了还就我自己遇见了这个错误?最可气的搜一个问题答案全是一个模子扣出来的,没一个有用的

然后特么挨个属性点进去看源码,终于在

KafkaBinderConfigurationProperties

又发现了一个

consumerProperties

次特么成了,真特么难

最后配置

spring:
  application:
    name: message-service
  cloud:
    stream:
      default:
        producer:
          headerMode: embeddedHeaders
        consumer:
          headerMode: embeddedHeaders
      kafka:
        binder:
          brokers: ${KAFKABROKERS:192.168.0.120:9092}
          consumer-properties:
            max:
              poll:
                interval:
                  ms: 600000
                records: 10

没错就是这么简单,最大心跳时间10分钟,然后每次取10条数据

要保证处理一条数据的最大时间乘以每次取出的数据量的时间小于最大心跳时间,我这边设置的feign是55秒超时,这才好用了

妈的整完了都凌晨一点半了,这么个破问题整了七个小时,中途阿里控制台还各种报错,k8s页面不开翻墙还进不去,kafka控制台开了翻墙进不去,还被阿里的破控制台整个半死,七小时一大半的时间都在打包和跟阿里控制台较劲

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐