使用生成器协程
yield item
这行代码会产出一个值,提供给 next(...)
的调用方;此外,还会作出让步,暂停执行生成器,让调用方继续工作,直到需要使用另一个值时再调用 next()。调用方会从生成器拉取值
在协程中,yield
通常出现在表达式的右边(例如:datum = yield
),可以产出值,也可以不产出(如果 yield
关键字后面没有表达式,那么生成器产出None
)。协程可能会从调用方接收数据,不过调用方把数据提供给协程使用的是.send(datum)
方法,而不是next(...)
函数。通常,调用方会把值推送给协程
yield
关键字甚至还可以不接收或传出数据。不管数据如何流动,yield
都是一种流程控制工具,使用它可以实现协作式多任务:协程可以把控制器让步给中心调度程序,从而激活其他的协程
从根本上把yield
视作流程控制的方式,这样就好理解协程了
生成器如何进化成协程
生成器的调用方可以使用 .send(...)
方法发送数据,发送的数据会成为生成器函数中yield
表达式的值。因此,生成器可以作为协程使用。协程是指一个过程,这个过程与调用方协作,产出由调用发提供的值
除了 .send(...)
方法,还有.throw(...)
和.close()
方法:前者的作用是让调用方抛出异常,在生成器中处理,后者的作用是终止生成器。
用作协程的生成器的基本行为
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 | def simple_coroutine():
print('-> coroutine started')
x = yield
print(f'-> coroutine received: {x}')
my_coro = simple_coroutine()
print(my_coro)
next(my_coro) # 首先要调用 next(...) 函数,因为生成器还没启动,没在 yield 语句处暂停,所以一开始无法发送数据
my_coro.send(42) # 调用这个方法后,协程定义体中的 yield 表达式会计算 42;现在,协程会恢复,一直运行到下一个 yield 表达式,或者终止
"""
<generator object simple_coroutine at 0x1094576d8>
-> coroutine started
-> coroutine received: 42
Traceback (most recent call last):
File "test2.py", line 10, in <module>
my_coro.send(42)
StopIteration
"""
|
协程可以身处四个状态中的一个。当前状态可以使用inspect.getgeneratorstate(...)
函数确定,该函数会返回下述字符串中的一个
GEN_CREATED
: 等待开始执行
GEN_RUNNING
: 解释器正在执行,只有在多线程应用中才能看到这个状态。此外,生成器对象在自己身上调用getgeneratorstate
函数也行,不过这样做没什么用
GEN_SUSPENDED
: 在 yield 表达式处暂停
GEN_CLOSED
: 执行结束
因为send
方法的参数会成为暂停的yield
表达式的值,所以,仅当协程处于暂停状态时才能调用send
方法,例如my_coro.send(42)
。不过,如果协程还没激活(即,状态是GEN_CREATED
),情况就不同了。因此,始终要调用next(my_coro)
激活协程--也可以调用my_coro.send(None)
,效果一样
如果创建协程对象后立即把None
之外的值发给它,会出现下述错误
| TypeError: can't send non-None value to a just-started generator
|
注意错误信息,它表述得相当清楚
最先调用next(my_coro)
函数这一步通常称为"预激"(prime)协程(即,让协程向前执行到第一个yield
表达式,准备好作为活跃的协程使用)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 | from inspect import getgeneratorstate
def simple_coro2(a):
print(f"-> Started a = {a}")
b = yield a
print(f"-> Received: b = {b}")
c = yield a + b
print(f"-> Received: c = {c}")
my_coro2 = simple_coro2(14)
print(getgeneratorstate(my_coro2))
next(my_coro2) # 向前执行协程到第一个 yield 表达式,打印 -> Started: a = 14 消息,然后产出 a 的值,并且暂停,等待为 b 赋值
print(getgeneratorstate(my_coro2))
my_coro2.send(28) # 把数字 28 发给暂停的协程;计算 yield 表达式,得到 28,然后把那么数绑定给 b。打印 -> Received: b = 28 消息,产出 a + b 的值 (42),然后协程暂停,等待为 c 赋值
my_coro2.send(99) # 把数字 99 发送给暂停的协程;计算 yield 表达式,得到 99,然后把那么数绑定给 c。打印 -> Received: c = 99 消息,然后协程终止,导致生成器对象抛出 StopIteration 异常
"""
GEN_CREATED
-> Started a = 14
GEN_SUSPENDED
-> Received: b = 28
-> Received: c = 99
Traceback (most recent call last):
File "test2.py", line 17, in <module>
my_coro2.send(99)
StopIteration
"""
|
关键的一点是,协程在 yield 关键字所在的位置暂停执行。在赋值语句中,= 右边的代码在赋值之前执行。因此,对于b = yield a
这行代码来说,等到客户端代码再激活协程时才会设定 b 的值
simple_coro2 协程的执行过程分为 3 个阶段
(1) 调用next(my_coro2)
,打印第一个消息,然后执行yield a
,产出数字 14
(2) 调用my_coro2.send(28)
,把28
赋值给b
,打印第二个消息,然后执行yield a + b
,产出数字42
(3) 调用my_coro2.send(99)
,把99
赋值给 c,打印第三个消息,协程终止
示例:使用协程计算移动平均值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29 | from inspect import getgeneratorstate
def averager():
total = 0.0
count = 0
average = None
while True: # 这个无限循环表明,只要调用方不断把值发送给这个协程,它就会一直接收值,然后生成结果。仅当调用方在协程上调用 .close 方法,或者没有对协程的引用而被垃圾回收程序回收时,这个协程才会终止
term = yield average # 这里的 yield 表达式用于暂停执行协程,把结果发给调用方,还用于接收调用方后面发给协程的值,恢复无限循环
total += term
count += 1
average = total / count
coro_avg = averager()
print(next(coro_avg)) # 调用 next 函数,预激协程
print(coro_avg.send(10)) # 计算移动平均值:多次调用 .send(...) 方法,产出当前的平均值
print(coro_avg.send(30))
print(coro_avg.send(5))
print(getgeneratorstate(coro_avg))
coro_avg.close()
print(getgeneratorstate(coro_avg))
"""
None
10.0
20.0
15.0
GEN_SUSPENDED
GEN_CLOSED
"""
|
使用协程的好处是,total
和count
声明为局部变量即可,无需使用实例属性或闭包在多次调用之间保持上下文
调用 next(coro_avg)
函数后,协程会向前执行到yield
表达式,产出average
变量的初始值--None
。此时,协程在yield
表达式处暂停,等到调用方发送值。coro_avg.send(10)
那一行发送一个值,激活协程,把发送的值赋给term
,并更新total, count
和average
三个变量的值,然后开始while
循环的下一次迭代,产出average
变量的值,等待下一次为term
变量赋值
使用协程之前必须预激,可是这一步容易忘记,为了避免忘记,可以在协程上使用一个特殊的装饰器
预激协程的装饰器
如果不预激,那么协程没什么用。调用my_coro.send(x)
之前,记住一定要调用next(my_coro)
。为了简化协程的用法,有时会使用一个预激装饰器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34 | from functools import wraps
from inspect import getgeneratorstate
def coroutine(func):
"""装饰器:向前执行到第一个 yield 表达式,预激 fuc """
@wraps(func)
def primer(*args, **kwargs): # 把被装饰的生成器函数替换成这里的 primer 函数;调用 primer 函数时,返回预激后的生成器
gen = func(*args, **kwargs) # 调用被装饰的函数,获取生成器对象
next(gen) # 预激生成器
return gen # 返回生成器
return primer
@coroutine
def averager():
total = 0.0
count = 0
average = None
while True:
term = yield average
total += term
count += 1
average = total / count
coro_avg = averager() # 调用 averager() 函数创建一个生成器对象,在 coroutine 装饰器的 primer 函数中已经预激了这个生成器
print(getgeneratorstate(coro_avg)) # getgeneratorstate 函数指明,处于 GEN_SUPENDED 状态,因此这个协程已经准备好,可以接收值了
print(coro_avg.send(10))
print(coro_avg.send(30))
print(coro_avg.send(5))
"""
GEN_SUSPENDED
10.0
20.0
15.0
"""
|
很多框架都提供了处理协程的特殊装饰器,不过不是所有装饰器都用于预激协程,有些会提供其他服务,例如勾入事件循环。比如说,异步网络库 Tornado 提供了 tornado.gen 装饰器
使用 yield from 句法调用协程时,会自动预激,asyncio.coroutine 装饰器不会预激协程,因此能兼容 yield from 句法
终止协程和异常处理
协程中未处理的异常会向上冒泡,传给 next 函数或 send 方法的调用方(即触发协程的对象)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 | In [3]: coro_avg.send('spam') # 发送的值不是数字,导致协程内部有异常抛出
...:
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-3-ab30dc25c38f> in <module>
----> 1 coro_avg.send('spam')
<ipython-input-1-c34812c28843> in averager()
18 while True:
19 term = yield average
---> 20 total += term
21 count += 1
22 average = total / count
TypeError: unsupported operand type(s) for +=: 'float' and 'str'
In [4]: coro_avg.send(10) # 由于协程内没有处理异常,协程会终止。如果试图重新激活协程,会抛出 StopIteraiton 异常
---------------------------------------------------------------------------
StopIteration Traceback (most recent call last)
<ipython-input-4-f6cfc385c20d> in <module>
----> 1 coro_avg.send(10)
StopIteration:
|
出错的原因是,发送给协程的 'spam' 值不能加到 total 变量上
上面的实例暗示了终止协程的一种方式:发送某个哨符值,让协程退出。内置的None
和Ellipsis
等常量经常用作哨符值。Ellipsis
的优点是,数据流中不太常有这个值。还有人把StopIteration
类(类本身,而不是实例,也不抛出)作为哨符值;也就是说,是像这样使用的: my_coro.send(StopIteration)
可以在生成器对象上调用两个方法,显式地把异常发给协程,这两个方法是throw
和close
generator.throw(exc_type[, exc_value[, traceback]])
致使生成器在暂停的 yield 表达式处抛出指定的异常。如果生成器处理了抛出的异常,代码会向前执行到下一个yield
表达式,而产出的值会成为调用generator.throw
方法得到的返回值。如果生成器没有处理抛出的异常,异常会向上冒泡,传到调用方的上下文中
generator.close()
致使生成器在暂停的yield
表达式处抛出GeneratorExit
异常。如果生成器没有处理这个异常,或者抛出了StopIteration
异常(通常是指运行到结尾),调用方不会报错。如果收到GeneratorExit
异常,生成器一定不能产出值,否则解释器会抛出RuntimeError
异常。生成器抛出的其他异常会向上冒泡,传给调用方
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 | from inspect import getgeneratorstate
class DemoException(Exception):
pass
def demo_exc_handling():
print('-> coroutine started')
while True:
try:
x = yield
except DemoException:
print('*** DemoException handled. Continuing...')
else:
print(f'-> coroutine received: {x}')
raise RuntimeError('This line should never run.') # 这一行永远不会执行,因为只有未处理的异常才会终止循环,而一旦出现未处理的异常,协程会立即终止
exc_coro = demo_exc_handling()
next(exc_coro)
exc_coro.send(11)
exc_coro.send(22)
exc_coro.close()
print(getgeneratorstate(exc_coro))
"""
-> coroutine started
-> coroutine received: 11
-> coroutine received: 22
GEN_CLOSED
"""
|
如果把DemoException
异常传入demo_exc_handling
协程,它会处理,然后继续运行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 | exc_coro = demo_exc_handling()
next(exc_coro)
exc_coro.send(11)
exc_coro.send(22)
exc_coro.throw(DemoException)
exc_coro.send(33)
print(getgeneratorstate(exc_coro))
"""
-> coroutine started
-> coroutine received: 11
-> coroutine received: 22
*** DemoException handled. Continuing...
-> coroutine received: 33
GEN_SUSPENDED
"""
|
但是,如果传入协程的异常没有处理,协程会停止,即状态变成GEN_CLOSED
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 | In [7]: exc_coro = demo_exc_handling()
...: next(exc_coro)
...: exc_coro.send(11)
...: exc_coro.send(22)
...: exc_coro.throw(ZeroDivisionError)
-> coroutine started
-> coroutine received: 11
-> coroutine received: 22
---------------------------------------------------------------------------
ZeroDivisionError Traceback (most recent call last)
<ipython-input-7-f772a7f3a884> in <module>
3 exc_coro.send(11)
4 exc_coro.send(22)
----> 5 exc_coro.throw(ZeroDivisionError)
<ipython-input-6-3b072d0a6b4a> in demo_exc_handling()
8 while True:
9 try:
---> 10 x = yield
11 except DemoException:
12 print('*** DemoException handled. Continuing...')
ZeroDivisionError:
In [8]: print(getgeneratorstate(exc_coro))
...:
GEN_CLOSED
|
如果不管协程如何结束都想做些清理工作,要把协程定义体中相关的代码放入try finally
块中
1
2
3
4
5
6
7
8
9
10
11
12 | def demo_exc_handling():
print('-> coroutine started')
try:
while True:
try:
x = yield
except DemoException:
print('*** DemoException handled. Continuing...')
else:
print(f'-> coroutine received: {x}')
finally:
print('-> coroutin ending')
|
Python 3.3 引入yield from
结构的主要原因之一与把异常传入嵌套的协程有关。另一个原因是让协程更方便地返回值。
让协程返回值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29 | from collections import namedtuple
Result = namedtuple('Result', 'count average')
def averager():
total = 0.0
count = 0
average = None
while True:
term = yield
if term is None:
break # 为了返回值,协程必须正常终止;因此,这里有个条件判断,以便退出累计循环
total += term
count += 1
average = total / count
return Result(count, average) # 返回一个 namedtuple,包含 count 和 average 两个字段,在 Python 3.3 之前,如果生成器返回值,解释器会报语法错误
coro_avg = averager()
next(coro_avg)
coro_avg.send(10)
coro_avg.send(30)
coro_avg.send(6.5)
coro_avg.send(None) # 发送 None 会终止循环,导致协程结束,返回结果。一如既往,生成器对象会抛出 StopIteration 异常。异常对象的 value 属性保存着返回的值
"""
Traceback (most recent call last):
File "test2.py", line 23, in <module>
coro_avg.send(None)
StopIteration: Result(count=3, average=15.5)
"""
|
注意,return
表达式的值会偷偷传给调用方,赋值给StopIteration
异常的一个属性。这样做有点不合常理,但是能保留生成器对象的常规行为--耗尽时抛出StopIteration
异常
1
2
3
4
5
6
7
8
9
10
11
12
13
14 | coro_avg = averager()
next(coro_avg)
coro_avg.send(10)
coro_avg.send(30)
coro_avg.send(6.5)
try:
coro_avg.send(None)
except StopIteration as exc:
result = exc.value
print(result)
"""
Result(count=3, average=15.5)
"""
|
获取协程的返回值虽然要绕个圈子,但这是 PEP 380 定义的方式,当我们意识到这一点之后就说得通了:yielf from
结构会在内部自动捕获StopIteration
异常。这种处理方式与for
循环处理StopIteration
异常的方式一样:循环机制使用用于易于理解的方式处理异常。对yield from
结构来说,解释器不仅会捕获StopIteration
异常,还会把value
属性的值变为yield from
的值。可惜,我们无法在控制台中使用交互的方式测试这种行为,因为在函数外部使用yield from
(以及yield
)会导致句法出错
使用 yield from
yield from
的作用比yield
多得多。在asyncio
模块中有await
关键字,这个名称比yield from
好多了,因为它传达了至关重要的一点:在生成器gen
中使用yield from subgen()
时,subgen
会获得控制权,把产出的值传给gen
的调用方,即调用方可以直接控制subgen
。与此同时,gen
会阻塞,等待subgen
终止
yield from
可用于简化for
循环中的yielf
表达式
| def gen():
for c in 'AB':
yield c
for i in range(1, 3):
yield i
print(list(gen()))
"""
['A', 'B', 1, 2]
"""
|
| def gen():
yield from 'AB'
yield from range(1, 3)
print(list(gen()))
"""
['A', 'B', 1, 2]
"""
|
| def chain(*iterables):
for it in iterables:
yield from it
s = 'ABC'
# t = tuple(range(3))
t = range(3)
print(list(chain(s, t)))
"""
['A', 'B', 'C', 0, 1, 2]
"""
|
yield from x
表达式对x
对象所做的第一件事是,调用iter(x)
,从中获取迭代器。因此,x
可以是任何可迭代的对象
可是,如果yield from
结构唯一的作用是替代产出值的嵌套for
循环,这个结构很有可能不会添加到 Python 语言中。yield from
结构的本质作用无法通过简单的可迭代对象说明,而要发散思维,使用嵌套的生成器。因此,引入yield from
结构的 PEP 380 才起了 "Syntax for Delegating to a Subgenerator"("把职责委托给子生成器的句法")这个标题
yield from
的主要功能是打开双向通道,把最外层的调用方与最内层的子生成器连接起来,这样二者可以直接发送和产出值,还可以直接传入异常,而不用在位于中间的协程中添加大量处理异常的样板代码。有了这个结构,协程可以通过以前不可能的方式委托职责
若想使用yield from
结构,就要大幅改动代码。为了说明需要改动的部分,PEP 380 使用了一些专门的术语
委派生成器:包含yield from <iterable>
表达式的生成器函数
子生成器: 从yield from
表达式中<iterable>
部分获取的生成器。这就是 PEP 380 的标题("Syntax for Delegating to a Subgenerator")中所说的"子生成器"(subgenerator)
调用方:PEP 380 使用“调用方”这个术语指代调用委派生成器的客户端代码。在不同的语境中,可以使用“客户端”代替“调用方”,以此与委派生成器(也是调用方,因为它调用了子生成器)区分开
子生成器可能是简单的迭代器,只实现了__next__
方法;但是,yield from
结构的目的是为了支持实现了__next__
、send
、close
和throw
方法的生成器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64 | from collections import namedtuple
Result = namedtuple('Result', 'count average')
# 子生成器
def averager():
total = 0.0
count = 0
average = None
while True:
term = yield
if term is None: # 至关重要的终止条件。如果不这么做,使用 yield from 调用这个协程的生成器会永远阻塞
break
total += term
count += 1
average = total / count
return Result(count, average) # 返回的 Result 会成为 grouper 函数中 yield from 表达式的值
# 委派生成器
def grouper(results, key):
while True: # 这个循环每次迭代时会新建一个 averager 实例;每个实例都是作为协程使用的生成器对象
"""
grouper 发送的每个值都会经由 yield from 处理,通过管道传给 averager 实例。grouper 会在 yield from 表达式处暂停,
等待 averager 实例处理客户端发来的值。averager 实例运行完毕后,返回的值绑定到 results[key] 上。
while 循环会不断创建 averager 实例,处理更多的值
"""
results[key] = yield from averager()
# 客户端代码,即调用方
def main(data):
results = {}
for key, values in data.items():
"""
group 是调用 grouper 函数得到的生成器对象,传给 grouper 函数的第一个参数是 results,用于收集结果;第二个参数是某个键。group 作为协程使用
"""
group = grouper(results, key)
next(group) # 预激协程
for value in values: # 把各个 value 传给 grouper。传入的值最终到达 averager 函数中 term = yield 那一行;grouper 永远不知道传入的值是什么
group.send(value)
# 重要!
group.send(None) # 把 None 传入 grouper,导致当前的 averager 实例终止,也让 grouper 继续运行,再创建一个 averager 实例,处理下一组值
report(results)
data = {
'girls;kg': [40.9, 38.5, 44.3, 42.2, 45.2, 41.7, 44.5, 38.0, 40.6, 44.5],
'girls;m': [1.6, 1.51, 1.4, 1.3, 1.41, 1.39, 1.33, 1.46, 1.45, 1.43],
'boys;kg': [39.0, 40.8, 43.2, 40.8, 43.1, 38.6, 41.4, 40.6, 36.3],
'boys;m': [1.38, 1.5, 1.32, 1.25, 1.37, 1.48, 1.25, 1.49, 1.46]
}
### 输出报告
def report(results):
for key, result in sorted(results.items()):
group, unit = key.split(';')
print('{:2} {:5} averaging {:.2f}{}'.format(result.count, group, result.average, unit))
if __name__ == '__main__':
main(data)
"""
9 boys averaging 40.42kg
9 boys averaging 1.39m
10 girls averaging 42.04kg
10 girls averaging 1.43m
"""
|
示例1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 | def consumer():
r = 'init'
while True:
n = yield r
print('[CONSUMER] Consuming %s...' % n)
r = '200 OK'
def produce(c):
print(c.send(None))
n = 0
while n < 5:
n = n + 1
print('[PRODUCER] Producing %s...' % n)
r = c.send(n)
print('[PRODUCER] Consumer return: %s' % r)
c.close()
c = consumer()
produce(c)
|
输出
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 | init
[PRODUCER] Producing 1...
[CONSUMER] Consuming 1...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 2...
[CONSUMER] Consuming 2...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 3...
[CONSUMER] Consuming 3...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 4...
[CONSUMER] Consuming 4...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 5...
[CONSUMER] Consuming 5...
[PRODUCER] Consumer return: 200 OK
|