如何分析及处理反压?

midoll 824 2023-02-06

概念

反压(backpressure)是流式计算中十分常见的问题。反压意味着数据管道中某个节点成为瓶颈,处理速率跟不上上游发送数据的速率,而需要对上游进行限速。由于实时计算应用通常使用消息队列来进行生产端和消费端的解耦,消费端数据源是 pull-based 的,所以反压通常是从某个节点传导至数据源并降低数据源(比如 Kafka consumer)的摄入速率。

  • ① 节点有性能瓶颈可能是该节点所在的机器有故障(网络、磁盘等)、机器的网络延迟和磁盘不足、频繁GC、数据热点等原因。
  • ② 大多数消息中间件,例如kafka的consumer从broker把数据pull到本地,而producer把数据push到broker。

反压的影响

反压并不会直接影响作业的可用性,它表明作业处于亚健康的状态,有潜在的性能瓶颈并可能导致更大的数据处理延迟。通常来说,对于一些对延迟要求不高或者数据量较少的应用,反压的影响可能并不明显。然而对于规模比较大的 Flink 作业,反压可能会导致严重的问题。

反压会影响checkpoint
checkpoint时长:checkpoint barrier跟随普通数据流动,如果数据处理被阻塞,使得checkpoint barrier流经整个数据管道的时长变长,导致checkpoint 总体时间变长。
state大小:为保证Exactly-Once准确一次,对于有两个以上输入管道的 Operator,checkpoint barrier需要对齐,即接受到较快的输入管道的barrier后,它后面数据会被缓存起来但不处理,直到较慢的输入管道的barrier也到达。这些被缓存的数据会被放到state 里面,导致checkpoint变大。
checkpoint是保证准确一次的关键,checkpoint时间变长有可能导致checkpoint超时失败,而state大小可能拖慢checkpoint甚至导致OOM。

网络流控的实现:动态反馈/自动反压
image-1675653827766

Consumer 需要及时给 Producer 做一个 feedback,即告知 Producer 能够承受的速率是多少。动态反馈分为两种:
负反馈:接受速率小于发送速率时发生,告知 Producer 降低发送速率
正反馈:发送速率小于接收速率时发生,告知 Producer 可以把发送速率提上来

Flink 的数据交换有3种:

  • ①同一个 Task 的数据交换,
  • ②不同 Task 同 JVM 下的数据交换,
  • ③不同 Task 且不同 TaskManager 之间的交换。

(1)同一个 Task 的数据交换
通过算子链 operator chain 串联多个算子,主要作用是避免了序列化和网络通信的开销。
image-1675653958915

算子链 operator chain 串联多个算子的条件:
① 上下游的并行度一致
② 下游节点的入度为1
③ 上下游节点共享同一个slot
④ 下游节点的 chain 策略为 ALWAYS(例如 map、flatmap、filter等默认是ALWAYS)
⑤ 上游节点的 chain 策略为 ALWAYS 或 HEAD(source默认是HEAD)
⑥ 两个节点间数据分区方式是 forward
⑦ 用户没有禁用 chain
(2)不同 Task 同 TaskManager 的数据交换
image-1675653989204

在 TaskA 中,算子输出的数据首先通过 record Writer 进行序列化,然后传递给 result Partition 。接着,数据通过 local channel 传递给 TaskB 的 Input Gate,然后传递给 record reader 进行反序列。

(3)不同 Task 且不同 TaskManager 之间的交换
image-1675654007077
与上述(2)的不同点是数据先传递给 netty ,通过 netty 把数据推送到远程端的 Task 。

1.2 Flink(before V1.5)的TCP-based反压机制

1.5 版本之前是采用 TCP 流控机制,而没有采用feedback机制。

(1)Flink1.5 版本之前的TCP-based 反压机制
image-1675654053497
发送端 Flink 有一层Network Buffer,底层用Netty通信即有一层Channel Buffer,最后Socket通信也有Buffer,同理接收端也有对应的3级 Buffer。Flink (before V1.5)实质是利用 TCP 的流控机制来实现 feedback 。

(2)TCP 利用滑动窗口实现网络流控
TCP报文段首部有16位窗口字段,当接收方收到发送方的数据后,ACK响应报文中就将自身缓冲区的剩余大小设置到放入16位窗口字段。该窗口字段值是随网络传输的情况变化的,窗口越大,网络吞吐量越高。

参考:Apache Flink 进阶教程(七):网络流控及反压剖析

例子:TCP 利用滑动窗口限制流量

步骤1:发送端将 4,5,6 发送,接收端也能接收全部数据。

image-1675654127120
步骤2:consumer 消费了 2 ,接收端的窗口会向前滑动一格,即窗口空余1格。接着向发送端发送 ACK = 7、window = 1。

image-1675654138334
步骤3:发送端将 7 发送后,接收端接收到 7 ,但是接收端的 consumer 故障不能消费数据。这时候接收端向发送端发送 ACK = 8、window = 0 ,由于这个时候 window = 0,发送端是不能发送任何数据,也就会使发送端的发送速度降为 0。
image-1675654151285

image-1675654162342
(3)TCP-based 反压机制的缺点

image-1675654174851

① 单个Task的反压,阻塞了整个TaskManager的socket,导致checkpoint barrier也无法传播,最终导致checkpoint时间增长甚至checkpoint超时失败。
② 反压路径太长,导致反压时间延迟。

1.3 Flink(since V1.5)的 Credit-based 反压机制

在 Flink 层面实现反压机制,通过 ResultPartition 和 InputGate 传输 feedback 。

Credit-base 的 feedback 步骤:
① 每一次 ResultPartition 向 InputGate 发送数据的时候,都会发送一个 backlog size 告诉下游准备发送多少消息,下游就会去计算有多少的 Buffer 去接收消息。(backlog 的作用是为了让消费端感知到我们生产端的情况)
② 如果下游有充足的 Buffer ,就会返还给上游 Credit (表示剩余 buffer 数量),告知发送消息(图上两个虚线是还是采用 Netty 和 Socket 进行通信)。
image-1675654212109
生产段发送backlog=1
image-1675654217283
消费端返回credit=3
image-1675654224878
当生产端用完buffer,返回credit=0
image-1675654228499
生产端也出现了数据积压

Flink Web UI 的反压监控提供了 Subtask 级别的反压监控。监控的原理是通过Thread.getStackTrace() 采集在 TaskManager 上正在运行的所有线程,收集在缓冲区请求中阻塞的线程数(意味着下游阻塞),并计算缓冲区阻塞线程数与总线程数的比值 rate。其中,rate < 0.1 为 OK,0.1 <= rate <= 0.5 为 LOW,rate > 0.5 为 HIGH。

image-1675654331499
Web UI 反压监控

以下两种场景可能导致反压:
① 该节点发送速率跟不上它的产生数据速率。该场景一般是单输入多输出的算子,例如FlatMap。定位手段是因为这是从 Source Task 到 Sink Task 的第一个出现反压的节点,所以该节点是反压的根源节点。
② 下游的节点处理数据的速率较慢,通过反压限制了该节点的发送速率。定位手段是从该节点开始继续排查下游节点。
注意事项:
① 因为Flink Web UI 反压面板是监控发送端的,所以反压的根源节点并不一定会在反压面板体现出高反压。如果某个节点是性能瓶颈并不会导致它本身出现高反压,而是导致它的上游出现高反压。总体来看,如果找到第一个出现反压的节点,则反压根源是这个节点或者是它的下游节点。
② 通过反压面板无法区分上述两种状态,需要结合 Metrics 等监控手段来定位。如果作业的节点数很多或者并行度很大,即需要采集所有 Task 的栈信息,反压面板的压力也会很大甚至不可用。

(1)回顾 Flink Credit-based 网络
image-1675654372686
Flink Credit-Based 网络

TaskManager 之间的数据传输
不同的 TaskManager 上的两个 Subtask 通常情况下,channel 数量等于分组 key 的数量或者等于算子并发度。这些 channel 会复用同一个 TaskManager 进程的 TCP 请求,并且共享接收端 Subtask 级别的 Buffer Pool。
接收端
每个 channel 在初始阶段会被分配固定数量的独享 Exclusive Buffer,用于存储接收到的数据。算子 Operator 使用后再次释放 Exclusive Buffer。说明:channel 接收端空闲的 Buffer 数量称为 Credit,Credit 会被定时同步给发送端,用于决定发送多少个 Buffer 的数据。
③** 流量较大的场景**
接收端,channel 写满 Exclusive Buffer 后,Flink 会向 Buffer Pool 申请剩余的 Floating Buffer。发送端,一个 Subtask 所有的 Channel 会共享同一个 Buffer Pool,因此不区分 Exclusive Buffer 和 Floating Buffer。

(2)Flink Task Metrics 监控反压
Network 和 task I/O metrics 是轻量级反压监视器,用于正在持续运行的作业,其中一下几个 metrics 是最有用的反压指标。

image-1675654439988
metrics反压指标
采用 Metrics 分析反压的思路:如果一个 Subtask 的发送端 Buffer 占用率很高,则表明它被下游反压限速了;如果一个 Subtask 的接受端 Buffer 占用很高,则表明它将反压传导至上游

image-1675654446378
inPoolUsage和outPoolUsage反压分析表

解释:
① outPoolUsage 和 inPoolUsage 同为低表明当前 Subtask 是正常的,同为高分别表明当前 Subtask 被下游反压。
② 如果一个 Subtask 的 outPoolUsage 是高,通常是被下游 Task 所影响,所以可以排查它本身是反压根源的可能性。
③ 如果一个 Subtask 的 outPoolUsage 是低,但其 inPoolUsage 是高,则表明它有可能是反压的根源。因为通常反压会传导至其上游,导致上游某些 Subtask 的 outPoolUsage 为高。
注意:反压有时是短暂的且影响不大,比如来自某个 channel 的短暂网络延迟或者 TaskManager 的正常 GC,这种情况下可以不用处理。

下表把 inPoolUsage 分为 floatingBuffersUsage 和 exclusiveBuffersUsage,并且总结上游 Task outPoolUsage 与 floatingBuffersUsage 、 exclusiveBuffersUsage 的关系,进一步的分析一个 Subtask 和其上游 Subtask 的反压情况。

image-1675654480526
outPoolUsage 与 floatingBuffersUsage 、 exclusiveBuffersUsage 的关系表

解析:
① floatingBuffersUsage 为高则表明反压正在传导至上游。
② exclusiveBuffersUsage 则表明了反压可能存在倾斜。如果floatingBuffersUsage 高、exclusiveBuffersUsage 低,则存在倾斜。因为少数 channel 占用了大部分的 floating Buffer(channel 有自己的 exclusive buffer,当 exclusive buffer 消耗完,就会使用floating Buffer)。

上述主要通过 TaskThread 定位反压,而分析反压原因类似一个普通程序的性能瓶颈

(1)数据倾斜
通过 Web UI 各个 SubTask 的 Records Sent 和 Record Received 来确认,另外 Checkpoint detail 里不同 SubTask 的 State size 也是一个分析数据倾斜的有用指标。解决方式把数据分组的 key 进行本地/预聚合来消除/减少数据倾斜。

(2)用户代码的执行效率
TaskManager 进行 CPU profile,分析 TaskThread 是否跑满一个 CPU 核:如果没有跑满,需要分析 CPU 主要花费在哪些函数里面,比如生产环境中偶尔会卡在 Regex 的用户函数(ReDoS);如果没有跑满,需要看 Task Thread 阻塞在哪里,可能是用户函数本身有些同步的调用,可能是 checkpoint 或者 GC 等系统活动。

(3)TaskManager 的内存以及 GC
TaskManager JVM 各区内存不合理导致的频繁 Full GC 甚至失联。可以加上 -XX:+PrintGCDetails 来打印 GC 日志的方式来观察 GC 的问题。推荐TaskManager 启用 G1 垃圾回收器来优化 GC


# flink