Skip to content

进程

使用 multiprocessing 模块创建进程

multiprocessing 模块提供了一个 Process 类来代表一个进程对象

1
2
3
4
5
6
7
8
9
Process(
    group=None,
    target=None, # 表示当前进程启动时执行的可调用对象
    name=None, # 为当前进程实例的别名
    args=(), # 表示传递给 target 函数的参数元组
    kwargs={}, # 表示传递给 target 函数的参数字典
    *,
    daemon=None,
)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
from multiprocessing import Process
def test(interval):
    print(f"我是子进程 interval = {interval}")
def main():
    print("主进程开始")
    p = Process(target=test, args=(1,)) # 实例化 Process 进程类
    p.start() # 启动子进程
    print("主进程结束")
if __name__ == "__main__":
    main()
"""
主进程开始
主进程结束
我是子进程 interval = 1
"""

Process 的实例 p 常用的方法:

  • is_alive(): 判断进程实例是否还在执行
  • join([timeout]): 是否等待进程实例执行结束,或等待多少秒
  • start(): 启动进程实例(创建子进程)
  • run(): 如果没有给定 target 参数,对这个对象调用 start() 方法时,就将执行对象中的 run() 方法
  • terminate(): 不管任务是否完成,立即终止

Process 类常用属性:

  • name: 当前进程的实例别名,默认为 Process-N,N 为从 1 开始递增的整数
  • pid: 当前进程实例的 PID 值
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import os
import time
from multiprocessing import Process
def child(interval):
    print(f"子进程 {os.getpid()} 开始执行,父进程为 {os.getppid()}")
    t_start = time.time()
    time.sleep(interval) # 程序将会被挂起 interval 秒
    print(f"子进程 {os.getpid()} 执行时间为 {time.time() - t_start} 秒")
if __name__ == "__main__":
    t_start = time.time()
    print("---------父进程开始执行---------")
    print(f"父进程 PID: {os.getpid()}")
    p1 = Process(target=child, args=(1,))
    p2 = Process(target=child, name="child 2", args=(2,))
    p1.start()
    p2.start()
    print(f"p1.is_alive = {p1.is_alive()}")
    print(f"p2.is_alive = {p2.is_alive()}")
    print(f"p1.name = {p1.name}")
    print(f"p1.pid = {p1.pid}")
    print(f"p2.name = {p2.name}")
    print(f"p2.pid = {p2.pid}")
    p1.join() # 等待 p1 进程结束
    p2.join() # 等待 p2 进程结束
    print(f"---------- 父进程结束,执行时间为 {time.time() - t_start}---------")
"""
---------父进程开始执行---------
父进程 PID: 8272
p1.is_alive = True
p2.is_alive = True
p1.name = Process-1
p1.pid = 8288
p2.name = child 2
p2.pid = 8289
子进程 8288 开始执行,父进程为 8272
子进程 8289 开始执行,父进程为 8272
子进程 8288 执行时间为 1.0012149810791016 秒
子进程 8289 执行时间为 2.0001070499420166 秒
---------- 父进程结束,执行时间为 2.0070090293884277---------
"""

使用 Process 子类创建进程

对于一些简单的小任务,通常使用 Process(target=test) 方式实现多进程。但是如果要处理复杂任务的进程,通常定义一个类,使其继承 Process 类,每次实例化这个类的时候,就等同于实例化一个进程对象。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
class SubProcess(Process):
    def __init__(self, interval, name=""):
        super().__init__()
        self.interval = interval
        if name:
            self.name = name
    # 重写了 Process 类的 run() 方法
    def run(self):
        print(f"子进程 {os.getpid()} 开始执行,父进程为 {os.getppid()}")
        t_start = time.time()
        time.sleep(self.interval)
        print(f"子进程 {os.getpid()} 执行时间为 {time.time() - t_start} 秒")    
if __name__ == "__main__":
    t_start = time.time()
    print("---------父进程开始执行---------")
    print(f"父进程 PID: {os.getpid()}")
    p1 = SubProcess(interval=1, name='child 1')
    p2 = SubProcess(interval=2)
    p1.start()
    p2.start()
    print(f"p1.is_alive = {p1.is_alive()}")
    print(f"p2.is_alive = {p2.is_alive()}")
    print(f"p1.name = {p1.name}")
    print(f"p1.pid = {p1.pid}")
    print(f"p2.name = {p2.name}")
    print(f"p2.pid = {p2.pid}")
    p1.join() # 等待 p1 进程结束
    p2.join() # 等待 p2 进程结束
    print(f"---------- 父进程结束,执行时间为 {time.time() - t_start}")
"""
---------父进程开始执行---------
父进程 PID: 8551
p1.is_alive = True
p2.is_alive = True
p1.name = child 1
p1.pid = 8567
p2.name = SubProcess-2
p2.pid = 8568
子进程 8567 开始执行,父进程为 8551
子进程 8568 开始执行,父进程为 8551
子进程 8567 执行时间为 1.0012001991271973 秒
子进程 8568 执行时间为 2.0012259483337402 秒
---------- 父进程结束,执行时间为 2.0083987712860107
"""

使用进程池 Pool 创建进程

上面我们使用 Process 类创建了 2 个进程。如果要创建几十个或几百个进程,则需要实例化更多个 Process 类。有没有更好的创建进程的方式解决这类问题呢?答案就是使用 multiprocessing 模块提供的 Pool 类,即 Pool 进程池。

为了更好地理解进程池,可以将进程池比作水池,我们需要完成放满 10 个水盆的水的任务,而在这个水池中,最多可以安放 3 个水盆接水,也就是同时可以执行 3 个任务,即开始 3 个进程。为更快完成任务,现在打开 3 个水龙头开始放水,当有一个水盆的水接满时,即该进程完成一个任务,我们就将这个水盆的水倒入水桶中,然后继续接水,即执行下一个任务。如果 3 个水盆每次同时装满水,那么在放满第 9 盆水后,系统会随机分配 1 一个水盆接水,另外 2 个水盆空闲

Pool 类的常用方法

  • apply_async(func, args=(), kwds={}, callback=None, error_callback=None): 使用非阻塞方式调用 func 函数
  • apply(func, args=(), kwds={}): 使用阻塞方式调用 func 函数
  • close(): 关闭 Pool,使其不再接受新的任务
  • terminate(): 不管任务是否完成,立即终止
  • join(): 主进程阻塞,等待子进程的退出,必须在 close 或 terminate 之后使用
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
from multiprocessing import Pool
def task(name):
    t_start = time.time()
    print(f"子进程 {os.getpid()} 执行 task {name} ...")
    time.sleep(1)
    print(f"task {name} 执行时间为 {time.time() - t_start} 秒")    
if __name__ == "__main__":
    t_start = time.time()
    print(f"父进程 {os.getpid()}")
    p = Pool(3)
    for i in range(10):
        p.apply_async(task, args=(i,))
    print("等待所有子进程结束...")
    p.close()
    p.join()
    print(f"所有子进程结束,执行时间为 {time.time() - t_start} 秒")   
"""
父进程 9194
等待所有子进程结束...
子进程 9211 执行 task 0 ...
子进程 9210 执行 task 1 ...
子进程 9212 执行 task 2 ...
task 1 执行时间为 1.00118088722229 秒
task 2 执行时间为 1.0010809898376465 秒
task 0 执行时间为 1.0013091564178467 秒
子进程 9212 执行 task 3 ...
子进程 9210 执行 task 5 ...
子进程 9211 执行 task 4 ...
task 4 执行时间为 1.0010030269622803 秒
task 5 执行时间为 1.0009760856628418 秒
task 3 执行时间为 1.0012133121490479 秒
子进程 9210 执行 task 6 ...
子进程 9212 执行 task 7 ...
子进程 9211 执行 task 8 ...
task 6 执行时间为 1.0012168884277344 秒
task 8 执行时间为 1.0010061264038086 秒
task 7 执行时间为 1.0011136531829834 秒
子进程 9211 执行 task 9 ...
task 9 执行时间为 1.0011179447174072 秒
所有子进程结束,执行时间为 4.08761191368103 秒
"""

通过队列实现进程间通信

多进程中每个进程都有自己的地址空间、内存、数据栈以及其他记录其运行状态的辅助数据

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
"""
检验进程是否共享信息
"""
from multiprocessing import Process
def plus():
    print("------ 子进程 1 开始 ------")
    global g_num
    g_num += 50
    print(f"g_num is {g_num}")
    print("------ 子进程 1 结束 ------")
def minus():
    print("------ 子进程 2 开始 ------")
    global g_num
    g_num -= 50
    print(f"g_num is {g_num}")
    print("------ 子进程 2 结束 ------")
g_num = 100
if __name__ == "__main__":
    print("------ 主进程开始 ------")
    print(f"g_num is {g_num}")
    p1 = Process(target=plus)
    p2 = Process(target=minus)
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print("------ 主进程结束 ------")
"""
------ 主进程开始 ------
g_num is 100
------ 子进程 1 开始 ------
g_num is 150
------ 子进程 1 结束 ------
------ 子进程 2 开始 ------
g_num is 50
------ 子进程 2 结束 ------
------ 主进程结束 ------
"""

代码中分别创建了 2 个子进程,一个子进程中令 g_num 加上 50,另一个进程令 g_num 减去 50.但是从运行结果可以看出,g_num 在父进程和 2 个子进程中的初始值都是 100.也就是全局变量 g_num 在一个进程中的结果,没有传递到下一个进程中,即进程之间没有共享信息。

要如何才能实现进程间的通信呢?Python 的 multiprocessing 模块包装了底层的机制,提供了 Queue(队列)、Pipes(管道)等多种方式来交换数据。

初始化 Queue() 对象时(例如: q = Queue(num)),若括号中没有指定最大可接收的消息数量,或数量为负值,那么就代表可接受的消息数量没有上限(直到内存的尽头)

Queue 的常用方法:

  • Queue.size(): 返回当前队列包含的消息数量
  • Queue.empty(): 如果队列为空,返回 True; 反之返回 False
  • Queue.full(): 如果队列满了,返回 True; 反之返回 False
  • Queue.get(block=True, timeout=None): 获取队列中的一条消息,然后将其从列队中移除,block 默认值为 True。如果 block 使用默认值,且没有设置 timeout(单位秒),消息队列为空,此时程序将被阻塞(停在读取状态),直到从消息列队读到消息为止。如果设置了 timeout,则会等待 timeout 秒,若还没读取到任何消息,则抛出“Queue.Empty”异常;如果 block 值为 False,消息队列为空,则会立刻抛出“Queue.Empty”异常
  • Queue.get_nowait(): 相当 Queue.get(False)
  • Queue.put(item, block=True, timeout=None): 将 item 消息写入队列,block 默认值为 True。如果 block 使用默认值,且没有设置 timeout,消息队列如果已经没有空间可写入,此时程序将会阻塞(停在写入状态),直到从消息队列腾出空间为止,如果设置了 timeout,则会等待 timeout 秒,若还没空间,则抛出“Queue.Full”异常;如果 block 值为 False,消息队列没有空间可写入,则会立刻抛出“Queue.Full”异常。
  • Queue.put_nowait(item): 相当 Queue.put(item, False)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from multiprocessing import Process, Queue
def write_task(q):
    for i in range(5):
        message = f"消息{i}"
        q.put(message)
        print(f"写入:{message}")
def read_task(q):
    time.sleep(1)
    while not q.empty():
        print(f"读取:{q.get(True, 2)}")
if __name__ == "__main__":
    q = Queue()
    pw = Process(target=write_task, args=(q,))
    pr = Process(target=read_task, args=(q,))
    pw.start()
    pr.start()
    pw.join()
    pr.join()
"""
写入:消息0
写入:消息1
写入:消息2
写入:消息3
写入:消息4
读取:消息0
读取:消息1
读取:消息2
读取:消息3
读取:消息4
"""