Skip to content

consurrent.futures模块

将异步代码同步化

异步编程很棒,特别是对于构建可扩展应用程序感兴趣的后端开发人员。在实践中,它是构建高度并发服务器的最重要的工具之一

但现实是痛苦的。许多处理 I/O 繁忙问题的流行包并没有使用异步代码。主要原因是:

  • Python 3 及其一些高级功能的采用率依然很低
  • Python 初学者对各种并发概念理解不足

这意味着,迁移现有的同步多线程应用程序和软件包要么不可能(由于架构的限制),要么代价太大。许多项目可以从并入异步风格的多任务中获益匪浅,但只有少数人最终会这样做

这意味着,现在,从一开始尝试构建异步应用程序时,你就会遇到很多困难。在大多数情况下,这些困难类似于不兼容接口和 I/O 操作的非异步阻塞

当然,当你遇到这种不兼容性,并且只是同步获取所需的资源时,你有时可以不使用 await。但是在你等待结果时,这将阻止每个其他协程继续执行它的代码。

另一个问题是长时间运行 CPU 密集型任务。当你执行 I/O 操作时,可以很容易地从协程释放控制。当从文件系统或套接字写/读时,你最终会等待,所以使用 await 调用是最好的。但是,当你需要进行比较耗时的计算时,依然会阻止其他协程执行代码

因此,有时你不知道该怎么做时,当某些东西根本不适合你的异步应用程序时,使用一段代码,将它推迟到单独的线程或进程。你可以假装这是一个协程,释放控制到事件循环,最终在准备好时处理结果。

幸运的是,Python 标准库提供了 concurent.futures 模块,它也与 asyncio 模块集成。你可以使用这两个模块一起调度在其他线程或者其他进程中执行阻塞函数

随着 Python 3.2 版本的发布,Python 引入了 concurrent.futures 模块,支持管理并发编程任务,如进程池和线程池、非确定性执行流以及多进程和线程同步

Executor与Future

concurrent.futures 模块中最重要的类是 Executor 和 Future

Executor 表示可并行处理工作项的资源池。它有两个具体的实现: ThreadPoolExecutor(线程池)和 ProcessPoolExecutor(进程池)

每个 Executor 提供 3 个方法

submit(fn, *args, **kwargs): 调度可调用对象 fn,以 fn(args *kwargs) 方式执行并返回 Future 对像代表可调用对象的执行。

1
2
3
4
5
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 323, 1235)
    print(future.result())

map(func, *iterables, timeout=None, chunksize=1): 以异步模式使用给定参数来执行函数

shutdown(Wait=True): 向执行器(executor)传递释放资源的信号

最值得注意的方法是 submit(),因为它返回 Future 对象。他表示一个可调用的异步执行,只是间接表示其结果。为了获得提交的可调用的实际返回值,你需要调用 Future.result() 方法。如果可调用已经完成,result() 方法将不会阻塞它,只会返回函数输出。如果不是真的,它将阻塞,直到结果准备好。把它看作一个结果

在事件循环中使用Executor

Executor.submit() 方法返回的 Future 类实例在概念上非常接近异步编程中使用的协程。这就是为什么我们可以使用 Executor 在协同多任务和多进程或多线程之间进行混合

此解决方法的核心是事件循环类 BaseEventLoop.run_in_executor(executor, func, *args) 方法。它会在进程池或线程池中调度执行由 executor 参数表示的 func 函数。这个方法最重要的是它返回一个新的 awaitable(一个可以用 await 语句等待的对象)。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import time
import asyncio
import random
from concurrent.futures import ThreadPoolExecutor

def db_opt(task_num):
    sleep_time = random.randint(1, 3)
    print(f"task_num = {task_num}, sleep_time = {sleep_time}")
    time.sleep(sleep_time)

start_time = time.time()
executor = ThreadPoolExecutor(max_workers=3)

loop = asyncio.get_event_loop()
tasks = []
for num in range(10):
    task = loop.run_in_executor(executor, db_opt, num)
    tasks.append(task)

loop.run_until_complete(asyncio.wait(tasks))
print(f"use time = {time.time() - start_time}")
"""
task_num = 0, sleep_time = 1
task_num = 1, sleep_time = 3
task_num = 2, sleep_time = 3
task_num = 3, sleep_time = 2
task_num = 4, sleep_time = 2
task_num = 5, sleep_time = 3
task_num = 6, sleep_time = 3
task_num = 7, sleep_time = 2
task_num = 8, sleep_time = 3
task_num = 9, sleep_time = 1
use time = 9.00458288192749
"""

使用线程池和进程池

一个线程池或进程池(也被称为池化)指的是用来优化、简化程序内部线程/进程使用的软件管理器。通过池化,你可以向 pooler 提交将由其执行的任务。这个池子里有一个待执行任务的内部队列,以及一些执行这些任务的线程或进程。

池化中的一个常见概念是复用:一个线程(或进程)在其生命周期中,被多次用于执行不同的任务。复用减少了创建进程或线程的开销,提升了利用池化技巧的程序的性能。

虽然复用不是非用不可的,但它却是促使程序员在其应用中使用池化的主要原因之一。

current.futures 模块提供了 Executor 类的两个子类,这两个子类分别可以异步式地管理一个线程池和一个进程池

  • concurrent.futures.ThreadPoolExecutor(max_workers)
  • concurrent.futures.ProcessPoolExecutor(max_workers)

max_workers 参数表示用于异步执行调度的最大 worker 数量

要执行的任务是:有一个由数字 1~10 组成的列表 number_list,针对列表中的每个元素,执行 1000 万此计数迭代(纯粹为了消磨时间),然后将得到的值与该元素相乘

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import time
import concurrent.futures
number_list = list(range(1, 11))
def evaluate_item(x):
    result_item = count(x)
    print(f"tiem {x} result {result_item}")
def count(number):
    for i in range(10000000):
        i = i + 1
    return i * number
if __name__ == "__main__":
    # 线性执行
    start_time = time.time()
    for item in number_list:
        evaluate_item(item)
    print(f"线性执行运行时间为 {time.time() - start_time} 秒")
    # 线程池执行
    start_time = time.time()
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        for item in number_list:
            executor.submit(evaluate_item, item)
    print(f"线程池执行运行时间为 {time.time() - start_time} 秒")
    # 进程池执行
    start_time = time.time()
    with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
        for item in number_list:
            executor.submit(evaluate_item, item)
    print(f"进程池执行运行时间为 {time.time() - start_time} 秒")
"""
tiem 1 result 10000000
tiem 2 result 20000000
tiem 3 result 30000000
tiem 4 result 40000000
tiem 5 result 50000000
tiem 6 result 60000000
tiem 7 result 70000000
tiem 8 result 80000000
tiem 9 result 90000000
tiem 10 result 100000000
线性执行运行时间为 4.578747749328613 秒
tiem 1 result 10000000
tiem 4 result 40000000
tiem 3 result 30000000
tiem 2 result 20000000
tiem 5 result 50000000
tiem 6 result 60000000
tiem 7 result 70000000
tiem 8 result 80000000
tiem 9 result 90000000
tiem 10 result 100000000
线程池执行运行时间为 4.501923084259033 秒
tiem 4 result 40000000
tiem 3 result 30000000
tiem 1 result 10000000
tiem 5 result 50000000
tiem 2 result 20000000
tiem 7 result 70000000
tiem 8 result 80000000
tiem 6 result 60000000
tiem 9 result 90000000
tiem 10 result 100000000
进程池执行运行时间为 2.3629109859466553 秒
"""

ProcessPoolExecutor 使用的是 multiprocessing 模块,可以让我们避开全局解释器锁(global interpreter lock),大幅降低执行时间

但是对于 I/O 密集型任务,多线程和线程池还是能起到显著效果的

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import time
import concurrent.futures
number_list = list(range(1, 11))
def evaluate_item(x):
    time.sleep(1)
def count(number):
    for i in range(10000000):
        i = i + 1
    return i * number
if __name__ == "__main__":
    # 线性执行
    start_time = time.time()
    for item in number_list:
        evaluate_item(item)
    print(f"线性执行运行时间为 {time.time() - start_time} 秒")
    # 线程池执行
    start_time = time.time()
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        for item in number_list:
            executor.submit(evaluate_item, item)
    print(f"线程池执行运行时间为 {time.time() - start_time} 秒")
    # 进程池执行
    start_time = time.time()
    with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
        for item in number_list:
            executor.submit(evaluate_item, item)
    print(f"进程池执行运行时间为 {time.time() - start_time} 秒")
"""
线性执行运行时间为 10.036600828170776 秒
线程池执行运行时间为 2.013601064682007 秒
进程池执行运行时间为 2.047109842300415 秒
"""

几乎所有服务器端应用都用到了池化,因为需要处理来自任意数量客户端的大量并发请求。而还有不少其他的应用要求每个任务立刻执行,或者对执行任务的线程具备更大的控制权。在这种情况下,池化不是最好的选择