Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑( 四 )


但是,由于网络原因,返回的可能是这样的
响应成
功响应失败
也就是分两次发回给客户端
客户端该如何处理?
Kafka 是在响应消息的前面加上了每个响应的长度编码
40响应成功30响应失败
那这个长度会发生拆包吗?也很简单,申请一定长度的字节,比如2个字节来存长度,把这个2字节的长度满了,就是长度了 。
等到读满了2字节,就转换成 int 类型,再申请这个 int 类型长度的内存,再去接收这么多长度的字节,一直到读满为止 。
然后来看看 Kafka 的代码如何处理的,看到 poll 方法里处理 OP_READ 的方法的部分

Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

文章插图
 

Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

文章插图
 

Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

文章插图
 
最终,拆包和粘包的代码:
Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

文章插图
 
size.hasRemaining,size 是一个 4 字节的 ByteBuffer
Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

文章插图
 
然后开始读4个字节的数据
int bytesRead = channel.read(size);读取完了之后,再看有没有剩余空间了,如果读满了,那么把这个4字节的数变成一个 int 值,并且继续分配这个 int 值大小的 ByteBuffer
if (!size.hasRemaining()) {size.rewind();int receiveSize = size.getInt();if (receiveSize < 0)throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")");if (maxSize != UNLIMITED && receiveSize > maxSize)throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")");this.buffer = ByteBuffer.allocate(receiveSize);}然后一直读取内容:
if (buffer != null) {int bytesRead = channel.read(buffer);if (bytesRead < 0)throw new EOFException();read += bytesRead;}然后再来看:
Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

文章插图
 

Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

文章插图
 
这个 complete 方法,是判断 size 已经读满了,并且 内容也已经读满了,那么就表示读取到了一个完整的响应了 。
那么这就是完整的拆包和粘包的处理了,大概也就是20行代码,也是很精彩的 。
八、总结本次我们完整的看了 Sender 线程发送消息的完整过程,里面包括了 Kafka 如何封装 Java NIO 代码,并且合理的建立连接,绑定 OP_READ,OP_WRITE 事件,并且读取服务端的响应,代码质量还是非常高的,看起来也是赏心悦目 。
希望大家对着源码再好好看一遍,一定会有收货的 。

【Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑】


推荐阅读