支撑百万并发的“零拷贝”技术,你了解吗?(12)

  • WritableByteChannel target,
  • FileDescriptor targetFD) throws IOException {
  • assert !nd.transferToDirectlyNeedsPositionLock() ||
  • Thread.holdsLock(positionLock);
  • long n = -1;
  • int ti = -1;
  • try {
  • begin();
  • ti = threads.add();
  • if (!isOpen())
  • return -1;
  • do {
  • n = transferTo0(fd, position, icount, targetFD);
  • } while ((n == IOStatus.INTERRUPTED) && isOpen());
  • if (n == IOStatus.UNSUPPORTED_CASE) {
  • if (target instanceof SinkChannelImpl)
  • pipeSupported = false;
  • if (target instanceof FileChannelImpl)
  • fileSupported = false;
  • return IOStatus.UNSUPPORTED_CASE;
  • }
  • if (n == IOStatus.UNSUPPORTED) {
  • transferSupported = false;
  • return IOStatus.UNSUPPORTED;
  • }
  • return IOStatus.normalize(n);
  • } finally {
  • threads.remove(ti);
  • end (n > -1);
  • }
  • }
  • 本地方法(native method)transferTo0() 通过 JNI(Java Native Interface)调用底层 C 的函数 。
    这个 native 函数(Java_sun_nio_ch_FileChannelImpl_transferTo0)同样位于 JDK 源码包下的 native/sun/nio/ch/FileChannelImpl.c 源文件里面 。
    JNI 函数 Java_sun_nio_ch_FileChannelImpl_transferTo0() 基于条件编译对不同的系统进行预编译,下面是 JDK 基于 Linux 系统内核对 transferTo() 提供的调用封装 。
    1. #if defined(__linux__) || defined(__solaris__)
    2. #include <sys/sendfile.h>
    3. #elif defined(_AIX)
    4. #include <sys/socket.h>
    5. #elif defined(_ALLBSD_SOURCE)
    6. #include <sys/types.h>
    7. #include <sys/socket.h>
    8. #include <sys/uio.h>
    9. #define lseek64 lseek
    10. #define mmap64 mmap
    11. #endif
    12. JNIEXPORT jlong JNICALL
    13. Java_sun_nio_ch_FileChannelImpl_transferTo0(JNIEnv *env, jobject this,
    14. jobject srcFDO,
    15. jlong position, jlong count,
    16. jobject dstFDO)
    17. {
    18. jint srcFD = fdval(env, srcFDO);
    19. jint dstFD = fdval(env, dstFDO);
    20. #if defined(__linux__)
    21. off64_t offset = (off64_t)position;
    22. jlong n = sendfile64(dstFD, srcFD, &offset, (size_t)count);
    23. return n;
    24. #elif defined(__solaris__)
    25. result = sendfilev64(dstFD, &sfv, 1, &numBytes);
    26. return result;
    27. #elif defined(__APPLE__)
    28. result = sendfile(srcFD, dstFD, position, &numBytes, NULL, 0);
    29. return result;
    30. #endif
    31. }
    对 Linux、Solaris 以及 Apple 系统而言,transferTo0() 函数底层会执行 sendfile64 这个系统调用完成零拷贝操作,sendfile64() 函数的原型如下:
    1. #include <sys/sendfile.h>
    2. ssize_t sendfile64(int out_fd, int in_fd, off_t *offset, size_t count);
    下面简单介绍一下 sendfile64() 函数各个参数的含义:
    • out_fd:待写入的文件描述符 。
    • in_fd:待读取的文件描述符 。
    • offset:指定 in_fd 对应文件流的读取位置,如果为空,则默认从起始位置开始 。
    • count:指定在文件描述符 in_fd 和 out_fd 之间传输的字节数 。
    在 Linux 2.6.3 之前,out_fd 必须是一个 socket,而从 Linux 2.6.3 以后,out_fd 可以是任何文件 。
    也就是说,sendfile64() 函数不仅可以进行网络文件传输,还可以对本地文件实现零拷贝操作 。
    其它的零拷贝实现
    Netty 零拷贝
    Netty 中的零拷贝和上面提到的操作系统层面上的零拷贝不太一样, 我们所说的 Netty 零拷贝完全是基于(Java 层面)用户态的,它的更多的是偏向于数据操作优化这样的概念 。
    具体表现在以下几个方面:
    • Netty 通过 DefaultFileRegion 类对 java.nio.channels.FileChannel 的 tranferTo() 方法进行包装,在文件传输时可以将文件缓冲区的数据直接发送到目的通道(Channel) 。
    • ByteBuf 可以通过 wrap 操作把字节数组、ByteBuf、ByteBuffer 包装成一个 ByteBuf 对象, 进而避免了拷贝操作 。
    • ByteBuf 支持 Slice 操作, 因此可以将 ByteBuf 分解为多个共享同一个存储区域的 ByteBuf,避免了内存的拷贝 。
    • Netty 提供了 CompositeByteBuf 类,它可以将多个 ByteBuf 合并为一个逻辑上的 ByteBuf,避免了各个 ByteBuf 之间的拷贝 。
    其中第 1 条属于操作系统层面的零拷贝操作,后面 3 条只能算用户层面的数据操作优化 。
    RocketMQ 和 Kafka 对比
    RocketMQ 选择了 mmap+write 这种零拷贝方式,适用于业务级消息这种小块文件的数据持久化和传输 。
    而 Kafka 采用的是 Sendfile 这种零拷贝方式,适用于系统日志消息这种高吞吐量的大块文件的数据持久化和传输 。
    但是值得注意的一点是,Kafka 的索引文件使用的是 mmap+write 方式,数据文件使用的是 Sendfile 方式 。


    推荐阅读