本文最后更新于:2022年5月4日 晚上
前言
由于GIL的限制,python的多线程实际上只用到了cpu的单核。在计算密集型的程序中,python的多线程对提升效率的影响并不大。但在I/O密集型的程序中,python的多线程便能极大的提高运行效率
一个简单的多线程程序
| import threading
def output(value): for i in range(5): print(value)
if __name__ == "__main__": t_list = [] for i in range(3): t = threading.Thread(target=output, args=[i]) t.start() t_list.append(t) for t in t_list: t.join()
|
这段代码主要分为两个部分,一部分是函数的定义,另一部分是多线程的启动。
首先创建了三个线程,将线程加入到数组并启动。使用join方法的目的是保证所有线程都执行完毕后才会接着去执行join以下的代码。或是让主线程等待所有子线程执行结束后再结束
由于pytho中的多线程是单核处理的,在执行多线程任务的时候,会在线程间进行切换。这里从不规则的输出结果中也可以直观的看出。在I/O密集的程序中,如爬虫,在遇到requests等待的时候,python会切换到其他的线程进行执行,提高了整体的工作效率。
多线程冲突
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| num = 0 def add(): global num for i in range(1000000): num += 1
if __name__ == "__main__": t_list = [] for i in range(10): t = threading.Thread(target=add) t.start() t_list.append(t) for t in t_list: t.join()
print(num)
|
在使用多线程的时候,难免需要操作共同的变量。在上面的例子中,如果只是简单直接的操作,就会造成线程冲突的情况。
这是因为在执行 num +=1 的时候,实际上执行了三个步骤
- 拿到num的值
- 给拿到的num加1
- 将处理后的值赋值给num
上面提到过,python会在多线程直接进行切换。如果在执行完第一步以后,切换到了其他线程B.执行完线程B后再返回执行2和3,就会导致赋予的值比实际的值更小。为了避免这种情况,就需要用到锁
四种锁的基本使用
互斥锁Lock
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| lock = threading.Lock() num = 0 def add(): global num lock.acquire() for i in range(1000000): num += 1 lock.release()
if __name__ == "__main__": t_list = [] for i in range(10): t = threading.Thread(target=add) t.start() t_list.append(t) for t in t_list: t.join() print(num) # Output # 10000000
|
互斥锁是比较常见的一种锁。首先我们需要创建一个锁的对象 lock.
在要执行可能冲突的指令前加上锁 lock.acquire().此时如果其他的线程要访问时,就会阻塞在这里。知道这里的锁被释放。这样就保证了数据的正确性
递归锁Rlock
在程序的执行过程中,可能会遇到这种情况
线程A锁了资源a, 线程B锁了资源b
此时A要访问资源b, B要访问资源a. 但因为资源都被加了锁的原因,导致它们谁都访问不了,这样就产生了死锁。
1 2 3 4 5 6 7 8 9 10
| lock = threading.RLock() num = 0 def add(): global num lock.acquire() for i in range(1000000): lock.acquire() num += 1 lock.release() lock.release()
|
递归锁内部维持着一个counter,如果counter的值为0,则其他线程可以进行加锁,并且counter的值加1.
如果counter的值不为0,递归锁会对需要加锁的线程进行判断,如果该线程与当前递归锁的线程一致,则允许再次上锁并且counter+1. 当对递归锁进行释放,counter便会减1,知道变为0才允许其他的线程访问。这样就避免了出现死锁的情况.
条件锁Condition
条件锁主要新增了两个方法, notify和wait。 当当前状态为wait时,程序会阻塞在这里,知道其他进程执行了notify进行通知,才会继续执行下面的步骤
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| import threading import time import random
con = threading.Condition() pot = [] def pro(): # 生产者 print("pro start") con.acquire() print("pro acquire") while True: time.sleep(random.randint(1, 3)) pot.append("鱼丸") print(f"生产了一个鱼丸,现在锅里有{len(pot)}个鱼丸") if len(pot) >= 5: con.notify() con.wait()
def eat(): # 消费者 print("eat start") con.acquire() print("eat acquire") while True: time.sleep(random.randint(1, 3)) pot.pop() print(f"吃了一个鱼丸,现在锅里还有{len(pot)}个鱼丸") if len(pot) == 0: con.notify() con.wait()
pro = threading.Thread(target=pro) eat = threading.Thread(target=eat) pro.start() eat.start()
|
这里用常见的生产者消费者的关系来演示。生产者的线程首先被启动并且拿到锁,于此同时消费者也被启动,但是没有拿到锁 进入阻塞状态。生产者开始生产鱼丸,当鱼丸的个数大于等于5时,便会通知其他在阻塞状态的线程,并且自己通过wait进入阻塞。
阻塞状态的消费者在接受到生产者的notify消息时,便开始消费锅内的鱼丸。直到锅内没有鱼丸的时候,用notify通知其他阻塞状态中的线程,并且自己进入wait状态。一直循环
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| pro start pro acquire eat start 生产了一个鱼丸,现在锅里有1个鱼丸 生产了一个鱼丸,现在锅里有2个鱼丸 生产了一个鱼丸,现在锅里有3个鱼丸 生产了一个鱼丸,现在锅里有4个鱼丸 生产了一个鱼丸,现在锅里有5个鱼丸 eat acquire 吃了一个鱼丸,现在锅里还有4个鱼丸 吃了一个鱼丸,现在锅里还有3个鱼丸 吃了一个鱼丸,现在锅里还有2个鱼丸 吃了一个鱼丸,现在锅里还有1个鱼丸 吃了一个鱼丸,现在锅里还有0个鱼丸 生产了一个鱼丸,现在锅里有1个鱼丸 生产了一个鱼丸,现在锅里有2个鱼丸 生产了一个鱼丸,现在锅里有3个鱼丸 生产了一个鱼丸,现在锅里有4个鱼丸 生产了一个鱼丸,现在锅里有5个鱼丸 吃了一个鱼丸,现在锅里还有4个鱼丸 吃了一个鱼丸,现在锅里还有3个鱼丸 吃了一个鱼丸,现在锅里还有2个鱼丸
|
信号量semaphore
多线程能够提升I/O密集程序的效率,但并不意味着线程越多越好,因为多线程在切换的时候也会消耗一定的系统资源。此时我们就可以通过信号量对锁的数量进行限制,保证一定的效率
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| import threading import time
sema = threading.Semaphore(3) def func(): sema.acquire() print(time.time()) time.sleep(3) sema.release()
t_list = [] for i in range(10): t = threading.Thread(target=func) t_list.append(t) t.start()
for t in t_list: t.join() # Output #1622450593.179221 #1622450593.179297 #1622450593.179469 #1622450596.179563 #1622450596.179789 #1622450596.179818 #1622450599.179777 #1622450599.179847 #1622450599.180023 #1622450602.18302
|
这里我们设置了semaphore的数量为3,启动了10个线程。通过输出我们可以看到实际上每次只有3个线程拿到了锁。