想起来之前被问到了一个问题,如果Flink中的Task是一直不停的运行的话,那么拉取Kafka数据的Source端是不是会一直不停的拉取数据,如果消费速度不及时,内存不就很快会被撑爆了么?一开始对这个问题是一脸闷逼,后来随着对Flink使用的逐渐深入,对Flink的内部也有了一定的了解,所以本文就来了解下Flink内部的反压机制,看下反压机制是如何解决该问题的。

 

什么是反压以及反压所带来的影响?

    在流式处理系统中,如果出现下游消费的速度跟不上上游生产数据的速度,就种现象就叫做反压。出现反压时,理所应当限制上游生产者的速度,使得下游的速度跟得上上游的速度。

    反压会导致流处理作业数据延迟的增加,同时还会影响到Checkpoint。由于Flink的Checkpoint机制需要进行Barrier对齐,如果此时某个Task出现了反压,Barrier流动的速度就会变慢,导致Checkpoint整体时间变长,如果反压很严重,还有可能导致Checkpoint超时失败。这部分内容在《Flink中的状态一致性(再细说下Checkpoint)》里面有详细的说明,同时里面还分析了Flink1.11版本中新提供的Unaligned Checkpoints机制来解耦反压和 Checkpoint)。

    长期或者频繁出现反压才需要处理,如果只是由于网络波动或者正常GC出现的偶尔反压可以不必处理。

 

如何定位反压:

    可以在Web界面,从Sink到Source这样反向逐个Task排查,找到第一个出现反压的Task,一般上Task出现反压会出现如下现象:

 

    当 Web 页面切换到某个 Task 的 BackPressure 页面时,才会对这个Task触发反压检测。BackPressure界面会周期性的对Task线程栈信息采样,通过线程被阻塞在请求Buffer的频率来判断节点是否处于反压状态(反压就是因为Buffer不够用了,大白话就是内存不够用了,所以Task暂时性的阻塞住了)。默认情况下,这个频率在 0.1 以下显示为 OK,0.1 至 0.5 显示 LOW,而超过 0.5 显示为 HIGH。

    通过反压状态可以大致锁定反压可能存在的算子,但具体反压是由于当前Task自身处理速度慢还是由于下游Task处理慢导致的,需要通过metric监控进一步判断。因为反压存在两种可能性:

  1. 当前Task发送的速度跟不上它产生数据的速度。一般发生在一条输入多条输出这种情况,导致当前Task发送端申请不到足够的内存,例如flatmap或者collect多次。
  2. 当前Task处理数据的速度比较慢,比如每条数据都要进行算法调用之类的,而上游Task处理数据较快,从而导致上游发送端申请不到足够的内存。

    因为BackPressure面板监控的是发送端,所以如果我们找到出现反压的节点,那么反压根源要么是就这个节点(对应情况1),要么是它紧接着的下游节点(对应情况2)

 

    当不太好判断是当前Task还是下游Task出现反压时,需要利用Flink Metrics 定位产生反压的真正位置。可以参考如下几个指标:

            outPoolUsage:    发送端Buffer使用率

            inPoolUsage:    接收端Buffer使用率

            floatingBuffersUsage(1.9以上): 接收端Floating Buffer使用率

            exclusiveBuffersBuffersUsage(1.9以上):    接收端Exclusive Buffer使用率

    outPoolUsage代表发送端Buffer使用率,也就是ResultPartition。发送端共用一个LocalBufferPool,只有一个指标。

    inPoolUsage代表接收端Buffer使用率,也就是InputGate。接收端也共用一个LocalBufferPool,但是接收端每个Channel在初始化阶段都会分配固定数量的Buffer(Exclusive Buffer)。如果某一时刻接收端接受到的数量太多,Exclusive Buffer就会耗尽,此时就会向BufferPool申请剩余的Floating Buffer(除了Exclusive Buffer,其他的都是Floating Buffer,备用Buffer)。其中inPoolUsage = floatingBuffersUsage + exclusiveBuffersUsage。

    并且一个LocalBufferPool所能够使用的最大资源是有限的,即NetworkBufferPool 不会把自己的 Buffer 全分配给一个Task,因为 TaskManager 上一般会运行了多个 Task,这个受taskmanager.network.memory.max-buffers-per-channel参数控制。

    总觉得在大内存情况下,这些参数设置了会导致内存的浪费…Task数量少,LocalBufferPool能申请的最大的Buffer数量又是固定的。所以个人觉得如果TaskManager内存比较大的话,这个参数是不是调大会好一点…这个后续有时间验证下。

 

反压分析可以参考下图:

    Flink 1.9 and above: If inPoolUsage is constantly around 100%, this is a strong indicator for exercising backpressure upstream.

    The following table summarises all combinations and their interpretation. Bear in mind, though, that backpressure may be minor or temporary (no need to look into it), on particular channels only, or caused by other JVM processes on a particular TaskManager, such as GC, synchronisation, I/O, resource shortage, instead of a specific subtask.

    分析反压的大致思路是:反压肯定是某个SubTask的处理能力跟不上导致的,所以反压的这个SubTask inPoolUsage使用率肯定是100%反压了。如果一个SubTask的outPoolUsage占用率很高,大概率表示它是被下游Task反压限速了。

 

反压如何解决:

    定位到反压的Task之后,可以根据Task中具体执行的内容来进行相应的处理。大部分情况反压是由于用户代码的执行效率问题(用户代码对一条数据的处理太慢了,例如每条数据都要调用算法,而调用算法运算耗时又很长) 或者数据倾斜引起的。如果是用户代码的执行效率引起的,可以通过增加并发度或者其他资源的方式来缓解反压。如果是数据倾斜引起的,可以对数据进行一次KeyBy之类的操作来解决。

 

从源码中了解下反压机制:

    Flink1.5之前的反压机制就不详细介绍了(1.5之前是基于TCP协议进行的反压,该机制有一个问题,如果同一个Task的不同SubTask分配到了同一个TaskManager内部,那么他们与),因为项目上使用的是最新的Flink1.11,所以这里只是探究下Flink1.11中的反压机制,也就是Credit-based流量控制机制。反压的图文描述可以见参考中的《一文彻底搞懂 Flink 网络流控与反压机制》,里面图文描述已经讲的非常详细了,这里我就不做图文的搬运工了,大概做一个说明:

  1. 接收端(InputGate)向发送端(ResultPartition)声明可用的Credit(一个可用的buffer对应一点Credit),表明它有多少个空闲的buffer可以接受数据。只有在Credit>0的情况下发送端才发送buffer;发送端每发送一个buffer,Credit也相应地减少一点。当Credit值为0时,停止向下游发送数据,下游有空闲的credit是会通知上游有credit可用,这样就可以控制上游发送给下游的速率。如果是同一个Task内部,那么它的RecordWrite会被Block,而RecordReader、Operator、RecordWrite都是属于同一个线程,所以大家都Block住了,反压也就生效了。
  2. 当发送端发送buffer的时候,它同样把当前堆积的buffer数量(backlog)告知接收端;接收端根据发送端堆积的数量来申请floatingbuffer
  3. PS:Event事件是直接处理的,无需使用Credit

接收端:

发送端:

 

 

 

总结:

    Flink不需要一个特殊的机制来处理反压, 因为Flink中的数据传输机制相当于已经提供了反压机制。Flink程序的最大吞吐量由程序中运行最慢的那个Task所决定。

   在进行反压测试时,可以使用Flink作业禁用全局chain,然后看下究竟是哪个步骤的速度最慢,命令如下:

    $FLINK_HOME/bin/flink run -e remote -p 10 -D pipeline.operator-chaining=false -D execution.checkpointing.interval=60000 -c com.***.mainClass -d  /***/***-shaded.jar

 

 

--------------------------------------------------分割线--------------------------------------------------

最近刚换了工作,又要熟悉新的东西了,会有一段时间暂时不会更新了,祝自己还有大家都好运^_^!!!

 

 

参考:

    https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/monitoring/back_pressure.html(监控反压)

    https://flink.apache.org/2019/07/23/flink-network-stack-2.html (通过Flink Metric进行反压判断)

    https://zhuanlan.zhihu.com/p/92743373(如何分析及处理 Flink 反压?)

    https://developer.aliyun.com/article/57815 (Flink MemorySegment内存管理机制)

    https://blog.csdn.net/lvwenyuan_1/article/details/93490297(Flink旧版本和新版本的反压机制图示)

    https://zhuanlan.zhihu.com/p/149706396(Flink Network Buffer相关知识)

    https://cwiki.apache.org/confluence/display/FLINK/FLIP-102%3A+Add+More+Metrics+to+TaskManager(Flink Improvement Proposals)

    https://www.jianshu.com/p/2779e73abcb8(一文彻底搞懂 Flink 网络流控与反压机制)

    https://blog.csdn.net/u012151684/article/details/109479588(Flink反压机制分析)

 

 

 

 

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐