[ Python  ]

Python 异步库 asyncio(一):Coroutines 和 Tasks

本文介绍如何使用 Python asyncio 模块里的 Coroutine 和 Task 来完成异步 IO。

Coroutine (协程、协同程序)

Coroutine 使用 async/await 语法来声明和使用。例如下面的代码(需要 Python 3.7+)先输出 “hello” 后等待 1 秒再输出 “world”:

>>> import asyncio

>>> async def main():
...     print('hello')
...     await asyncio.sleep(1)
...     print('world')

>>> asyncio.run(main())
hello
world

注意:调用一个 coroutine 不会使它运行:

>>> main()
<coroutine object main at 0x1053bb7c8>

asyncio 提供了三种方法来真正运行一个 coroutine:

Awaitable

如果一个对象能够在 await 关键字之后使用,则这个对象被称作 awaitable 的。许多 asyncio 的 API 都是接收 awaitable 作为参数的。

有三种主要的 awaitable 对象:coroutines, Tasks, 和 Futures

Coroutines

Python coroutines 可以被其他 coroutine 来 await:

  import asyncio
  
  async def nested():
      return 42
  
  async def main():
      # 如果直接调用 "nested()",则什么也不会发生
      # 一个 coroutine 对象被创建,但没有被 await,
      # 所以它根本不会被运行
      nested()
  
      # 现在 await 它
      print(await nested())  # 将会输出 "42"
  
  asyncio.run(main())

重要:本文中的 coroutine 可以指代两个概念:

asyncio 也支持传统的 generator-based coroutines。

Tasks

Task 被用来并行地调度 coroutines,例如:

  import asyncio
  
  async def nested():
      return 42
  
  async def main():
      # 计划 nested() 在之后的某个时刻被并行地运行
      task = asyncio.create_task(nested())
  
      # "task" 现在可以被用来取消 "nested()",或
      # 被 await 来等待 nested() 的结束:
      await task
  
  asyncio.run(main())

Futures

Future 是一个特殊的低级 awaitable 对象,通常在你的应用层代码里不会用到它。

运行一个 asyncio 程序

asyncio.run(coro, **, *debug=False)

(Python 3.7 新增)

这个函数运行传入的 coro 协程,处理 asyncio 事件循环并且 finalizing asynchronous generators。将 debug 设为 True 可以使它运行在 Debug 模式。

当前线程只能有一个 asyncio 事件循环在运行,所以请保证仅仅调用这个函数一次,所以它被用来调用整个异步 IO 程序的入口函数,如 “main()”。

创建 Tasks

asyncio.create_task(coro)

(Python 3.7 新增)

打包一个 coro 协程,返回一个 task 对象。在 Python 3.7 之前,可以使用低级的 asyncio.ensure_future 来替代:

async def coro():
    ...

# In Python 3.7+
task = asyncio.create_task(coro())
...

# This works in all Python versions but is less readable
task = asyncio.ensure_future(coro())
...

Sleeping

coroutine asyncio.sleep(delay, result=None, **, *loop=None)

阻塞当前 task,持续 delay 秒。

下面的协程例子是每秒打印出当前时间,持续 5 秒:

import asyncio
import datetime

async def display_date():
    loop = asyncio.get_running_loop()
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        await asyncio.sleep(1)

asyncio.run(display_date())

并行地运行 Tasks

awaitable asyncio.gather(*aws, loop=None, return_exceptions=False)

传入一些 awaitables,返回单个 awaitable,使它们并行地运行。如果传入的 aws 是 coroutine,它会自动被调度为 task。

例子:

import asyncio

async def factorial(name, number):
    f = 1
    for i in range(2, number + 1):
        print(f"Task {name}: Compute factorial({i})...")
        await asyncio.sleep(1)
        f *= i
    print(f"Task {name}: factorial({number}) = {f}")

async def main():
    # Schedule three calls *concurrently*:
    await asyncio.gather(
        factorial("A", 2),
        factorial("B", 3),
        factorial("C", 4),
    )

asyncio.run(main())

# Expected output:
#
#     Task A: Compute factorial(2)...
#     Task B: Compute factorial(2)...
#     Task C: Compute factorial(2)...
#     Task A: factorial(2) = 2
#     Task B: Compute factorial(3)...
#     Task C: Compute factorial(3)...
#     Task B: factorial(3) = 6
#     Task C: Compute factorial(4)...
#     Task C: factorial(4) = 24

防止取消(Cancellation)

awaitable asyncio.shield(aw, **, *loop=None)

保护一个 awaitable 对象使之不会被取消。如果 aw 是一个 coroutine,它会被自动调度为一个 Task。

语句

res = await shield(something())

等价于

res = await something()

只是如果包含这行代码的 coroutine 被取消了,那么在 something() 里运行的 Task 不会被取消。

如果想要完全忽视取消(不推荐),可以使用 try/except 来完成:

try:
    res = await shield(something())
except CancelledError:
    res = None