探索 Rust 异步简化编程( 二 )


使用能保证完成的future后,同学A的聊天服务器如下:
async fn handle_connection(socket: TcpStream, channel: Channel) { let reader = Arc::new(socket); let writer = reader.clone; let read_task = task::spawn(async move { while let Some(line_in) in parse_line(&reader).await { broadcast_line(line_in); } }); loop { // `channel` and JoinHandle are both "channel-like" types. select! { res = read_task.join => { // The connection closed, exit loop break; } line_out = channel.recv => { write_line(&writer, line_out).await; } } }}这段代码与前面的示例很相似,但由于所有异步语句必然会完成,而且select!只接受类似于通道的类型,因此parse_line的调用被移动到了一个生成的任务中 。select要求类似于通道的类型,这能够保证放弃丢失的分支是安全的 。通道可以存储值,而且接收值是原子操作 。丢失select的分支并不会导致取消时丢失数据 。

探索 Rust 异步简化编程

文章插图
 
取消如果写入时发生错误会怎样?现状下read_task会继续执行 。然而,同学A希望它能出错,并优雅地关闭连接和所有任务 。不幸的是,这里就会遇到设计上的难题 。如果我们能够随时放弃任何异步语句,那么取消就非常容易了,只需要放弃future就可以 。我们需要一种方法来取消正在执行的操作,因为这是使用异步编程的主要目的之一 。为了实现这一点,JoinHandle提供了cancel方法:
async fn handle_connection(socket: TcpStream, channel: Channel) { let reader = Arc::new(socket); let writer = reader.clone; let read_task = task::spawn(async move { while let Some(line_in) in parse_line(&reader).await? { broadcast_line(line_in)?; } Ok() }); loop { // `channel` and JoinHandle are both "channel-like" types. select! { _ = read_task.join => { // The connection closed or we encountered an error, // exit the loop break; } line_out = channel.recv => { if write_line(&writer, line_out).await.is_err { read_task.cancel; read_task.join; } } } }}但是cancel能做什么呢?它并不能立即终止任务,因为现在异步语句是保证能够执行完成的 。但我们的确需要停止处理并尽快返回 。相反,被取消的任务中的所有资源类型都应该停止执行,并返回“被中断”的错误 。进一步的尝试也应该返回错误 。这种策略与Kotlin很相似,只不过Kotlin会抛出异常而已 。如果在任务取消时,read_task正在parse_line中等待socket.read_u32,那么read_u32函数会立即返回Err(
io::ErrorKind::Interrupted) 。操作符?会在任务中向上冒泡,导致整个任务中断 。
乍一看,这种行为非常像其他任务停止的行为,但其实不一样 。对于同学A而言,当前的异步Rust的终止行为看起来就像任务不确定地发生挂起一样 。如果能强制资源(例如套接字)在取消时返回错误,就能跟踪取消的流程 。同学A可以添加println!语句或使用其他调试策略来调查什么导致了任务中断 。
探索 Rust 异步简化编程

文章插图
AsyncDrop
然而,同学A并不知道,他的聊天服务器使用了io-uring来避免了绝大部分系统调用 。由于future能保证完成,再加上AsyncDrop,就可以透明底使用io-uring API 。当同学A在handle_connection的末尾drop TcpStream时,套接字会异步地关闭 。为了实现这一点,TcpStream的AsyncDrop实现如下:
impl AsyncDrop for TcpStream { async fn drop(&mut self) { self.uring.close(self.fd).await; }}有人提出了一个绝妙的方法在traits中使用async(
https://hackmd.io/bKfiVPRpTvyX8JK_Ng2EWA?view) 。唯一的问题就是如何处理隐含的.await点 。目前,异步地等待一个future需要进行一次.await调用 。而当一个值离开async上下文的范围时,编译器会为AsyncDrop trait添加一个隐藏的yield点 。这个行为违反了最少意料之外原则 。那么,既然其他的点都是明示的,为何此处需要一个隐含的await点?
解决“有时需要隐含drop”的问题的提议之一就是,要求使用明示的函数调用执行异步的drop:


推荐阅读