深入了解 Flink 的网络协议栈( 三 )


但是,来自接收方的附加通告消息(向发送端通知 Credit)可能会产生一些额外的开销,尤其是在使用 SSL 加密信道的场景中 。此外,单个输入通道( Input channel)不能使用缓冲池中的所有 Buffer,因为存在无法共享的 Exclusive buffer 。新的流控协议也有可能无法做到立即发送尽可能多的数据(如果生成数据的速度快于接收端反馈 Credit 的速度),这时则可能增长发送数据的时间 。虽然这可能会影响作业的性能,但由于其所有优点,通常新的流量控制会表现得更好 。可能会通过增加单个通道的独占 Buffer 数量,这会增大内存开销 。然而,与先前实现相比,总体内存使用可能仍然会降低,因为底层的网络协议栈不再需要缓存大量数据,因为我们总是可以立即将其传输到 Flink(一定会有相应的 Buffer 接收数据) 。
在使用新的 Credit-based 流量控制时,可能还会注意到另一件事:由于我们在发送方和接收方之间缓冲较少的数据,反压可能会更早的到来 。然而,这是我们所期望的,因为缓存更多数据并没有真正获得任何好处 。如果要缓存更多的数据并且保留 Credit-based 流量控制,可以考虑通过增加单个输入共享 Buffer 的数量 。

深入了解 Flink 的网络协议栈

文章插图
 

深入了解 Flink 的网络协议栈

文章插图
 
注意:如果需要关闭 Credit-based 流量控制,可以将这个配置添加到 flink-conf.yaml 中:taskmanager.network.credit-model:false 。但是,此参数已过时,最终将与非 Credit-based 流控制代码一起删除 。
4. 序列化与反序列化【深入了解 Flink 的网络协议栈】下图从上面的扩展了更高级别的视图,其中包含网络协议栈及其周围组件的更多详细信息,从发送算子发送记录(record)到接收算子获取它:
深入了解 Flink 的网络协议栈

文章插图
 
在生成 record 并将其传递出去之后,例如通过 Collector#collect(),它被传递给 RecordWriter,RecordWriter 会将 JAVA 对象序列化为字节序列,最终存储在 buffer 中按照上面所描述的在网络协议栈中进行处理 。RecordWriter 首先使用 SpanningRecordSerializer 将 Record 序列化为灵活的堆上字节数组 。然后,它尝试将这些字节写入目标网络 channel 的 buffer 中 。我们将在下面的章节回到这一部分 。
 
在接收方,底层网络协议栈(netty)将接收到的 buffer 写入相应的输入通道(channel) 。流任务的线程最终从这些队列中读取并尝试在 RecordReader 的帮助下通过 SpillingAdaptiveSpanningRecordDeserializer 将累积的字节反序列化为 Java 对象 。与序列化器类似,这个反序列化器还必须处理特殊情况,例如跨越多个网络 buffer 的 record,或者因为记录本身比网络缓冲区大(默认情况下为 32KB,通过taskmanager.memory.segment-size设置)或者因为序列化 record 时,目标 buffer 中已经没有足够的剩余空间保存序列化后的字节数据,在这种情况下,Flink 将使用这些字节空间并继续将其余字节写入新的网络 buffer 中 。
4.1 将网络buffer写入Netty
在上图中,Credit-based 流控制机制实际上位于“Netty Server”(和“Netty Client”)组件内部,RecordWriter 写入的 Buffer 始终以空状态(无数据)添加到 Subpartition 中,然后逐渐向其中填写序列化后的记录 。但是 Netty 在什么时候真正的获取并发送这些 Buffer 呢?显然,不能是 Buffer 中只要有数据就发送,因为跨线程(写线程与发送线程)的数据交换与同步会造成大量的额外开销,并且会造成缓存本身失去意义(如果是这样的话,不如直接将将序列化后的字节发到网络上而不必引入中间的 Buffer) 。
在 Flink 中,有三种情况可以使 Netty 服务端使用(发送)网络 Buffer:
  • 写入 Record 时 Buffer 变满,或者
  • Buffer 超时未被发送,或
  • 发送特殊消息,例如 Checkpoint barrier 。
▼ 在 Buffer 满后发送
RecordWriter 将 Record 序列化到本地的序列化缓冲区中,并将这些序列化后的字节逐渐写入位于相应 Result subpartition 队列中的一个或多个网络 Buffer中 。虽然单个 RecordWriter 可以处理多个 Subpartition,但每个 Subpartition 只会有一个 RecordWriter 向其写入数据 。另一方面,Netty 服务端线程会从多个 Result subpartition 中读取并像上面所说的那样将数据写入适当的多路复用信道 。这是一个典型的生产者 – 消费者模式,网络缓冲区位于生产者与消费者之间,如下图所示 。在(1)序列化和(2)将数据写入 Buffer 之后,RecordWriter 会相应地更新缓冲区的写入索引 。一旦 Buffer 完全填满,RecordWriter 会(3)为当前 Record 剩余的字节或者下一个 Record 从其本地缓冲池中获取新的 Buffer,并将新的 Buffer 添加到相应 Subpartition 的队列中 。这将(4)通知 Netty服务端线程有新的数据可发送(如果 Netty 还不知道有可用的数据的话4) 。每当 Netty 有能力处理这些通知时,它将(5)从队列中获取可用 Buffer 并通过适当的 TCP 通道发送它 。


推荐阅读