如何精确控制 asyncio 中并发运行的多个任务( 三 )


至于我们要判断哪些任务是正常执行,哪些任务是抛了异常,便可以通过返回值来判断 。如果 isinstance(res, Exception) 为 True,那么证明任务出现了异常,否则正常执行 。虽然这有点笨拙 , 但也能凑合用,因为 API 并不完美 。
当然以上这些都不能算是缺点,gather 真正的缺点有两个:

  • 如果我希望所有任务都执行成功,要是有一个任务失败,其它任务自动取消 , 该怎么实现呢?比如发送 Web 请求,如果一个请求失败,其他所有请求也会失败(要取消请求以释放资源) 。显然要做到这一点不容易 , 因为协程被包装在后台的任务中;
  • 其次,必须等待所有任务执行完成,才能处理结果,如果想要在结果完成后立即处理它们,这就存在问题 。例如有一个请求需要 100 毫秒,而另一个请求需要 20 秒,那么在处理 100 毫秒完成的那个请求之前 , 我们将等待 20 秒 。
而 asyncio 也提供了用于解决这两个问题的 API 。
在任务完成时立即处理如果想在某个结果生成之后就对其进行处理,这是一个问题;如果有一些可以快速完成的等待对象,和一些可能需要很长时间完成的等待对象 , 这也可能是一个问题 。因为 gather 需要等待所有对象执行完毕,这就导致应用程序可能变得无法响应 。
想象一个用户发出 100 个请求,其中两个很慢,但其余的都很快完成 。如果一旦有请求完成 , 可以向用户输出一些信息,来提升用户的使用体验 。
为处理这种情况,asyncio 公开了一个名为 as_completed 的 API 函数,这个函数接收一个可等待对象(awaitable)组成的列表,并返回一个生成器 。通过遍历 , 等待它们中的每一个对象都完成,并且哪个先完成,哪个就先被迭代 。这意味着将能在结果可用时立即就处理它们,但很明显此时就没有所谓的顺序了,因为无法保证哪些请求先完成 。
import asyncioimport timeasync def delay(seconds):await asyncio.sleep(seconds)return f"我睡了 {seconds} 秒"async def main():# asyncio 提供的用于等待一组 awaitable 对象的 API 都很智能# 如果检测到你传递的是协程,那么会自动包装成任务# 不过还是建议手动包装一下tasks = [asyncio.create_task(delay(seconds))for seconds in (3, 5, 2, 4, 6, 1)]for finished in asyncio.as_completed(tasks):print(await finished)loop = asyncio.get_event_loop()start = time.perf_counter()loop.run_until_complete(main())end = time.perf_counter()print("总耗时:", end - start)"""我睡了 1 秒我睡了 2 秒我睡了 3 秒我睡了 4 秒我睡了 5 秒我睡了 6 秒总耗时: 6.000872417"""和 gather 不同,gather 是等待一组任务全部完成之后才返回,并且会自动将结果取出来,结果值的顺序和添加任务的顺序是一致的 。对于 as_completed 而言,它会返回一个生成器,我们遍历它,哪个任务先完成则哪个就先被处理 。
如何精确控制 asyncio 中并发运行的多个任务

文章插图
那么问题来了 , 如果出现异常了该怎么办?很简单,直接异常捕获即可 。
然后我们再来思考一个问题,任何基于 Web 的请求都存在花费很长时间的风险,服务器可能处于过重的资源负载下,或者网络连接可能很差 。
之前我们看到了通过 wait_for 函数可以为特定请求添加超时 , 但如果想为一组请求设置超时怎么办?as_completed 函数通过提供一个可选的 timeout 参数来处理这种情况 , 它允许以秒为单位指定超时时间 。如果花费的时间超过设定的时间,那么迭代器中的每个可等待对象都会在等待时抛出 TimeoutException 。
import asyncioasync def delay(seconds):await asyncio.sleep(seconds)return f"我睡了 {seconds} 秒"async def main():tasks = [asyncio.create_task(delay(seconds))for seconds in (1, 5, 6)]for finished in asyncio.as_completed(tasks, timeout=3):try:print(await finished)except asyncio.TimeoutError:print("超时啦")loop = asyncio.get_event_loop()loop.run_until_complete(main())"""我睡了 1 秒超时啦超时啦"""as_completed 非常适合用于尽快获得结果,但它也有缺点 。
第一个缺点是没有任何方法可快速了解我们正在等待哪个协程或任务,因为运行顺序是完全不确定的 。如果不关心顺序 , 这可能没问题,但如果需要以某种方式将结果与请求相关联,那么将面临挑战 。
第二个缺点是超时,虽然会正确地抛出异常并继续运行程序,但创建的所有任务仍将在后台运行 。如果想取消它们,很难确定哪些任务仍在运行,这是我们面临的另一个挑战 。


推荐阅读