Skip to content

asyncio模块

asyncio 模块提供了使用协程构建并发应用的工具。它使用一种单线程单进程的方式实现并发,应用的各个部分彼此合作,可以显示的切换任务,一般会在程序阻塞 I/O 操作的时候发生上下文切换如等待读写文件,或者请求网络。同时 asyncio 也支持调度代码在将来的某个特定时间运行,从而支持一个协程等待另一个协程完成,以处理系统信号和识别其他一些事情

对于其他的并发模型大多数采取的都是线性的方式编写。并且依赖于语言运行时系统或操作系统的底层线程或进程来适当地改变上下文,而基于 asyncio 的应用要求应用代码显式地处理上下文切换

asyncio 提供的框架以事件循环(event loop)为中心,程序会把一些函数注册到事件循环上。当满足事件发生的时候,调用相应的协程函数

包含各种特定系统实现的模块化事件循环
传输和协议抽象
对 TCP、UDP、SSL、子进程、延时调用以及其他的具体支持
模仿 futures 模块但适用于事件循环使用的 Future 类
基于 yield from 的协议和任务,可以让你用顺序的方式编写并发代码
必须使用一个将产生阻塞 IO 的调用时,有接口可以把这个事件转移到线程池
模仿 threading 模块中的同步原语、可以用在单线程内的协程之间

事件循环(event loop)

asyncio 模块支持每个进程拥有一个事件循环,使用事件循环驱动的协程实现并发

事件循环是每个 asyncio 应用的核心。 事件循环会运行异步任务和回调,执行网络 IO 操作,以及运行子进程。

可以定义事件循环来简化使用轮询方法来监控事件,通俗的说法就是“当 A 发生时,执行 B”

在程序执行期间事件循环不断周期反复,追踪某个数据结构内部发生的事件,将其纳入队列,如果主线程空闲则调用事件处理器一个一个地处理这些事件。

程序员不用控制任务的添加、删除和事件的控制。事件循环通过回调方法来知道事件的发生。它是 asyncio 提供的“中央处理设备”

asyncio.get_running_loop()

返回当前 OS 线程中正在运行的事件循环。如果没有正在运行的事件循环则会引发 RuntimeError。 此函数只能由协程或回调来调用。3.7 新版功能.

asyncio.get_event_loop()

获取当前事件循环。 如果当前 OS 线程没有设置当前事件循环并且 set_event_loop() 还没有被调用,asyncio 将创建一个新的事件循环并将其设置为当前循环。由于此函数具有相当复杂的行为(特别是在使用了自定义事件循环策略的时候),更推荐在协程和回调中使用 get_running_loop() 函数而非 get_event_loop()。应该考虑使用 asyncio.run() 函数而非使用低层级函数来手动创建和关闭事件循环。

asyncio.set_event_loop(loop)

将 loop 设置为当前 OS 线程的当前事件循环。

asyncio.new_event_loop()

创建一个新的事件循环。

loop.run_until_complete(future)

运行直到 future ( Future 的实例 ) 被完成。如果参数是 coroutine object ,将被隐式调度为 asyncio.Task 来运行。返回 Future 的结果 或者引发相关异常。

loop.run_forever()

运行事件循环直到 stop() 被调用。

loop.stop()

停止事件循环。

loop.is_running()

返回 True 如果事件循环当前正在运行。

loop.is_closed()

如果事件循环已经被关闭,返回 True 。

loop.close()

关闭事件循环。当这个函数被调用的时候,循环必须处于非运行状态。pending状态的回调将被丢弃。此方法清除所有的队列并立即关闭执行器,不会等待执行器完成。这个方法是幂等的和不可逆的。事件循环关闭后,不应调用其他方法。

loop.call_soon(callback, *args, context=None)

安排在下一次事件循环的迭代中使用 args 参数调用 callback 。回调按其注册顺序被调用。每个回调仅被调用一次。可选的仅关键字型参数 context 允许为要运行的 callback 指定一个自定义 contextvars.Context 。如果没有提供 context ,则使用当前上下文。返回一个能用来取消回调的 asyncio.Handle 实例。这个方法不是线程安全的。

loop.call_soon_threadsafe(callback, *args, context=None)

call_soon() 的线程安全变体。必须被用于安排 来自其他线程 的回调。

loop.call_later(delay, callback, *args, context=None)

安排 callback 在给定的 delay 秒(可以是 int 或者 float)后被调用。返回一个 asyncio.TimerHandle 实例,该实例能用于取消回调。callback 只被调用一次。如果两个回调被安排在同样的时间点,执行顺序未限定。可选的位置参数 args 在被调用的时候传递给 callback 。 如果你想把关键字参数传递给 callback ,请使用 functools.partial() 。可选的仅关键字型参数 context 允许为要运行的 callback 指定一个自定义 contextvars.Context 。如果没有提供 context ,则使用当前上下文。在 3.7 版更改: 仅用于关键字形参的参数 context 已经被添加。

loop.time()

根据时间循环内部的单调时钟,返回当前时间, float 值。

loop.create_future()

创建一个附加到事件循环中的 asyncio.Future 对象。这是在asyncio中创建Futures的首选方式。这让第三方事件循环可以提供Future 对象的替代实现(更好的性能或者功能)。

loop.create_task(coro)

安排一个 协程 的执行。返回一个 Task 对象。三方的事件循环可以使用它们自己定义的 Task 类的子类来实现互操作性。这个例子里,返回值的类型是 Task 的子类。

async和await关键字

Python3.4 引入 asyncio,Python3.5 引入 async/await

async 和 await 关键字是 Python 异步编程的主要构建块

在 def 语句之前使用的 async 关键字就定义了一个新的协程。协程函数的执行可以在严格定义的情况下暂停和恢复

使用 async 关键字定义的函数是特殊的。当被调用时,它们不执行里面的代码,而是返回一个协程对象

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
import asyncio
async def async_hello():
    print('hello world!')

print(async_hello())
"""
<coroutine object async_hello at 0x1103e3248>
test_future.py:5: RuntimeWarning: coroutine 'async_hello' was never awaited
  print(async_hello())
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
"""

在事件循环中调度其之前,协程对象不执行任何操作

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
import asyncio
async def async_hello():
    print('hello world!')

loop = asyncio.get_event_loop()
loop.run_until_complete(async_hello())
loop.close()
"""
hello world!
"""

从协程中返回值

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
import asyncio
async def async_hello():
    return 'hello world!'

loop = asyncio.get_event_loop()
result = loop.run_until_complete(async_hello())
print(result)
loop.close()
"""
hello world!
"""

run_until_complete可以获取协程的返回值,如果没有返回值,则像函数一样,默认返回None

可以通过调用 loop.create_task() 方法或者使用 asyncio.wait() 函数提供另一个对象来等待来将新任务添加到循环中。我们将使用后一种方法,并尝试异步打印使用 range() 函数生成一系列数字如下

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import asyncio
async def print_number(number):
    print(number)

loop = asyncio.get_event_loop()
loop.run_until_complete(
    asyncio.wait([
        print_number(number) for number in range(10)    
    ])
)
loop.close()
"""
8
0
1
5
2
6
9
3
7
4
"""

asyncio.wait() 函数接受协程对象列表,并立即返回。结果是产生表示未来结果(futures)的对象的生成器。顾名思义,它用于等待所有提供的协程完成。

第二个很重要的关键字是 await。它用于等待协程或 future 的结果,并释放对事件循环的执行控制

假设我们想创建两个协程,它们将在循环中执行一些简单的任务

  • 随机等待几秒
  • 打印参数提供的一些文本以及睡眠时间

让我们以一个简单的实现开始,该实现有一些并发问题,我们以后尝试通过额外的 await 使用进行改进

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import asyncio
import random
import time

async def waiter(name):
    for _ in range(4):
        time_to_sleep = random.randint(1, 3) / 4
        time.sleep(time_to_sleep)
        print(f"{name} waiter {time_to_sleep} seconds")

async def main():
    await asyncio.wait([waiter('foo'), waiter('bar')])

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
"""
bar waiter 0.25 seconds
bar waiter 0.25 seconds
bar waiter 0.75 seconds
bar waiter 0.75 seconds
foo waiter 0.5 seconds
foo waiter 0.5 seconds
foo waiter 0.5 seconds
foo waiter 0.75 seconds
python test_future.py  0.06s user 0.02s system 1% cpu 4.346 total
"""

可以看到,两个协程都完成了它们的执行,但不是异步方式。原因是它们都使用阻塞但不将控件释放到事件循环的 time.sleep() 函数。这将在多线程设置中更好地工作,但现在我们不想使用线程。那么我们如何解决这个问题呢?

答案是使用 asyncio.sleep(),它是 time.sleep() 的异步版本,并使用 await 关键字等待其结果。

coroutine asyncio.sleep(delay, result=None, *, loop=None)

阻塞 delay 指定的秒数。如果指定了 result,则当协程完成时将其返回给调用者。sleep() 总是会挂起当前任务,以允许其他任务运行。loop 参数已弃用,计划在 Python 3.10 中移除。

以下协程示例运行 5 秒,每秒显示一次当前日期:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
import asyncio
import datetime
async def display_date():
    loop = asyncio.get_running_loop()
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        await asyncio.sleep(1)
asyncio.run(display_date())
"""
2019-09-22 10:19:22.699990
2019-09-22 10:19:23.702620
2019-09-22 10:19:24.706132
2019-09-22 10:19:25.710048
2019-09-22 10:19:26.713851
"""
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import asyncio
import random
import time

async def waiter(name):
    for _ in range(4):
        time_to_sleep = random.randint(1, 3) / 4
        await asyncio.sleep(time_to_sleep)
        print(f"{name} waiter {time_to_sleep} seconds")

async def main():
    await asyncio.wait([waiter('foo'), waiter('bar')])

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
"""
foo waiter 0.25 seconds
foo waiter 0.25 seconds
bar waiter 0.75 seconds
foo waiter 0.25 seconds
bar waiter 0.75 seconds
foo waiter 0.75 seconds
bar waiter 0.5 seconds
bar waiter 0.75 seconds
python test_future.py  0.06s user 0.02s system 2% cpu 2.847 total
"""

这种简单改进的额外优点是代码运行地更快。总执行时间小于所有休眠时间的总和,因为协程会协同地释放控制

asyncio.run(coro, *, debug=False)

此函数运行传入的协程,负责管理 asyncio 事件循环并 完结异步生成器。当有其他 asyncio 事件循环在同一线程中运行时,此函数不能被调用。如果 debug 为 True,事件循环将以调试模式运行。此函数总是会创建一个新的事件循环并在结束时关闭之。它应当被用作 asyncio 程序的主入口点,理想情况下应当只被调用一次。

以下代码段会在等待 1 秒后打印 "hello",然后 再次 等待 2 秒后打印 "world":

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
import asyncio
import time
async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)
async def main():
    print(f"started at {time.strftime('%X')}")
    await say_after(1, 'hello')
    await say_after(2, 'world')
    print(f"finished at {time.strftime('%X')}")
asyncio.run(main())
"""
started at 20:49:43
hello
world
finished at 20:49:46
"""

注意:这里运行完成运行了 3 秒,是顺序执行而不是并发执行

协程中调用普通函数

在协程中可以通过一些方法去调用普通的函数,可以使用的关键字有call_soon, call_later, call_at

loop.call_soon(callback, *args, context=None)

安排在下一次事件循环的迭代中使用 args 参数调用 callback 。回调按其注册顺序被调用。每个回调仅被调用一次。可选的仅关键字型参数 context 允许为要运行的 callback 指定一个自定义 contextvars.Context 。如果没有提供 context ,则使用当前上下文。返回一个能用来取消回调的 asyncio.Handle 实例。这个方法不是线程安全的。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
import asyncio
import functools
def callback(args, *, kwargs="defalut"):
    print(f"普通函数做为回调函数,获取参数:{args},{kwargs}")
async def main(loop):
    print("注册callback")
    loop.call_soon(callback, 1)
    wrapped = functools.partial(callback, kwargs="not defalut")
    loop.call_soon(wrapped, 2)
    await asyncio.sleep(0.2)
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main(loop))
finally:
    loop.close()
"""
注册callback
普通函数做为回调函数,获取参数:1,defalut
普通函数做为回调函数,获取参数:2,not defalut
"""

loop.call_later(delay, callback, *args, context=None)

安排 callback 在给定的 delay 秒(可以是 int 或者 float)后被调用。返回一个 asyncio.TimerHandle 实例,该实例能用于取消回调。callback 只被调用一次。如果两个回调被安排在同样的时间点,执行顺序未限定。可选的位置参数 args 在被调用的时候传递给 callback 。 如果你想把关键字参数传递给 callback ,请使用 functools.partial() 。可选的仅关键字型参数 context 允许为要运行的 callback 指定一个自定义 contextvars.Context 。如果没有提供 context ,则使用当前上下文。在 3.7 版更改: 仅用于关键字形参的参数 context 已经被添加。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import time
import asyncio

def function_1(end_time, loop):
    print("function_1 called")
    if loop.time() + 1.0 < end_time:
        loop.call_later(1, function_2, end_time, loop)
    else:
        loop.stop()

def function_2(end_time, loop):
    print("function_2 called")
    if loop.time() + 1.0 < end_time:
        loop.call_later(1, function_3, end_time, loop)
    else:
        loop.stop()

def function_3(end_time, loop):
    print("function_3 called")
    if loop.time() + 1.0 < end_time:
        loop.call_later(1, function_1, end_time, loop)
    else:
        loop.stop()

loop = asyncio.get_event_loop()
end_loop = loop.time() + 9.0

loop.call_soon(function_1, end_loop, loop)
loop.run_forever()
loop.close()
"""
function_1 called
function_2 called
function_3 called
function_1 called
function_2 called
function_3 called
function_1 called
function_2 called
function_3 called
"""
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
import asyncio
import time

def callback(sleep_times):
    print(f"sleep {sleep_times} success")

loop = asyncio.get_event_loop()
loop.call_soon(callback, 2)
loop.run_forever()
"""
sleep 2 success
"""
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
import asyncio
import time

def callback(sleep_times):
    print(f"sleep {sleep_times} success")
def stoploop(loop):
    loop.stop()
loop = asyncio.get_event_loop()
loop.call_soon(callback, 2)
loop.call_soon(stoploop, loop)
loop.run_forever()
"""
sleep 2 success
"""
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
import asyncio
import time

def callback(sleep_times):
    print(f"sleep {sleep_times} success")
loop = asyncio.get_event_loop()
loop.call_later(2, callback, 2)
loop.call_later(1, callback, 1)
loop.call_later(3, callback, 3)
loop.run_forever()
"""
sleep 1 success
sleep 2 success
sleep 3 success
"""
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
import asyncio
import time

def callback(sleep_times):
    print(f"sleep {sleep_times} success")
loop = asyncio.get_event_loop()
loop.call_later(2, callback, 2)
loop.call_later(1, callback, 1)
loop.call_later(3, callback, 3)
loop.call_soon(callback, 4)
loop.run_forever()
"""
sleep 4 success
sleep 1 success
sleep 2 success
sleep 3 success
"""

loop.call_at(when, callback, *args, context=None)

call_at 第一个参数的含义代表的是一个单调时间,它和我们平时说的系统时间有点差异,
这里的时间指的是事件循环内部时间,可以通过 loop.time() 获取,然后可以在此基础上进行操作。
后面的参数和前面的两个方法一样。实际上 call_later 内部就是调用的 call_at。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
import asyncio
import time

def callback(sleep_times):
    print(f"sleep {sleep_times} success")
loop = asyncio.get_event_loop()
now = loop.time()
loop.call_at(now + 2, callback, 2)
loop.call_at(now + 1, callback, 1)
loop.call_at(3, callback, 3)
loop.call_soon(callback, 4)
loop.run_forever()
"""
sleep 4 success
sleep 1 success
sleep 2 success
sleep 3 success
"""
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import asyncio
import time

def callback(sleep_times, loop):
    print(f"success time = {loop.time()}")
    print(f"sleep {sleep_times} success")
loop = asyncio.get_event_loop()
now = loop.time()
loop.call_at(now + 2, callback, 2, loop)
loop.call_at(now + 1, callback, 1, loop)
loop.call_at(3, callback, 3, loop)
loop.call_soon(callback, 4, loop)
loop.run_forever()
"""
success time = 0.075957769
sleep 4 success
success time = 1.081155111
sleep 1 success
success time = 2.076290917
sleep 2 success
success time = 3.004078852
sleep 3 success
"""

可等待对象

如果一个对象可以在 await 语句中使用,那么它就是 可等待 对象。许多 asyncio API 都被设计为接受可等待对象。

可等待 对象有三种主要类型: 协程, 任务Future.

任务 被用来设置日程以便 并发 执行协程。当一个协程通过 asyncio.create_task() 等函数被打包为一个 任务,该协程将自动排入日程准备立即运行

Future 是一种特殊的 低层级 可等待对象,表示一个异步操作的 最终结果

当一个 Future 对象 被等待,这意味着协程将保持等待直到该 Future 对象在其他地方操作完毕。

在 asyncio 中需要 Future 对象以便允许通过 async/await 使用基于回调的代码。

通常情况下 没有必要 在应用层级的代码中创建 Future 对象。

Future 对象有时会由库和某些 asyncio API 暴露给用户,用作可等待对象

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import asyncio
import time
async def test():
    print("start test")
    await asyncio.sleep(2)
    print("end test")
if __name__ == "__main__":
    start_time = time.time()
    loop = asyncio.get_event_loop()
    tasks = [test() for i in range(10)]
    loop.run_until_complete(asyncio.wait(tasks))
    print(time.time() - start_time)
"""
start test
start test
start test
start test
start test
start test
start test
start test
start test
start test
end test
end test
end test
end test
end test
end test
end test
end test
end test
end test
2.003002166748047
"""
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
import asyncio
import time
async def test():
    await asyncio.sleep(2)
if __name__ == "__main__":
    start_time = time.time()
    loop = asyncio.get_event_loop()
    tasks = [test() for i in range(10000)]
    loop.run_until_complete(asyncio.wait(tasks))
    print(time.time() - start_time)
"""
2.1645538806915283
"""

协程调用协程

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
import asyncio
async def result1():
    return 'result1'
async def cal(a, b):
    return a + b
async def main():
    res1 = await result1()
    res2 = await cal(1, 2)
    return (res1, res2)
loop = asyncio.get_event_loop()
result = loop.run_until_complete(main())
print(result)
"""
('result1', 3)
"""

Futures

定义了 Future 对象,类似代表了尚未完成计算的 concurrent.futures 模块

Future 是一个数据结构,表示还未完成的工作结果。事件循环可以监听 Future 对象是否完成。从而允许应用的一部分等待另一部分完成一些工作

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import asyncio
async def first(future1, result):
    print(f"future1 = {future1}")
    await asyncio.sleep(3)
    future1.set_result(result)
    print(f"future1 done = {future1}")
async def second(future1, future2, result):
    print(f"future2 = {future2}")
    await asyncio.sleep(1)
    print(f"second 中 future1 = {future1}")
    future2.set_result(result)
    print(f"future2 done = {future2}")
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        fut1 = asyncio.Future()
        fut2 = asyncio.Future()
        loop.run_until_complete(asyncio.wait([
            first(fut1, "fut1 is done"),
            second(fut1, fut2, "fut2 is done.")
        ]))
    finally:
        loop.close()
    print(f"获取future1的结果: {fut1.result()}")
    print(f"获取future2的结果: {fut2.result()}")
"""
future1 = <Future pending>
future2 = <Future pending>
second 中 future1 = <Future pending>
future2 done = <Future finished result='fut2 is done.'>
future1 done = <Future finished result='fut1 is done'>
获取future1的结果: fut1 is done
获取future2的结果: fut2 is done.
"""

使用 await 等待 Future 对象完成

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
import asyncio
def foo(future, result):
    future.set_result(result)
async def main(loop):
    fut = asyncio.Future()
    loop.call_later(2, foo, fut, "the result")
    result = await fut
    print(f"result = {result}")
    print(f"fut.result = {fut.result()}")
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main(loop))
    finally:
        loop.close()
"""
result = the result
fut.result = the result
python test2.py  0.06s user 0.02s system 3% cpu 2.084 total
"""

添加回调

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
import asyncio
import functools
def callback(future, n):
    print(f"回调函数 {n} result = {future.result()}")
async def main(fut):
    fut.add_done_callback(functools.partial(callback, n=1))
    fut.add_done_callback(functools.partial(callback, n=2))
    await asyncio.sleep(1)
    fut.set_result('the result')
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        future = asyncio.Future()
        loop.run_until_complete(main(future))
    finally:
        loop.close()
"""
回调函数 1 result = the result
回调函数 2 result = the result
python test2.py  0.06s user 0.02s system 6% cpu 1.081 total
"""

下面的例子演示如何利用 Futures 类管理两个执行任务的协程,例如 first_coroutines 和 second_coroutines。例如,这两个协程分别执行求前 n 个整数之和和计算 n 的阶乘的任务

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import sys
import asyncio

async def first_coroutine(future, N):
    count = 0
    for i in range(1, N + 1):
        count += i
    await asyncio.sleep(4)
    future.set_result(f"fitst coroutine (sum of {N} integers) result = {count}")

async def second_coroutine(future, N):
    count = 1
    for i in range(1, N + 1):
        count *= i
    await asyncio.sleep(3)
    future.set_result(f"second coroutin (factorial) result = {count}")

def got_result(future):
    print(future.result())

N1 = int(sys.argv[1])
N2 = int(sys.argv[2])

loop = asyncio.get_event_loop()

future1 = asyncio.Future()
future2 = asyncio.Future()

tasks = [
    first_coroutine(future1, N1),
    second_coroutine(future2, N2)
]

future1.add_done_callback(got_result)
future2.add_done_callback(got_result)

loop.run_until_complete(asyncio.wait(tasks))
loop.close()
"""
second coroutin (factorial) result = 2
fitst coroutine (sum of 1 integers) result = 1
python test2.py 1 2  0.06s user 0.02s system 1% cpu 4.085 total
"""

ensure_future

asyncio.ensure_future(obj, *, loop=None)

返回:

  • obj 参数会是保持原样,如果 obj 是 Future、 Task 或 类似 Future 的对象( isfuture() 用于测试。)
  • a Task object wrapping obj, if obj is a coroutine (iscoroutine() is used for the test); in this case the coroutine will be scheduled by ensure_future().
  • 等待 obj 的 Task 对象,如果 obj 是一个可等待对象( inspect.isawaitable() 用于测试)

如果 obj 不是上述对象会引发一个 TypeError 异常。

注意:create_task() 函数,是创建新任务的首选途径。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import asyncio
import time
import inspect
async def test():
    print("start test")
    await asyncio.sleep(2)
    return "test result"
if __name__ == "__main__":
    start_time = time.time()
    loop = asyncio.get_event_loop()
    f = test()
    print(f)
    if inspect.iscoroutine(test()):
        print('test() is a coroutine')
    get_future = asyncio.ensure_future(test())
    print(get_future)
    loop.run_until_complete(get_future)
    print(get_future.result())
    print(time.time() - start_time)
"""
<coroutine object test at 0x10a379248>
test2.py:13: RuntimeWarning: coroutine 'test' was never awaited
  if inspect.iscoroutine(test()):
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
test() is a coroutine
<Task pending coro=<test() running at test2.py:4>>
start test
test result
2.0074970722198486
sys:1: RuntimeWarning: coroutine 'test' was never awaited
"""

任务(Tasks)

这是 asyncio 中的一个子类,用于封装并管理并行模式下的协程

Task 是 Future 的一个子类,它知道如何包装和管理一个协程的执行。任务所需的资源可用时,事件循环会调度任务允许,并生成一个结果,从而可以由其他协程消费

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import asyncio

async def factorial(number):
    f = 1
    for i in range(2, number):
        print(f"Asyncio.Task: Compute factorial {i}")
        await asyncio.sleep(1)
        f *= i
    print(f"Asyncio.Task - factorial({number}) = {f}")

async def fibonacci(number):
    a, b = 0, 1
    for i in range(number):
        print(f"Asyncio.Task: Compute fibonacci {i}")
        await asyncio.sleep(1)
        a, b = b, a + b
    print(f"Asyncio.Task - fibonacci({number}) = {a}")

async def binomialCoeff(n, k):
    result = 1
    for i in range(1, k + 1):
        result = result * (n - i + 1) / i
        print(f"Asyncio.Task: Compute binomialCoeff {i}")
        await asyncio.sleep(1)
    print(f"Asyncio.Task - binomialCoeff({n}, {k}) = {result}")

loop = asyncio.get_event_loop()
tasks = [
    asyncio.Task(factorial(10)),
    asyncio.Task(fibonacci(10)),
    asyncio.Task(binomialCoeff(20, 10))
]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
"""
Asyncio.Task: Compute factorial 2
Asyncio.Task: Compute fibonacci 0
Asyncio.Task: Compute binomialCoeff 1
Asyncio.Task: Compute factorial 3
Asyncio.Task: Compute fibonacci 1
Asyncio.Task: Compute binomialCoeff 2
Asyncio.Task: Compute factorial 4
Asyncio.Task: Compute fibonacci 2
Asyncio.Task: Compute binomialCoeff 3
Asyncio.Task: Compute factorial 5
Asyncio.Task: Compute fibonacci 3
Asyncio.Task: Compute binomialCoeff 4
Asyncio.Task: Compute factorial 6
Asyncio.Task: Compute fibonacci 4
Asyncio.Task: Compute binomialCoeff 5
Asyncio.Task: Compute factorial 7
Asyncio.Task: Compute fibonacci 5
Asyncio.Task: Compute binomialCoeff 6
Asyncio.Task: Compute factorial 8
Asyncio.Task: Compute fibonacci 6
Asyncio.Task: Compute binomialCoeff 7
Asyncio.Task: Compute factorial 9
Asyncio.Task: Compute fibonacci 7
Asyncio.Task: Compute binomialCoeff 8
Asyncio.Task - factorial(10) = 362880
Asyncio.Task: Compute fibonacci 8
Asyncio.Task: Compute binomialCoeff 9
Asyncio.Task: Compute fibonacci 9
Asyncio.Task: Compute binomialCoeff 10
Asyncio.Task - fibonacci(10) = 55
Asyncio.Task - binomialCoeff(20, 10) = 184756.0
"""

create_task

asyncio.create_task(coro)

将 coro 协程 打包为一个 Task 排入日程准备执行。返回 Task 对象。该任务会在 get_running_loop() 返回的循环中执行,如果当前线程没有在运行的循环则会引发 RuntimeError。此函数 在 Python 3.7 中被加入。在 Python 3.7 之前,可以改用低层级的 asyncio.ensure_future() 函数。

asyncio.create_task() 函数用来并发运行作为 asyncio 任务 的多个协程

因为协程是没有状态的,我们通过使用 create_task 方法可以将协程包装成有状态的任务。还可以在任务运行的过程中取消任务。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
import asyncio
import time
async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)
async def main():
    task1 = asyncio.create_task(
        say_after(1, 'hello'))
    task2 = asyncio.create_task(
        say_after(2, 'world'))
    print(f"started at {time.strftime('%X')}")
    await task1
    await task2
    print(f"finished at {time.strftime('%X')}")
asyncio.run(main())
"""
started at 20:52:39
hello
world
finished at 20:52:41
"""

注意,这里只用了 2 秒的时间,因为它是并发执行的

查看返回值

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
import asyncio
import time
import inspect
async def test():
    print("start test")
    await asyncio.sleep(2)
    return "test result"
if __name__ == "__main__":
    start_time = time.time()
    loop = asyncio.get_event_loop()
    task = loop.create_task(test())
    print(task)
    loop.run_until_complete(task)
    print(f"task result = {task.result()}")
    print(time.time() - start_time)
"""
<Task pending coro=<test() running at test2.py:4>>
start test
task result = test result
2.0053250789642334
"""

添加回调

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import asyncio
import time
import inspect
async def test():
    print("start test")
    await asyncio.sleep(2)
    return "test result"
def callback(future):
    print("call back 不要香菜")
    print(f"call back 中的 future.result() = {future.result()}")
if __name__ == "__main__":
    start_time = time.time()
    loop = asyncio.get_event_loop()
    task = loop.create_task(test())
    print(task)
    task.add_done_callback(callback)
    loop.run_until_complete(task)
    print(f"task result = {task.result()}")
    print(time.time() - start_time)
"""
<Task pending coro=<test() running at test2.py:4>>
start test
call back 不要香菜
call back 中的 future.result() = test result
task result = test result
2.0047709941864014
"""

回调函数中传递参数

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import asyncio
import time
import inspect
from functools import partial
async def test():
    print("start test")
    await asyncio.sleep(2)
    return "test result"
def callback(x, future):
    print(f"call back x = {x}")
    print("call back 不要香菜")
    print(f"call back 中的 future.result() = {future.result()}")
if __name__ == "__main__":
    start_time = time.time()
    loop = asyncio.get_event_loop()
    task = loop.create_task(test())
    print(task)
    task.add_done_callback(partial(callback, 666))
    loop.run_until_complete(task)
    print(f"task result = {task.result()}")
    print(time.time() - start_time)
"""
<Task pending coro=<test() running at test2.py:5>>
start test
call back x = 666
call back 不要香菜
call back 中的 future.result() = test result
task result = test result
2.004493236541748
"""

gather

awaitable asyncio.gather(*aws, loop=None, return_exceptions=False)

并发 运行 aws 序列中的 可等待对象。

如果 aws 中的某个可等待对象为协程,它将自动作为一个任务加入日程。

如果所有可等待对象都成功完成,结果将是一个由所有返回值聚合而成的列表。结果值的顺序与 aws 中可等待对象的顺序一致。

如果 return_exceptions 为 False (默认),所引发的首个异常会立即传播给等待 gather() 的任务。aws 序列中的其他可等待对象 不会被取消 并将继续运行。

如果 return_exceptions 为 True,异常会和成功的结果一样处理,并聚合至结果列表。

如果 gather() 被取消,所有被提交 (尚未完成) 的可等待对象也会 被取消。

如果 aws 序列中的任一 Task 或 Future 对象 被取消,它将被当作引发了 CancelledError 一样处理 -- 在此情况下 gather() 调用 不会 被取消。这是为了防止一个已提交的 Task/Future 被取消导致其他 Tasks/Future 也被取消。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import asyncio

async def factorial(name, number):
    f = 1
    for i in range(2, number + 1):
        print(f"Task {name}: Compute factorial({i})...")
        await asyncio.sleep(1)
        f *= i
    print(f"Task {name}: factorial({number}) = {f}")

async def main():
    # Schedule three calls *concurrently*:
    await asyncio.gather(
        factorial("A", 2),
        factorial("B", 3),
        factorial("C", 4),
    )

asyncio.run(main())
"""
Task A: Compute factorial(2)...
Task B: Compute factorial(2)...
Task C: Compute factorial(2)...
Task A: factorial(2) = 2
Task B: Compute factorial(3)...
Task C: Compute factorial(3)...
Task B: factorial(3) = 6
Task C: Compute factorial(4)...
Task C: factorial(4) = 24
"""
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import asyncio
import time
import inspect
async def test():
    print("start test")
    await asyncio.sleep(2)
    print("test result") 
if __name__ == "__main__":
    start_time = time.time()
    loop = asyncio.get_event_loop()
    tasks = [test() for i in range(10)]
    loop.run_until_complete(asyncio.gather(*tasks))
    print(time.time() - start_time)
"""
start test
start test
start test
start test
start test
start test
start test
start test
start test
start test
test result
test result
test result
test result
test result
test result
test result
test result
test result
test result
2.005251884460449
"""
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import asyncio
async def num(n):
    try:
        await asyncio.sleep(n * 0.1)
        return n
    except asyncio.CancelledError:
        print(f"数字{n}被取消")
        raise
async def main():
    tasks = [num(i) for i in range(10)]
    complete = await asyncio.gather(*tasks)
    for i in complete:
        print("当前数字", i)
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()
"""
当前数字 0
当前数字 1
当前数字 2
当前数字 3
当前数字 4
当前数字 5
当前数字 6
当前数字 7
当前数字 8
当前数字 9
python test2.py  0.06s user 0.03s system 8% cpu 1.023 total
"""

asyncio.gatherasyncio.wait的区别

gather 更加 high-level

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import asyncio
import time
import inspect
async def test():
    print("start test")
    await asyncio.sleep(2)
    print("test result") 
if __name__ == "__main__":
    start_time = time.time()
    loop = asyncio.get_event_loop()
    group1 = [test() for i in range(2)]
    group2 = [test() for i in range(3)]
    loop.run_until_complete(asyncio.gather(*group1, *group2))
    print(time.time() - start_time)
"""
start test
start test
start test
start test
start test
test result
test result
test result
test result
test result
2.0045411586761475
"""
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import asyncio
import time
import inspect
async def test():
    print("start test")
    await asyncio.sleep(2)
    print("test result") 
if __name__ == "__main__":
    start_time = time.time()
    loop = asyncio.get_event_loop()
    group1 = [test() for i in range(2)]
    group2 = [test() for i in range(3)]
    group1 = asyncio.gather(*group1)
    group2 = asyncio.gather(*group2)
    loop.run_until_complete(asyncio.gather(group1, group2))
    print(time.time() - start_time)
"""
start test
start test
start test
start test
start test
test result
test result
test result
test result
test result
2.001896858215332
"""

取消任务

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import asyncio
async def child():
    print("进入子协程")
    return "the result"
async def main(loop):
    print("将协程child包装成任务")
    task = loop.create_task(child())
    print("通过cancel方法可以取消任务")
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("取消任务抛出CancelledError异常")
    else:
        print("获取任务的结果", task.result())
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main(loop))
    finally:
        loop.close()
"""
将协程child包装成任务
通过cancel方法可以取消任务
取消任务抛出CancelledError异常
"""
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import asyncio
import time
import inspect
async def test(sleep_time):
    print("start test")
    await asyncio.sleep(sleep_time)
    print("test end") 
if __name__ == "__main__":
    start_time = time.time()
    loop = asyncio.get_event_loop()
    task1 = test(2)
    task2 = test(3)
    task3 = test(3)
    tasks = [task1, task2, task3]
    try:
        loop.run_until_complete(asyncio.wait(tasks))
    except KeyboardInterrupt as e:
        all_tasks = asyncio.Task.all_tasks()
        for task in all_tasks:
            print("cancel task")
            print(task.cancel())
        loop.stop()
        loop.run_forever()
    finally:
        loop.close()
    print(time.time() - start_time)
"""
start test
start test
start test
^Ccancel task
True
cancel task
True
cancel task
True
cancel task
True
0.5112431049346924
"""

wait_for

coroutine asyncio.wait_for(aw, timeout, *, loop=None)
等待 aw 可等待对象 完成,指定 timeout 秒数后超时。

如果 aw 是一个协程,它将自动作为任务加入日程。

timeout 可以为 None,也可以为 float 或 int 型数值表示的等待秒数。如果 timeout 为 None,则等待直到完成。

如果发生超时,任务将取消并引发 asyncio.TimeoutError.

要避免任务 取消,可以加上 shield()。

函数将等待直到目标对象确实被取消,所以总等待时间可能超过 timeout 指定的秒数。

如果等待被取消,则 aw 指定的对象也会被取消。

The loop argument is deprecated and scheduled for removal in Python 3.10.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
import asyncio
async def eternity():
    # Sleep for one hour
    await asyncio.sleep(3600)
    print('yay!')

async def main():
    # Wait for at most 1 second
    try:
        await asyncio.wait_for(eternity(), timeout=1.0)
    except asyncio.TimeoutError:
        print('timeout!')

asyncio.run(main())
"""
timeout!
python test2.py  0.07s user 0.02s system 7% cpu 1.115 total
"""

wait

coroutine asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)

并发运行 aws 指定的 可等待对象 并阻塞线程直到满足 return_when 指定的条件。如果 aws 中的某个可等待对象为协程,它将自动作为任务加入日程。直接向 wait() 传入协程对象已弃用

返回两个 Task/Future 集合: (done, pending)。

用法:
done, pending = await asyncio.wait(aws)

The loop argument is deprecated and scheduled for removal in Python 3.10.

如指定 timeout (float 或 int 类型) 则它将被用于控制返回之前等待的最长秒数。请注意此函数不会引发 asyncio.TimeoutError。当超时发生时,未完成的 Future 或 Task 将在指定秒数后被返回。

return_when 指定此函数应在何时返回。它必须为以下常数之一:

常数 描述
FIRST_COMPLETED 函数将在任意可等待对象结束或取消时返回。
FIRST_EXCEPTION 函数将在任意可等待对象因引发异常而结束时返回。当没有引发任何异常时它就相当于 ALL_COMPLETED。
ALL_COMPLETED 函数将在所有可等待对象结束或取消时返回。

与 wait_for() 不同,wait() 在超时发生时不会取消可等待对象。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
import asyncio
async def foo():
    return 42
async def main():
    task = asyncio.create_task(foo())
    print(task)
    done, pending = await asyncio.wait({task})
    print(len(done), done)
    print(len(pending), pending)
    if task in done:
        print(task.result())
asyncio.run(main())
"""
<Task pending coro=<foo() running at test2.py:2>>
1 {<Task finished coro=<foo() done, defined at test2.py:2> result=42>}
0 set()
42
"""
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import asyncio
async def num(n):
    try:
        await asyncio.sleep(n*0.1)
        return n
    except asyncio.CancelledError:
        print(f"数字{n}被取消")
        raise
async def main():
    tasks = [num(i) for i in range(10)]
    complete, pending = await asyncio.wait(tasks, timeout=0.5)
    print(type(complete), len(complete))
    for i in complete:
        print(type(i))
        break
    print(type(complete), len(pending))
    for i in complete:
        print("当前数字",i.result())
    if pending:
        for p in pending:
            p.cancel()
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()
"""
<class 'set'> 5
<class '_asyncio.Task'>
<class 'set'> 5
当前数字 0
当前数字 2
当前数字 3
当前数字 1
当前数字 4
数字5被取消
数字9被取消
数字7被取消
数字8被取消
数字6被取消
"""

as_completed

asyncio.as_completed(aws, *, loop=None, timeout=None)

并发地运行 aws 集合中的 可等待对象。返回一个 Future 对象的迭代器。返回的每个 Future 对象代表来自剩余可等待对象集合的最早结果。

如果在所有 Future 对象完成前发生超时则将引发 asyncio.TimeoutError。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import asyncio
import time
async def foo(n):
    print('Waiting: ', n)
    await asyncio.sleep(n)
    return n
async def main():
    coroutine1 = foo(1)
    coroutine2 = foo(2)
    coroutine3 = foo(4)

    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]
    for task in asyncio.as_completed(tasks):
        result = await task
        print('Task ret: {}'.format(result))
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
"""
Waiting:  1
Waiting:  2
Waiting:  4
Task ret: 1
Task ret: 2
Task ret: 4
python test2.py  0.06s user 0.02s system 1% cpu 4.084 total
"""

参考资料

协程与任务

事件循环

https://bbs.huaweicloud.com/blogs/109055