线程
使用 threading 模块创建线程
threading 模块提供了一个 Thread 类来代表一个线程对象
| Thread(
group=None,
target=None, # 表示一个可调用对象,线程启动时,run() 方法将调用此对象,默认值为 None,表示不调用任何内容
name=None, # 表示当前线程名称,默认创建一个“Thread-N”格式的唯一名称
args=(),
kwargs=None,
*,
daemon=None,
)
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29 | import time
import threading
def test():
for i in range(3):
time.sleep(1)
print(f"thread name is {threading.current_thread().name}")
if __name__ == "__main__":
t_start = time.time()
threads = [threading.Thread(target=test) for i in range(4)]
for t in threads:
t.start()
for t in threads:
t.join()
print(f"执行时间:{time.time() - t_start}")
"""
thread name is Thread-2
thread name is Thread-1
thread name is Thread-4
thread name is Thread-3
thread name is Thread-2
thread name is Thread-1
thread name is Thread-3
thread name is Thread-4
thread name is Thread-1
thread name is Thread-3
thread name is Thread-2
thread name is Thread-4
执行时间:3.0099618434906006
"""
|
使用 Thread 子类创建线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25 | import time
import threading
class SubThread(threading.Thread):
def run(self):
for i in range(3):
time.sleep(1)
print(f"子线程 {self.name} 执行,i = {i}")
if __name__ == "__main__":
t_start = time.time()
t1 = SubThread()
t2 = SubThread()
t1.start()
t2.start()
t1.join()
t2.join()
print(f"执行时间:{time.time() - t_start}")
"""
子线程 Thread-1 执行,i = 0
子线程 Thread-2 执行,i = 0
子线程 Thread-1 执行,i = 1
子线程 Thread-2 执行,i = 1
子线程 Thread-1 执行,i = 2
子线程 Thread-2 执行,i = 2
执行时间:3.0095698833465576
"""
|
线程间通信
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33 | import time
from threading import Thread
def plus():
print("------ 子线程 1 开始 ------")
global g_num
g_num += 50
print(f"g_num is {g_num}")
print("------ 子线程 1 结束 ------")
def minus():
time.sleep(1)
print("------ 子线程 2 开始 ------")
global g_num
g_num -= 50
print(f"g_num is {g_num}")
print("------ 子线程 2 结束 ------")
g_num = 100
if __name__ == "__main__":
print(f"g_num is {g_num}")
t1 = Thread(target=plus)
t2 = Thread(target=minus)
t1.start()
t2.start()
t1.join()
t2.join()
"""
g_num is 100
------ 子线程 1 开始 ------
g_num is 150
------ 子线程 1 结束 ------
------ 子线程 2 开始 ------
g_num is 100
------ 子线程 2 结束 ------
"""
|
从上面的例子可以得出,在一个进程内的所有线程共享全局变量,能够在不使用其他方式的前提下完成多线程之间的数据共享
什么是互斥锁
由于线程可以对全局变量随意修改,这就可能造成多线程之间对全局变量的混乱操作。以房子为例,当房子内只有一个居住者时(单线程),他可以任意时刻使用任意一个房间,如厨房、卧室和卫生间等。但是,当这个房子有多个居住者时(多线程),他就不能在任意时刻使用某些房间,如卫生间,否则就会造成混乱。
如何解决这个问题呢?一个防止他人进入的简单方法,就是门上加一把锁。先到的人锁上门,后到的人就在门口排队,等锁打开再进去
这就是“互斥锁(Mutual exclusion,缩写 Mutex)”,防止多个线程同时读写某一块内存区域。互斥锁为资源引入一个状态:锁定和非锁定。某个线程要更改共享数据时,先将其锁定,此时资源的状态为“锁定”,其他线程不能更改:直到该线程释放资源,将资源的状态变成“非锁定”时,其他的线程才能再次锁定该资源。互斥锁保证了每次只有一个线程进行写入操作,从而保证了多线程情况下数据的正确性。
使用互斥锁
在 threading 模块中使用 Lock 类可以方便地处理锁定。Lock 类有 2 个方法:acquire() 锁定和 release() 释放锁
| mutex = threading.Lock() # 创建锁
mutex.acquire() # 锁定
mutex.release() # 释放锁
|
- acquire(blocking=True, timeout=-1) -> bool: 获取锁定,如果有必要,需要阻塞到锁定释放为止。如果提供 blocking 参数并将它设置为 False,当无法获取锁定时将立即返回 False,如果成功获取锁定则返回 True
- release(): 释放一个锁定。当锁定处于未锁定状态时,或者从原本调用 acquire() 方法的不同线程调用此方法,将出现错误
使用多线程和互斥锁模拟实现多人同时订购电影票的功能,假设电影院某个场次只有 100 张电影票,10 个用户同时抢购该电影票。每售出一张,显示一次剩余的电影票张数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32 | import time
from threading import Thread, Lock
n = 100
def task():
global n
mutex.acquire()
temp = n
time.sleep(0.1)
n = temp - 1
print(f"购买成功,剩余 {n} 张电影票")
mutex.release()
if __name__ == "__main__":
mutex = Lock()
t_l = []
for i in range(10):
t = Thread(target=task)
t_l.append(t)
t.start()
for t in t_l:
t.join()
"""
购买成功,剩余 99 张电影票
购买成功,剩余 98 张电影票
购买成功,剩余 97 张电影票
购买成功,剩余 96 张电影票
购买成功,剩余 95 张电影票
购买成功,剩余 94 张电影票
购买成功,剩余 93 张电影票
购买成功,剩余 92 张电影票
购买成功,剩余 91 张电影票
购买成功,剩余 90 张电影票
"""
|
注意:使用互斥锁时,要避免死锁
使用队列在线程间通信
multiprocessing 模块的 Queue 队列可以实现进程间通信,同样在线程间,也可以使用 Queue 队列实现线程间通信。不同之处在于我们需要使用 queue 模块的 Queue 队列,而不是 multiprocessing 模块的 Queue 队列,但 Queue 的使用方法相同
使用 Queue 在线程间通信通常应用于生产者消费者模式。产生数据的模式称为生产者,而处理数据的模块称为消费者。在生产者与消费者之间的缓冲区称之为仓库。生产者负责往仓库运输商品,而消费者负责从仓库取出商品,这就构成了生产者消费者模式。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46 | import time
import random
import threading
from queue import Queue
class Producer(threading.Thread):
def __init__(self, name, queue):
super().__init__(name=name)
self.data = queue
def run(self):
for i in range(5):
print(f"生产者 {self.getName()} 将产品 {i} 加入队列!")
self.data.put(i)
time.sleep(random.random())
print(f"生产者 {self.getName()} 完成!")
class Consumer(threading.Thread):
def __init__(self, name, queue):
super().__init__(name=name)
self.data = queue
def run(self):
for i in range(5):
val = self.data.get()
print(f"消费者 {self.getName()} 将产品 {val} 从队列中取出!")
time.sleep(random.random())
print(f"消费者 {self.getName()} 完成!")
if __name__ == "__main__":
que = Queue()
producer = Producer("Producer", que)
consumer = Consumer("Consumer", que)
producer.start()
consumer.start()
producer.join()
consumer.join()
"""
生产者 Producer 将产品 0 加入队列!
消费者 Consumer 将产品 0 从队列中取出!
生产者 Producer 将产品 1 加入队列!
生产者 Producer 将产品 2 加入队列!
消费者 Consumer 将产品 1 从队列中取出!
生产者 Producer 将产品 3 加入队列!
消费者 Consumer 将产品 2 从队列中取出!
生产者 Producer 将产品 4 加入队列!
消费者 Consumer 将产品 3 从队列中取出!
消费者 Consumer 将产品 4 从队列中取出!
生产者 Producer 完成!
消费者 Consumer 完成!
"""
|
进程和线程的区别
- 进程是系统进行资源分配和调度的一个独立单位,线程是进程的一个实体,是 CPU 调度和分派的基本单位
- 进程之间是相互独立的,多进程中,同一个变量,各有一份备份存在于每个进程中,但互不影响;而同一个进程的多个线程是内存共享的,所有变量都由所有线程共享
- 由于进程间是独立的,因此一个进程的崩溃不会影响到其他进程;而线程是包含在进程之内的,线程的崩溃就会引起进程的崩溃,继而导致同一进程内的其他线程也崩溃
sys.exit() 和 os._eixt()
sys.exit()
的退出比较优雅,调用后会引发SystemExit异常,可以捕获此异常做清理工作。
os._exit()
直接将python解释器退出,余下的语句不会执行。
sys.exit()
只退出当前线程。
os._exit(0)
:无错误退出
os._exit(1)
:有错误退出
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29 | import asyncio
from datetime import datetime
import sys
async def task1():
await asyncio.sleep(5)
print(f"{datetime.now()} finish task1!")
sys.exit()
async def task2():
print('begin task2.')
await asyncio.sleep(10)
print(f"{datetime.now()} finish task2!")
async def run():
loop.create_task(task2())
await task1()
print(f"{datetime.now()} begin")
loop = asyncio.get_event_loop()
loop.run_until_complete(run())
# 2020-10-27 11:19:48.327924 begin
# begin task2.
# 2020-10-27 11:19:53.329162 finish task1!
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 | import time
import threading
import sys
def task1():
for i in range(3):
time.sleep(1)
print(f"thread1 name is {threading.current_thread().name}")
sys.exit()
def task2():
for i in range(5):
time.sleep(1)
print(f"thread2 name is {threading.current_thread().name}")
t_start = time.time()
threads = [threading.Thread(target=task1), threading.Thread(target=task2)]
for t in threads:
t.start()
for t in threads:
t.join()
print(f"执行时间:{time.time() - t_start}")
# thread1 name is Thread-1
# thread2 name is Thread-2
# thread2 name is Thread-2
# thread1 name is Thread-1
# thread2 name is Thread-2
# thread1 name is Thread-1
# thread2 name is Thread-2
# thread2 name is Thread-2
# 执行时间:5.018771171569824
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 | import time
import threading
import os
def task1():
for i in range(3):
time.sleep(1)
print(f"thread1 name is {threading.current_thread().name}")
os._exit(0)
def task2():
for i in range(5):
time.sleep(1)
print(f"thread2 name is {threading.current_thread().name}")
t_start = time.time()
threads = [threading.Thread(target=task1), threading.Thread(target=task2)]
for t in threads:
t.start()
for t in threads:
t.join()
print(f"执行时间:{time.time() - t_start}")
# thread1 name is Thread-1
# thread2 name is Thread-2
# thread1 name is Thread-1
# thread2 name is Thread-2
# thread1 name is Thread-1
|