Skip to content

线程

使用 threading 模块创建线程

threading 模块提供了一个 Thread 类来代表一个线程对象

1
2
3
4
5
6
7
8
9
Thread(
    group=None,
    target=None, # 表示一个可调用对象,线程启动时,run() 方法将调用此对象,默认值为 None,表示不调用任何内容
    name=None, # 表示当前线程名称,默认创建一个“Thread-N”格式的唯一名称
    args=(),
    kwargs=None,
    *,
    daemon=None,
)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import time
import threading
def test():
    for i in range(3):
        time.sleep(1)
        print(f"thread name is {threading.current_thread().name}")
if __name__ == "__main__":
    t_start = time.time()
    threads = [threading.Thread(target=test) for i in range(4)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print(f"执行时间:{time.time() - t_start}")
"""
thread name is Thread-2
thread name is Thread-1
thread name is Thread-4
thread name is Thread-3
thread name is Thread-2
thread name is Thread-1
thread name is Thread-3
thread name is Thread-4
thread name is Thread-1
thread name is Thread-3
thread name is Thread-2
thread name is Thread-4
执行时间:3.0099618434906006
"""

使用 Thread 子类创建线程

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import time
import threading
class SubThread(threading.Thread):
    def run(self):
        for i in range(3):
            time.sleep(1)
            print(f"子线程 {self.name} 执行,i = {i}")
if __name__ == "__main__":
    t_start = time.time()
    t1 = SubThread()
    t2 = SubThread()
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print(f"执行时间:{time.time() - t_start}")
"""
子线程 Thread-1 执行,i = 0
子线程 Thread-2 执行,i = 0
子线程 Thread-1 执行,i = 1
子线程 Thread-2 执行,i = 1
子线程 Thread-1 执行,i = 2
子线程 Thread-2 执行,i = 2
执行时间:3.0095698833465576
"""

线程间通信

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import time
from threading import Thread
def plus():
    print("------ 子线程 1 开始 ------")
    global g_num
    g_num += 50
    print(f"g_num is {g_num}")
    print("------ 子线程 1 结束 ------")
def minus():
    time.sleep(1)
    print("------ 子线程 2 开始 ------")
    global g_num
    g_num -= 50
    print(f"g_num is {g_num}")
    print("------ 子线程 2 结束 ------")
g_num = 100
if __name__ == "__main__":
    print(f"g_num is {g_num}")
    t1 = Thread(target=plus)
    t2 = Thread(target=minus)
    t1.start()
    t2.start()
    t1.join()
    t2.join()
"""
g_num is 100
------ 子线程 1 开始 ------
g_num is 150
------ 子线程 1 结束 ------
------ 子线程 2 开始 ------
g_num is 100
------ 子线程 2 结束 ------
"""

从上面的例子可以得出,在一个进程内的所有线程共享全局变量,能够在不使用其他方式的前提下完成多线程之间的数据共享

什么是互斥锁

由于线程可以对全局变量随意修改,这就可能造成多线程之间对全局变量的混乱操作。以房子为例,当房子内只有一个居住者时(单线程),他可以任意时刻使用任意一个房间,如厨房、卧室和卫生间等。但是,当这个房子有多个居住者时(多线程),他就不能在任意时刻使用某些房间,如卫生间,否则就会造成混乱。

如何解决这个问题呢?一个防止他人进入的简单方法,就是门上加一把锁。先到的人锁上门,后到的人就在门口排队,等锁打开再进去

这就是“互斥锁(Mutual exclusion,缩写 Mutex)”,防止多个线程同时读写某一块内存区域。互斥锁为资源引入一个状态:锁定和非锁定。某个线程要更改共享数据时,先将其锁定,此时资源的状态为“锁定”,其他线程不能更改:直到该线程释放资源,将资源的状态变成“非锁定”时,其他的线程才能再次锁定该资源。互斥锁保证了每次只有一个线程进行写入操作,从而保证了多线程情况下数据的正确性。

使用互斥锁

在 threading 模块中使用 Lock 类可以方便地处理锁定。Lock 类有 2 个方法:acquire() 锁定和 release() 释放锁

1
2
3
mutex = threading.Lock() # 创建锁
mutex.acquire() # 锁定
mutex.release() # 释放锁
  • acquire(blocking=True, timeout=-1) -> bool: 获取锁定,如果有必要,需要阻塞到锁定释放为止。如果提供 blocking 参数并将它设置为 False,当无法获取锁定时将立即返回 False,如果成功获取锁定则返回 True
  • release(): 释放一个锁定。当锁定处于未锁定状态时,或者从原本调用 acquire() 方法的不同线程调用此方法,将出现错误

使用多线程和互斥锁模拟实现多人同时订购电影票的功能,假设电影院某个场次只有 100 张电影票,10 个用户同时抢购该电影票。每售出一张,显示一次剩余的电影票张数

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import time
from threading import Thread, Lock
n = 100
def task():
    global n
    mutex.acquire()
    temp = n
    time.sleep(0.1)
    n = temp - 1
    print(f"购买成功,剩余 {n} 张电影票")
    mutex.release()
if __name__ == "__main__":
    mutex = Lock()
    t_l = []
    for i in range(10):
        t = Thread(target=task)
        t_l.append(t)
        t.start()
    for t in t_l:
        t.join()
"""
购买成功,剩余 99 张电影票
购买成功,剩余 98 张电影票
购买成功,剩余 97 张电影票
购买成功,剩余 96 张电影票
购买成功,剩余 95 张电影票
购买成功,剩余 94 张电影票
购买成功,剩余 93 张电影票
购买成功,剩余 92 张电影票
购买成功,剩余 91 张电影票
购买成功,剩余 90 张电影票
"""

注意:使用互斥锁时,要避免死锁

使用队列在线程间通信

multiprocessing 模块的 Queue 队列可以实现进程间通信,同样在线程间,也可以使用 Queue 队列实现线程间通信。不同之处在于我们需要使用 queue 模块的 Queue 队列,而不是 multiprocessing 模块的 Queue 队列,但 Queue 的使用方法相同

使用 Queue 在线程间通信通常应用于生产者消费者模式。产生数据的模式称为生产者,而处理数据的模块称为消费者。在生产者与消费者之间的缓冲区称之为仓库。生产者负责往仓库运输商品,而消费者负责从仓库取出商品,这就构成了生产者消费者模式。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import time
import random
import threading
from queue import Queue
class Producer(threading.Thread):
    def __init__(self, name, queue):
        super().__init__(name=name)
        self.data = queue
    def run(self):
        for i in range(5):
            print(f"生产者 {self.getName()} 将产品 {i} 加入队列!")
            self.data.put(i)
            time.sleep(random.random())
        print(f"生产者 {self.getName()} 完成!")
class Consumer(threading.Thread):
    def __init__(self, name, queue):
        super().__init__(name=name)
        self.data = queue
    def run(self):
        for i in range(5):
            val = self.data.get()
            print(f"消费者 {self.getName()} 将产品 {val} 从队列中取出!")
            time.sleep(random.random())
        print(f"消费者 {self.getName()} 完成!")
if __name__ == "__main__":
    que = Queue()
    producer = Producer("Producer", que)
    consumer = Consumer("Consumer", que)
    producer.start()
    consumer.start()
    producer.join()
    consumer.join()
"""
生产者 Producer 将产品 0 加入队列!
消费者 Consumer 将产品 0 从队列中取出!
生产者 Producer 将产品 1 加入队列!
生产者 Producer 将产品 2 加入队列!
消费者 Consumer 将产品 1 从队列中取出!
生产者 Producer 将产品 3 加入队列!
消费者 Consumer 将产品 2 从队列中取出!
生产者 Producer 将产品 4 加入队列!
消费者 Consumer 将产品 3 从队列中取出!
消费者 Consumer 将产品 4 从队列中取出!
生产者 Producer 完成!
消费者 Consumer 完成!
"""

进程和线程的区别

  • 进程是系统进行资源分配和调度的一个独立单位,线程是进程的一个实体,是 CPU 调度和分派的基本单位
  • 进程之间是相互独立的,多进程中,同一个变量,各有一份备份存在于每个进程中,但互不影响;而同一个进程的多个线程是内存共享的,所有变量都由所有线程共享
  • 由于进程间是独立的,因此一个进程的崩溃不会影响到其他进程;而线程是包含在进程之内的,线程的崩溃就会引起进程的崩溃,继而导致同一进程内的其他线程也崩溃

sys.exit() 和 os._eixt()

sys.exit()的退出比较优雅,调用后会引发SystemExit异常,可以捕获此异常做清理工作。
os._exit()直接将python解释器退出,余下的语句不会执行。

sys.exit()只退出当前线程。

os._exit(0):无错误退出
os._exit(1):有错误退出

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import asyncio
from datetime import datetime
import sys


async def task1():
    await asyncio.sleep(5)
    print(f"{datetime.now()} finish task1!")
    sys.exit()


async def task2():
    print('begin task2.')
    await asyncio.sleep(10)
    print(f"{datetime.now()} finish task2!")


async def run():
    loop.create_task(task2())
    await task1()


print(f"{datetime.now()} begin")
loop = asyncio.get_event_loop()
loop.run_until_complete(run())

# 2020-10-27 11:19:48.327924 begin
# begin task2.
# 2020-10-27 11:19:53.329162 finish task1!
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import time
import threading
import sys


def task1():
    for i in range(3):
        time.sleep(1)
        print(f"thread1 name is {threading.current_thread().name}")
    sys.exit()


def task2():
    for i in range(5):
        time.sleep(1)
        print(f"thread2 name is {threading.current_thread().name}")


t_start = time.time()
threads = [threading.Thread(target=task1), threading.Thread(target=task2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"执行时间:{time.time() - t_start}")

# thread1 name is Thread-1
# thread2 name is Thread-2
# thread2 name is Thread-2
# thread1 name is Thread-1
# thread2 name is Thread-2
# thread1 name is Thread-1
# thread2 name is Thread-2
# thread2 name is Thread-2
# 执行时间:5.018771171569824
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import time
import threading
import os


def task1():
    for i in range(3):
        time.sleep(1)
        print(f"thread1 name is {threading.current_thread().name}")
    os._exit(0)


def task2():
    for i in range(5):
        time.sleep(1)
        print(f"thread2 name is {threading.current_thread().name}")


t_start = time.time()
threads = [threading.Thread(target=task1), threading.Thread(target=task2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"执行时间:{time.time() - t_start}")

# thread1 name is Thread-1
# thread2 name is Thread-2
# thread1 name is Thread-1
# thread2 name is Thread-2
# thread1 name is Thread-1