Skip to content

使用生成器协程

yield item 这行代码会产出一个值,提供给 next(...) 的调用方;此外,还会作出让步,暂停执行生成器,让调用方继续工作,直到需要使用另一个值时再调用 next()。调用方会从生成器拉取值

在协程中,yield通常出现在表达式的右边(例如:datum = yield),可以产出值,也可以不产出(如果 yield关键字后面没有表达式,那么生成器产出None)。协程可能会从调用方接收数据,不过调用方把数据提供给协程使用的是.send(datum)方法,而不是next(...)函数。通常,调用方会把值推送给协程

yield关键字甚至还可以不接收或传出数据。不管数据如何流动,yield都是一种流程控制工具,使用它可以实现协作式多任务:协程可以把控制器让步给中心调度程序,从而激活其他的协程

从根本上把yield视作流程控制的方式,这样就好理解协程了

生成器如何进化成协程

生成器的调用方可以使用 .send(...)方法发送数据,发送的数据会成为生成器函数中yield表达式的值。因此,生成器可以作为协程使用。协程是指一个过程,这个过程与调用方协作,产出由调用发提供的值

除了 .send(...)方法,还有.throw(...).close()方法:前者的作用是让调用方抛出异常,在生成器中处理,后者的作用是终止生成器。

用作协程的生成器的基本行为

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
def simple_coroutine():
    print('-> coroutine started')
    x = yield
    print(f'-> coroutine received: {x}')

my_coro = simple_coroutine()
print(my_coro)

next(my_coro) # 首先要调用 next(...) 函数,因为生成器还没启动,没在 yield 语句处暂停,所以一开始无法发送数据
my_coro.send(42) # 调用这个方法后,协程定义体中的 yield 表达式会计算 42;现在,协程会恢复,一直运行到下一个 yield 表达式,或者终止
"""
<generator object simple_coroutine at 0x1094576d8>
-> coroutine started
-> coroutine received: 42
Traceback (most recent call last):
  File "test2.py", line 10, in <module>
    my_coro.send(42)
StopIteration
"""

协程可以身处四个状态中的一个。当前状态可以使用inspect.getgeneratorstate(...)函数确定,该函数会返回下述字符串中的一个
GEN_CREATED: 等待开始执行
GEN_RUNNING: 解释器正在执行,只有在多线程应用中才能看到这个状态。此外,生成器对象在自己身上调用getgeneratorstate函数也行,不过这样做没什么用
GEN_SUSPENDED: 在 yield 表达式处暂停
GEN_CLOSED: 执行结束

因为send方法的参数会成为暂停的yield表达式的值,所以,仅当协程处于暂停状态时才能调用send方法,例如my_coro.send(42)。不过,如果协程还没激活(即,状态是GEN_CREATED),情况就不同了。因此,始终要调用next(my_coro)激活协程--也可以调用my_coro.send(None),效果一样

如果创建协程对象后立即把None之外的值发给它,会出现下述错误

1
TypeError: can't send non-None value to a just-started generator

注意错误信息,它表述得相当清楚

最先调用next(my_coro)函数这一步通常称为"预激"(prime)协程(即,让协程向前执行到第一个yield表达式,准备好作为活跃的协程使用)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from inspect import getgeneratorstate

def simple_coro2(a):
    print(f"-> Started a = {a}")
    b = yield a
    print(f"-> Received: b = {b}")
    c = yield a + b
    print(f"-> Received: c = {c}")

my_coro2 = simple_coro2(14)
print(getgeneratorstate(my_coro2))
next(my_coro2) # 向前执行协程到第一个 yield 表达式,打印 -> Started: a = 14 消息,然后产出 a 的值,并且暂停,等待为 b 赋值
print(getgeneratorstate(my_coro2))

my_coro2.send(28) # 把数字 28 发给暂停的协程;计算 yield 表达式,得到 28,然后把那么数绑定给 b。打印 -> Received: b = 28 消息,产出 a + b 的值 (42),然后协程暂停,等待为 c 赋值

my_coro2.send(99) # 把数字 99 发送给暂停的协程;计算 yield 表达式,得到 99,然后把那么数绑定给 c。打印 -> Received: c = 99 消息,然后协程终止,导致生成器对象抛出 StopIteration 异常
"""
GEN_CREATED
-> Started a = 14
GEN_SUSPENDED
-> Received: b = 28
-> Received: c = 99
Traceback (most recent call last):
  File "test2.py", line 17, in <module>
    my_coro2.send(99)
StopIteration
"""

关键的一点是,协程在 yield 关键字所在的位置暂停执行。在赋值语句中,= 右边的代码在赋值之前执行。因此,对于b = yield a这行代码来说,等到客户端代码再激活协程时才会设定 b 的值

simple_coro2 协程的执行过程分为 3 个阶段
(1) 调用next(my_coro2),打印第一个消息,然后执行yield a,产出数字 14
(2) 调用my_coro2.send(28),把28赋值给b,打印第二个消息,然后执行yield a + b,产出数字42
(3) 调用my_coro2.send(99),把99赋值给 c,打印第三个消息,协程终止

示例:使用协程计算移动平均值

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
from inspect import getgeneratorstate

def averager():
    total = 0.0
    count = 0
    average = None
    while True: # 这个无限循环表明,只要调用方不断把值发送给这个协程,它就会一直接收值,然后生成结果。仅当调用方在协程上调用 .close 方法,或者没有对协程的引用而被垃圾回收程序回收时,这个协程才会终止
        term = yield average # 这里的 yield 表达式用于暂停执行协程,把结果发给调用方,还用于接收调用方后面发给协程的值,恢复无限循环
        total += term
        count += 1
        average = total / count

coro_avg = averager()
print(next(coro_avg)) # 调用 next 函数,预激协程
print(coro_avg.send(10)) # 计算移动平均值:多次调用 .send(...) 方法,产出当前的平均值
print(coro_avg.send(30))
print(coro_avg.send(5))

print(getgeneratorstate(coro_avg))
coro_avg.close()
print(getgeneratorstate(coro_avg))
"""
None
10.0
20.0
15.0
GEN_SUSPENDED
GEN_CLOSED
"""

使用协程的好处是,totalcount声明为局部变量即可,无需使用实例属性或闭包在多次调用之间保持上下文

调用 next(coro_avg)函数后,协程会向前执行到yield表达式,产出average变量的初始值--None。此时,协程在yield表达式处暂停,等到调用方发送值。coro_avg.send(10)那一行发送一个值,激活协程,把发送的值赋给term,并更新total, countaverage三个变量的值,然后开始while循环的下一次迭代,产出average变量的值,等待下一次为term变量赋值

使用协程之前必须预激,可是这一步容易忘记,为了避免忘记,可以在协程上使用一个特殊的装饰器

预激协程的装饰器

如果不预激,那么协程没什么用。调用my_coro.send(x)之前,记住一定要调用next(my_coro)。为了简化协程的用法,有时会使用一个预激装饰器

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
from functools import wraps
from inspect import getgeneratorstate

def coroutine(func):
    """装饰器:向前执行到第一个 yield 表达式,预激 fuc """
    @wraps(func)
    def primer(*args, **kwargs): # 把被装饰的生成器函数替换成这里的 primer 函数;调用 primer 函数时,返回预激后的生成器
        gen = func(*args, **kwargs) # 调用被装饰的函数,获取生成器对象
        next(gen) # 预激生成器
        return gen # 返回生成器
    return primer

@coroutine
def averager():
    total = 0.0
    count = 0
    average = None
    while True: 
        term = yield average 
        total += term
        count += 1
        average = total / count

coro_avg = averager() # 调用 averager() 函数创建一个生成器对象,在 coroutine 装饰器的 primer 函数中已经预激了这个生成器
print(getgeneratorstate(coro_avg)) # getgeneratorstate 函数指明,处于 GEN_SUPENDED 状态,因此这个协程已经准备好,可以接收值了
print(coro_avg.send(10))
print(coro_avg.send(30))
print(coro_avg.send(5))
"""
GEN_SUSPENDED
10.0
20.0
15.0
"""

很多框架都提供了处理协程的特殊装饰器,不过不是所有装饰器都用于预激协程,有些会提供其他服务,例如勾入事件循环。比如说,异步网络库 Tornado 提供了 tornado.gen 装饰器

使用 yield from 句法调用协程时,会自动预激,asyncio.coroutine 装饰器不会预激协程,因此能兼容 yield from 句法

终止协程和异常处理

协程中未处理的异常会向上冒泡,传给 next 函数或 send 方法的调用方(即触发协程的对象)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
In [3]: coro_avg.send('spam') # 发送的值不是数字,导致协程内部有异常抛出
   ...:
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-3-ab30dc25c38f> in <module>
----> 1 coro_avg.send('spam')

<ipython-input-1-c34812c28843> in averager()
     18     while True:
     19         term = yield average
---> 20         total += term
     21         count += 1
     22         average = total / count

TypeError: unsupported operand type(s) for +=: 'float' and 'str'

In [4]: coro_avg.send(10) # 由于协程内没有处理异常,协程会终止。如果试图重新激活协程,会抛出 StopIteraiton 异常
---------------------------------------------------------------------------
StopIteration                             Traceback (most recent call last)
<ipython-input-4-f6cfc385c20d> in <module>
----> 1 coro_avg.send(10)

StopIteration:

出错的原因是,发送给协程的 'spam' 值不能加到 total 变量上

上面的实例暗示了终止协程的一种方式:发送某个哨符值,让协程退出。内置的NoneEllipsis等常量经常用作哨符值。Ellipsis的优点是,数据流中不太常有这个值。还有人把StopIteration类(类本身,而不是实例,也不抛出)作为哨符值;也就是说,是像这样使用的: my_coro.send(StopIteration)

可以在生成器对象上调用两个方法,显式地把异常发给协程,这两个方法是throwclose

generator.throw(exc_type[, exc_value[, traceback]])

致使生成器在暂停的 yield 表达式处抛出指定的异常。如果生成器处理了抛出的异常,代码会向前执行到下一个yield表达式,而产出的值会成为调用generator.throw方法得到的返回值。如果生成器没有处理抛出的异常,异常会向上冒泡,传到调用方的上下文中

generator.close()

致使生成器在暂停的yield表达式处抛出GeneratorExit异常。如果生成器没有处理这个异常,或者抛出了StopIteration异常(通常是指运行到结尾),调用方不会报错。如果收到GeneratorExit异常,生成器一定不能产出值,否则解释器会抛出RuntimeError异常。生成器抛出的其他异常会向上冒泡,传给调用方

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from inspect import getgeneratorstate

class DemoException(Exception):
    pass

def demo_exc_handling():
    print('-> coroutine started')
    while True:
        try:
            x = yield
        except DemoException:
            print('*** DemoException handled. Continuing...')
        else:
            print(f'-> coroutine received: {x}')
    raise RuntimeError('This line should never run.') # 这一行永远不会执行,因为只有未处理的异常才会终止循环,而一旦出现未处理的异常,协程会立即终止

exc_coro = demo_exc_handling()
next(exc_coro)
exc_coro.send(11)
exc_coro.send(22)
exc_coro.close()
print(getgeneratorstate(exc_coro))
"""
-> coroutine started
-> coroutine received: 11
-> coroutine received: 22
GEN_CLOSED
"""

如果把DemoException异常传入demo_exc_handling协程,它会处理,然后继续运行

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
exc_coro = demo_exc_handling()
next(exc_coro)
exc_coro.send(11)
exc_coro.send(22)

exc_coro.throw(DemoException)
exc_coro.send(33)
print(getgeneratorstate(exc_coro))
"""
-> coroutine started
-> coroutine received: 11
-> coroutine received: 22
*** DemoException handled. Continuing...
-> coroutine received: 33
GEN_SUSPENDED
"""

但是,如果传入协程的异常没有处理,协程会停止,即状态变成GEN_CLOSED

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
In [7]: exc_coro = demo_exc_handling()
   ...: next(exc_coro)
   ...: exc_coro.send(11)
   ...: exc_coro.send(22)
   ...: exc_coro.throw(ZeroDivisionError)
-> coroutine started
-> coroutine received: 11
-> coroutine received: 22
---------------------------------------------------------------------------
ZeroDivisionError                         Traceback (most recent call last)
<ipython-input-7-f772a7f3a884> in <module>
      3 exc_coro.send(11)
      4 exc_coro.send(22)
----> 5 exc_coro.throw(ZeroDivisionError)

<ipython-input-6-3b072d0a6b4a> in demo_exc_handling()
      8     while True:
      9         try:
---> 10             x = yield
     11         except DemoException:
     12             print('*** DemoException handled. Continuing...')

ZeroDivisionError:

In [8]: print(getgeneratorstate(exc_coro))
   ...:
GEN_CLOSED

如果不管协程如何结束都想做些清理工作,要把协程定义体中相关的代码放入try finally块中

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
def demo_exc_handling():
    print('-> coroutine started')
    try:
        while True:
            try:
                x = yield
            except DemoException:
                print('*** DemoException handled. Continuing...')
            else:
                print(f'-> coroutine received: {x}')
    finally:
        print('-> coroutin ending')

Python 3.3 引入yield from结构的主要原因之一与把异常传入嵌套的协程有关。另一个原因是让协程更方便地返回值。

让协程返回值

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
from collections import namedtuple

Result = namedtuple('Result', 'count average')

def averager():
    total = 0.0
    count = 0
    average = None
    while True:
        term = yield
        if term is None:
            break # 为了返回值,协程必须正常终止;因此,这里有个条件判断,以便退出累计循环
        total += term
        count += 1
        average = total / count
    return Result(count, average) # 返回一个 namedtuple,包含 count 和 average 两个字段,在 Python 3.3 之前,如果生成器返回值,解释器会报语法错误

coro_avg = averager()
next(coro_avg)
coro_avg.send(10)
coro_avg.send(30)
coro_avg.send(6.5)
coro_avg.send(None) # 发送 None 会终止循环,导致协程结束,返回结果。一如既往,生成器对象会抛出 StopIteration 异常。异常对象的 value 属性保存着返回的值
"""
Traceback (most recent call last):
  File "test2.py", line 23, in <module>
    coro_avg.send(None)
StopIteration: Result(count=3, average=15.5)
"""

注意,return表达式的值会偷偷传给调用方,赋值给StopIteration异常的一个属性。这样做有点不合常理,但是能保留生成器对象的常规行为--耗尽时抛出StopIteration异常

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
coro_avg = averager()
next(coro_avg)
coro_avg.send(10)
coro_avg.send(30)
coro_avg.send(6.5)
try:
    coro_avg.send(None)
except StopIteration as exc:
    result = exc.value

print(result)
"""
Result(count=3, average=15.5)
"""

获取协程的返回值虽然要绕个圈子,但这是 PEP 380 定义的方式,当我们意识到这一点之后就说得通了:yielf from结构会在内部自动捕获StopIteration异常。这种处理方式与for循环处理StopIteration异常的方式一样:循环机制使用用于易于理解的方式处理异常。对yield from结构来说,解释器不仅会捕获StopIteration异常,还会把value属性的值变为yield from的值。可惜,我们无法在控制台中使用交互的方式测试这种行为,因为在函数外部使用yield from(以及yield)会导致句法出错

使用 yield from

yield from 的作用比yield多得多。在asyncio模块中有await关键字,这个名称比yield from好多了,因为它传达了至关重要的一点:在生成器gen中使用yield from subgen()时,subgen会获得控制权,把产出的值传给gen的调用方,即调用方可以直接控制subgen。与此同时,gen会阻塞,等待subgen终止

yield from可用于简化for循环中的yielf表达式

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
def gen():
    for c in 'AB':
        yield c
    for i in range(1, 3):
        yield i

print(list(gen()))
"""
['A', 'B', 1, 2]
"""
1
2
3
4
5
6
7
8
def gen():
    yield from 'AB'
    yield from range(1, 3)

print(list(gen()))
"""
['A', 'B', 1, 2]
"""
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
def chain(*iterables):
    for it in iterables:
        yield from it

s = 'ABC'
# t = tuple(range(3))
t = range(3)
print(list(chain(s, t)))
"""
['A', 'B', 'C', 0, 1, 2]
"""

yield from x 表达式对x对象所做的第一件事是,调用iter(x),从中获取迭代器。因此,x可以是任何可迭代的对象

可是,如果yield from结构唯一的作用是替代产出值的嵌套for循环,这个结构很有可能不会添加到 Python 语言中。yield from结构的本质作用无法通过简单的可迭代对象说明,而要发散思维,使用嵌套的生成器。因此,引入yield from结构的 PEP 380 才起了 "Syntax for Delegating to a Subgenerator"("把职责委托给子生成器的句法")这个标题

yield from的主要功能是打开双向通道,把最外层的调用方与最内层的子生成器连接起来,这样二者可以直接发送和产出值,还可以直接传入异常,而不用在位于中间的协程中添加大量处理异常的样板代码。有了这个结构,协程可以通过以前不可能的方式委托职责

若想使用yield from结构,就要大幅改动代码。为了说明需要改动的部分,PEP 380 使用了一些专门的术语

委派生成器:包含yield from <iterable>表达式的生成器函数
子生成器: 从yield from表达式中<iterable>部分获取的生成器。这就是 PEP 380 的标题("Syntax for Delegating to a Subgenerator")中所说的"子生成器"(subgenerator)

调用方:PEP 380 使用“调用方”这个术语指代调用委派生成器的客户端代码。在不同的语境中,可以使用“客户端”代替“调用方”,以此与委派生成器(也是调用方,因为它调用了子生成器)区分开

子生成器可能是简单的迭代器,只实现了__next__方法;但是,yield from结构的目的是为了支持实现了__next__sendclosethrow方法的生成器

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
from collections import namedtuple

Result = namedtuple('Result', 'count average')

# 子生成器
def averager():
    total = 0.0
    count = 0
    average = None
    while True:
        term = yield
        if term is None: # 至关重要的终止条件。如果不这么做,使用 yield from 调用这个协程的生成器会永远阻塞
            break
        total += term
        count += 1
        average = total / count
    return Result(count, average) # 返回的 Result 会成为 grouper 函数中 yield from 表达式的值

# 委派生成器
def grouper(results, key):
    while True: # 这个循环每次迭代时会新建一个 averager 实例;每个实例都是作为协程使用的生成器对象
        """
        grouper 发送的每个值都会经由 yield from 处理,通过管道传给 averager 实例。grouper 会在 yield from 表达式处暂停,
        等待 averager 实例处理客户端发来的值。averager 实例运行完毕后,返回的值绑定到 results[key] 上。
        while 循环会不断创建 averager 实例,处理更多的值
        """
        results[key] = yield from averager()

# 客户端代码,即调用方
def main(data):
    results = {}
    for key, values in data.items():
        """
        group 是调用 grouper 函数得到的生成器对象,传给 grouper 函数的第一个参数是 results,用于收集结果;第二个参数是某个键。group 作为协程使用
        """
        group = grouper(results, key)
        next(group) # 预激协程
        for value in values: # 把各个 value 传给 grouper。传入的值最终到达 averager 函数中 term = yield 那一行;grouper 永远不知道传入的值是什么
            group.send(value)
        # 重要!
        group.send(None) # 把 None 传入 grouper,导致当前的 averager 实例终止,也让 grouper 继续运行,再创建一个 averager 实例,处理下一组值
    report(results)

data = {
    'girls;kg': [40.9, 38.5, 44.3, 42.2, 45.2, 41.7, 44.5, 38.0, 40.6, 44.5],
    'girls;m': [1.6, 1.51, 1.4, 1.3, 1.41, 1.39, 1.33, 1.46, 1.45, 1.43],
    'boys;kg': [39.0, 40.8, 43.2, 40.8, 43.1, 38.6, 41.4, 40.6, 36.3],
    'boys;m': [1.38, 1.5, 1.32, 1.25, 1.37, 1.48, 1.25, 1.49, 1.46]
}

### 输出报告
def report(results):
    for key, result in sorted(results.items()):
        group, unit = key.split(';')
        print('{:2} {:5} averaging {:.2f}{}'.format(result.count, group, result.average, unit))

if __name__ == '__main__':
    main(data)
"""
 9 boys  averaging 40.42kg
 9 boys  averaging 1.39m
10 girls averaging 42.04kg
10 girls averaging 1.43m
"""

示例1

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
def consumer():
    r = 'init'
    while True:
        n = yield r
        print('[CONSUMER] Consuming %s...' % n)
        r = '200 OK'
def produce(c):
    print(c.send(None))
    n = 0
    while n < 5:
        n = n + 1
        print('[PRODUCER] Producing %s...' % n)
        r = c.send(n)
        print('[PRODUCER] Consumer return: %s' % r)
    c.close()
c = consumer()
produce(c)

输出

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
init
[PRODUCER] Producing 1...
[CONSUMER] Consuming 1...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 2...
[CONSUMER] Consuming 2...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 3...
[CONSUMER] Consuming 3...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 4...
[CONSUMER] Consuming 4...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 5...
[CONSUMER] Consuming 5...
[PRODUCER] Consumer return: 200 OK