在此之前请完整阅读完
GIL 全局解释器锁
GIL(全局解释器锁,GIL 只有cpython有):在同一个时刻,只能有一个线程在一个cpu上执行字节码,没法像c和Java一样将多个线程映射到多个CPU上执行,但是GIL会根据执行的字节码行数(为了让各个线程能够平均利用CPU时间,python会计算当前已执行的微代码数量,达到一定阈值后就强制释放GIL)和时间片以及遇到IO操作的时候主动释放锁,让其他字节码执行。
作用:限制多线程同时执行,保证同一个时刻只有一个线程执行。
原因:线程并非独立,在一个进程中多个线程共享变量的,多个线程执行会导致数据被污染造成数据混乱,这就是线程的不安全性,为此引入了互斥锁。
互斥锁:即确保某段关键代码的数据只能又一个线程从头到尾完整执行,保证了这段代码数据的安全性,但是这样就会导致死锁。
死锁:多个子线程在等待对方解除占用状态,但是都不先解锁,互相等待,这就是死锁。
基于GIL的存在,在遇到大量的IO操作(文件读写,网络等待)代码时,使用多线程效率更高。
多线程
一个CPU再同一个时刻只能执行一个线程,但是当遇到IO操作或者运行一定的代码量的时候就会释放全局解释器锁,执行另外一个线程。
就好像你要烧水和拖地,这是两个任务,如果是单线程来处理这两个任务的话,先烧水,等水烧开,再拖地。这样等待水烧开的时间就白白浪费了,倘若事交给多线程来做的话,就先烧水,烧水的过程中(相当于IO操作的时候)把时间资源让出来给拖地,拖完地后水也烧好了,这个就是多线程的优势,再同一个时间段做更多的事情,也就是再以后会降到的高并发。
它提供如下一些方法:
t1 = threading.Thread(target=你写的函数名,args=(传入变量(如果只有一个变量就必须在后加上逗号),),name=随便取一个线程名):把一个线程实例化给t1,这个线程负责执行target=你写的函数名
t1.start():负责执行启动这个线程
t1.join():必须要等待你的子线程执行完成后再执行主线程
t1.setDeamon(True):当你的主线程执行完毕后,不管子线程有没有执行完成都退出主程序,注意不能和t1.join()一起使用。
threading.current_thread().name:打印出线程名
1
2
3
4
5
这些方法一开始看可能会觉得有些多,不过不打紧,可以先把后面的代码看完在回过头看这些提供的方法就觉得很简单了。
单线程版本
import time
def mop_floor():
print(‘我要拖地了’)
time.sleep(1)
print(‘地拖完了’)
def heat_up_watrt():
print(‘我要烧水了’)
time.sleep(6)
print(‘水烧开了’)
start_time = time.time()
heat_up_watrt()
mop_floor()
end_time = time.time()
print(‘总共耗时:{}’.format(end_time-start_time))
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
返回结果:
我要烧水了
水烧开了
我要拖地了
地拖完了
总共耗时:7.000766277313232
1
2
3
4
5
单线程一共耗时7秒
多线程版本
import threading
import time
def mop_floor():
print(‘我要拖地了’)
time.sleep(1)
print(‘地拖完了’)
def heat_up_watrt():
print(‘我要烧水了’)
time.sleep(6)
print(‘水烧开了’)
start_time = time.time()
t1 = threading.Thread(target=heat_up_watrt)
t2 = threading.Thread(target=mop_floor)
t1.start()
t2.start()
t1.join()
t2.join()
end_time = time.time()
print(‘总共耗时:{}’.format(end_time-start_time))
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
返回结果:
我要烧水了
我要拖地了
地拖完了
水烧开了
总共耗时:6.000690460205078
1
2
3
4
5
可以看到烧水等待的时候直接执行拖地任务,并且总共耗时为6秒,关于这里的start和jion都是固定的操作套路,记住这两个代词以后直接套用即可,需要注意的是多线程程序的执行顺序是不确定的。当执行到sleep语句时,线程将被阻塞(Blocked),到sleep结束后,线程进入就绪(Runnable)状态,等待调度的命令执行另一个子线程,线程调度将自行选择一个线程执行。
类方法实现多线程
当遇到比较复杂的业务逻辑或者想要再子线程上面加一些方法的话,可以使用自己的类重写代码,还是使用刚刚的例子:
import threading
import time
class mop_floor(threading.Thread):
def init(self):
super().init()
def run(self):
print('我要拖地了')
time.sleep(1)
print('地拖完了')
class heat_up_watrt(threading.Thread):
def init(self,name):
# 这里传入参数name,就是传入子线程名字
super().init(name=name)
# 记住这里的格式不能错
def run(self):
print('我要烧水了')
print(self.name)
print(threading.current_thread().name)
# 这两个都是打印出当前子线程的名字
time.sleep(6)
print('水烧开了')
start_time = time.time()
t1 = mop_floor()
t2 = heat_up_watrt(‘我是烧水员‘)
t1.start()
t2.start()
t1.join()
t2.join()
end_time = time.time()
print(‘总共耗时:{}’.format(end_time-start_time))
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
返回结果:
我要拖地了
我要烧水了
我是烧水员
我是烧水员
地拖完了
水烧开了
总共耗时:6.000684022903442
1
2
3
4
5
6
7
python的threading.Thread类有一个run方法,用于定义线程的功能函数,可以在自己的线程类中覆盖该方法。而创建自己的线程实例后,通过Thread类的start方法,可以启动该线程,交给python虚拟机进行调度,当该线程获得执行的机会时,就会调用run方法执行线程。
当然你也可以在类方法中加上使用线程锁
class test(threading.Thread):
def init(self,lock):
super().init()()
self.lock = lock
def run(self):
self.lock.acquire()
pass
self.lock.release()
if __name__ == '__main__':
lock = threading.Lock()
t = test(lock)
1
2
3
4
5
6
7
8
9
10
11
12
13
run方法与start方法的区别
其实不仅仅是start,使用run也可以让子线程执行:
def mop_floor():
print(‘我要拖地了’)
time.sleep(1)
print(‘地拖完了’)
def heat_up_watrt():
print(‘我要烧水了’)
time.sleep(6)
print(‘水烧开了’)
start_time = time.time()
t1 = threading.Thread(target=heat_up_watrt)
t2 = threading.Thread(target=mop_floor)
t1.run()
t2.run()
注意这里不能加上join()
end_time = time.time()
print(‘总共耗时:{}’.format(end_time – start_time))
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
返回结果:
我要烧水了
水烧开了
我要拖地了
地拖完了
总共耗时:7.000263929367065
1
2
3
4
5
两个子线程都用run()方法启动,但却是先运行t1.run(),运行完之后才按顺序运行t2.run(),两个线程都工作在主线程,没有启动新线程,因此,run()方法仅仅是普通函数调用。
start() 方法是启动一个子线程,线程名就是我们定义的name
run() 方法并不启动一个新线程,就是在主线程中调用了一个普通函数而已。
1
2
因此,如果你想启动多线程,就必须使用start()方法。
烧水和拖地只是两个比喻,运用到实际业务中就好比爬虫中一个线程去爬标题与网址另一个去爬标题内的内容,也好比一个线程去获取网页内容,另一个线程去把网页内容写入本地,网络编程中也是,你在等待tcp链接的时候可以去做另外的事情等等。
主线程与子线程
回到刚刚烧水与拖地的例子中:
主线程:一个进程中至少有一个线程,并作为程序的入口,这个线程就是主线程(从一开始的代码执行到最后的打印出执行时间为主线程)
子线程:一个进程至少有一个主线程,其它线程称为子线程(拖地与烧水两个子线程)
上面的例子中可以发现一共有三个线程,一个主线程和两个子线程,如何定义子线程的?其实在代码中就发现了,使用t1 = threading.Thread(target=heat_up_watrt)即可生成一个子线程,然后使用t1.start()即可启动这个子线程,这样的话t1.jion()是不是就多余呢?其实不然,使用t1.jion()的作用就是:等待子线程执行完毕后再执行主线程,如果不加上t1.jion()的话,子线程任然执行,但是子线程再等待的时候(io操作的时候),释放出资源,这个时候主线程拿到资源运行主线程的任务,就会直接打印出共耗时:xxx,然后再等待子线程运行结束,最后退出主程序。
注意这里的t1.jion()放到位置很重要:
t1.start()
t1.join()
t2.start()
t2.join()
1
2
3
4
如果这样放的话,就是先执行线程1然后等待线程1执行完毕,然后执行线程2等待线程2执行完毕。
t1.start()
t2.start()
t1.join()
t2.join()
1
2
3
4
这样的话就不一样,会等到线程1与线程2执行完毕后再执行主线程。
上面说到主线程推出后要等待子线程执行完毕后才会退出整个主程序,此时使用t1.setDaemon(True)的话,会当主线程执行完毕后,t1子线程不管有没有执行完毕,都退出主程序。
线程间通信
在一个进程中,不同子线程负责不同的任务,t1子线程负责获取到数据,t2子线程负责把数据保存的本地,那么他们之间的通信使用Queue来完成。因为再一个进程中,数据变量是共享的,即多个子线程可以对同一个全局变量进行操作修改,Queue是加了锁的安全消息队列。
在此之前回顾Queue消息队列的使用Queue消息队列
注意queue.join()阻塞等待队列中任务全部处理完毕,需要配合queue.task_done使用
import threading
import time
import queue
q = queue.Queue(maxsize=5)
def t1(q):
while 1:
for i in range(10):
q.put(i)
def t2(q):
while not q.empty():
print(‘队列中的数据量:’+str(q.qsize()))
# q.qsize()是获取队列中剩余的数量
print(‘取出值:’+str(q.get()))
# q.get()是一个堵塞的,会等待直到获取到数据
print(‘—–‘)
time.sleep(0.1)
t1 = threading.Thread(target=t1,args=(q,))
t2 = threading.Thread(target=t2,args=(q,))
t1.start()
t2.start()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
返回结果:
队列中的数据量:5
取出值:0
队列中的数据量:5
取出值:1
队列中的数据量:5
取出值:2
队列中的数据量:5
取出值:3
队列中的数据量:5
取出值:4
队列中的数据量:5
取出值:5
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
这个即使消息队列的简单使用,举个例子就能清晰的明白线程间使用queue如何通信:
import threading
import time
import queue
”’
模拟包子店卖包子
厨房每一秒钟制造一个包子
顾客每三秒吃掉一个包子
厨房一次性最多存放100个包子
”’
q = queue.Queue(maxsize=100)
厨房一次性最多存放100个包子
def produce(q):
这个函数专门产生包子
for i in range(10000):
q.put('第{}个包子'.format(str(i)))
# 生产出包子,表明包子的id号
time.sleep(1)
# 要一秒才能造出一个包子
def consume(q):
while not q.empty():
# 只要包子店里有包子
print(‘包子店的包子剩余量:’+str(q.qsize()))
# q.qsize()是获取队列中剩余的数量
print(‘小桃红吃了:’+str(q.get()))
# q.get()是一个堵塞的,会等待直到获取到数据
print(‘————‘)
time.sleep(3)
t1 = threading.Thread(target=produce,args=(q,))
t2 = threading.Thread(target=consume,args=(q,))
t1.start()
t2.start()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
返回结果:
包子店的包子剩余量:1
小桃红吃了:第0个包子
包子店的包子剩余量:2
小桃红吃了:第1个包子
包子店的包子剩余量:4
小桃红吃了:第2个包子
包子店的包子剩余量:6
小桃红吃了:第3个包子
包子店的包子剩余量:8
小桃红吃了:第4个包子
包子店的包子剩余量:10
小桃红吃了:第5个包子
……
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
线程同步
如果没有控制多个线程对同一资源的访问,对数据造成破坏,使得线程运行的结果不可预期。这种现象称为“线程不安全”。
同步就是协同步调,按预定的先后次序进行运行。
如:你说完,我再说。
“同”字从字面上容易理解为一起动作
其实不是,”同”字应是指协同、协助、互相配合。
如进程、线程同步,可理解为进程或线程A和B一块配合,A执行到一定程度时要依靠B的某个结果,于是停下来,示意B运行;B依言执行,再将结果给A;A再继续操作。
线程锁实现同步控制
线程锁使用threading.Lock()实例化,使用acquire()上锁,使用release()释放锁,牢记acquire与release()必须要同时成对存在。它提供一些如下方法:
acquire():上锁,这个时候只能运行上锁后的代码
release():解锁,解锁后把资源让出来,给其他线程使用
1
2
举个例子:
def run1():
while 1:
print(‘我是老大,我先运行’)
def run2():
while 1:
print(‘我是老二,我第二运行’)
def run3():
while 1:
print(‘我是老三,我最后运行’)
t1 = threading.Thread(target=run1)
t2 = threading.Thread(target=run2)
t3 = threading.Thread(target=run3)
t1.start()
t2.start()
t3.start()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
这样运行的结果是无序没法控制的,但是当你加上一把锁后,就不一样了。
def run1():
while 1:
if l1.acquire():
# 如果第一把锁上锁了
print(‘我是老大,我先运行’)
l2.release()
# 释放第二把锁
def run2():
while 1:
if l2.acquire():
# 如果第二把锁上锁了
print(‘我是老二,我第二运行’)
l3.release()
# 释放第三把锁
def run3():
while 1:
if l3.acquire():
# 如果第三把锁上锁了
print(‘我是老三,我最后运行’)
l1.release()
# 释放第一把锁
t1 = threading.Thread(target=run1)
t2 = threading.Thread(target=run2)
t3 = threading.Thread(target=run3)
l1 = threading.Lock()
l2 = threading.Lock()
l3 = threading.Lock()
实例化三把锁
l2.acquire()
l3.acquire()
t1.start()
t2.start()
t3.start()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
返回结果:
我是老大,我先运行
我是老二,我第二运行
我是老三,我最后运行
我是老大,我先运行
我是老二,我第二运行
我是老三,我最后运行
我是老大,我先运行
我是老二,我第二运行
我是老三,我最后运行
…..
1
2
3
4
5
6
7
8
9
10
此时虽然按照自己的要求同步执行,但是运行速度会慢一点,因为上锁与释放锁需要时间会影响性能。
lock还有Rlock的方法,RLock允许在同一线程中被多次acquire(比如你一个函数上了锁,这个函数调用另一个函数,另一个函数也上了锁 )。而Lock却不允许这种情况。否则会出现死循环,程序不知道解哪一把锁。注意:如果使用RLock,那么acquire和release必须成对出现,即调用了n次acquire,必须调用n次的release才能真正释放所占用的锁
条件变量实现同步精准控制
条件变量,用于复杂的线程间同步。在一些对线程间通信要求比较精准的需求下,使用简单的lock加锁解锁已经没法实现需求,这个时候condition条件控制就派上用场了。
condition源代码中本质上还是调用lock实现条件变量的控制, 他提供如下一些方法:
acquire():上锁
release():解锁
wait(timeout=None):堵塞线程,知道接受到一个notify或者超时才能继续运行,需记住wait()必须在已经获得lock的前提下才能调用
notify(n=1):打通线程(个人觉得这样叫方便理解),堵塞的线程接收到notify后开始运行,需记住notify()必须在已经获得lock的前提下才能调用
notifyAll():如果调用wait()堵塞的线程比较多,就打通所有的堵塞线程
1
2
3
4
5
通过实例代码来弄清楚他们怎么调用:
import threading
import random
import queue
q = queue.Queue(maxsize=100)
def produce(q):
while 1:
result = str(random.randint(1,100))
q.put(result)
print(‘我生成了一个随机数字:’+result)
def consume(q):
while 1:
res = q.get()
print(‘我获取到了你生成的随机数字:’+str(res))
t1 = threading.Thread(target=produce,args=(q,))
t2 = threading.Thread(target=consume,args=(q,))
t1.start()
t2.start()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
返回结果:
我生成了一个随机数字:29
我获取到了你生成的随机数字:92
我生成了一个随机数字:38
我获取到了你生成的随机数字:9
我生成了一个随机数字:21
我获取到了你生成的随机数字:37
我生成了一个随机数字:28
1
2
3
4
5
6
7
由于线程的不安全性,每次生成和获取的数字都并非同时是按顺序索取要得到的,这个时候condition就派上用场了(其实如果设置消息队列的q=queue.Queue(size=1)就能解决这个问题)。
import threading
import random
def produce():
global q
while 1:
con.acquire()
# 必须在有锁的前提下才能使用条件变量
q = str(random.randint(1,100))
print(‘我生成了一个随机数字:’+q)
con.notify()
# 发起一个信号,释放一个被堵塞的线程
con.wait()
# 发起一个信号,堵塞当前线程,等待另一个notify出现的时候就执行下面的代码
con.release()
# 必须要解锁
def consume():
global q
while 1:
con.acquire()
# 必须在有锁的前提下才能使用条件变量
print(‘我获取到你生成的随机数字:’+q)
con.notify()
# 发起一个信号,释放一个被堵塞的线程
con.wait()
# 堵塞当前线程
con.release()
t1 = threading.Thread(target=produce)
t2 = threading.Thread(target=consume)
con = threading.Condition()
t1.start()
t2.start()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
返回结果:
我生成了一个随机数字:99
我获取到你生成的随机数字:99
我生成了一个随机数字:53
我获取到你生成的随机数字:53
我生成了一个随机数字:26
我获取到你生成的随机数字:26
1
2
3
4
5
6
同时condition的源码类中是有enter和exit两个魔法函数的,这也就意味能够使用with上下管理,省去自己加锁解锁的过程,举一个更加方便理解的例子:
import threading
import time
con = threading.Condition()
def run():
while 1:
with con:
# 使用上下文管理器,省去了自动上锁解锁的过程
print(‘—————–‘)
print(‘我完事了,该你了’)
con.notify()
# 发起一个信号,释放掉一个wait
con.wait()
def result():
while 1:
with con:
con.wait()
# 我在等待一个noity出现,这样我就能运行了
time.sleep(0.3)
print(‘三秒后….’)
print(‘我也完事了,你继续’)
con.notify()
t1 = threading.Thread(target=run)
t2 = threading.Thread(target=result)
t2.start()
t1.start()
注意这里必须t2先运行,想想为什么
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
返回结果:
我完事了,该你了
三秒后….
我也完事了,你继续
我完事了,该你了
三秒后….
我也完事了,你继续
……
1
2
3
4
5
6
7
8
9
condition源码中其实是调用了两把锁,外层的锁是调用lock,内层的锁是通过wait()方法实现,每次调用wait的时候,都会分配一把锁放在等待的队列中,知道noitfy方法出现就释放这把锁。
信号量实现定量的线程同步
semaphore适用于控制进入数量的锁,好比文件的读写操作,写入的时候一般只用一个线程写,如果多个线程同时执行写入操作的时候,就会造成写入数据混乱。 但是读取的时候可以用多个线程来读取,可以看到写与写是互斥的,读与写不是互斥的,读与读不是互斥的。
文件读写只是个例子,在一些日常业务中比如爬虫读取网址的线程数量控制等。
BoundedSemaphore。这种锁允许一定数量的线程同时更改数据,它不是互斥锁。比如地铁安检,排队人很多,工作人员只允许一定数量的人进入安检区,其它的人继续排队。
这个使用很简单的:
import time
import threading
def run(n, se):
se.acquire()
print(“run the thread: %s” % n)
time.sleep(1)
se.release()
设置允许5个线程同时运行
semaphore = threading.BoundedSemaphore(5)
for i in range(20):
t = threading.Thread(target=run, args=(i,semaphore))
t.start()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
运行后,可以看到5个一批的线程被放行。他用来控制进入某段代码的线程数量。
最后提起一点,Rolok是基于lock实现了,condition是基于Rlock和lock实现的,semahhore是基于condition实现的。
事件实现线程锁同步
事件线程锁的运行机制:
全局定义了一个Flag,如果Flag的值为False,那么当程序执行wait()方法时就会阻塞,如果Flag值为True,线程不再阻塞。这种锁,类似交通红绿灯(默认是红灯),它属于在红灯的时候一次性阻挡所有线程,在绿灯的时候,一次性放行所有排队中的线程。
事件主要提供了四个方法set()、wait()、clear()和is_set()。
调用wait()方法将等待信号。
is_set():判断当前是否状态
调用set()方法会将Flag设置为True。
调用clear()方法会将事件的Flag设置为False。
1
2
3
4
举个例子:
import threading
import time
import random
boys = [‘此时一位捡瓶子的靓仔路过\n————‘,’此时一位没钱的网友路过\n————‘,’此时一位推着屎球的屎壳郎路过\n————‘]
event = threading.Event()
def lighter():
event.set()
while 1:
ti = (random.randint(1, 10))
time.sleep(ti)
print(‘等待 {} 秒后’.format(str(ti)))
event.clear()
time.sleep(ti)
event.set()
def go(boy):
while 1:
if event.is_set():
# 如果事件被设置
print(‘在辽阔的街头’)
print(boy)
time.sleep(random.randint(1, 5))
else:
print(‘在寂静的田野’)
print(boy)
event.wait()
print(‘突然,一辆火车驶过’)
time.sleep(5)
t1 = threading.Thread(target=lighter)
t1.start()
for boy in boys:
t2 = threading.Thread(target=go,args=(boy,))
t2.start()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
线程池编程
作用
线程池只有在py3.2才内置的,2版本只能自己维护一个线程池很麻烦。在Python3.2中的concurrent_futures,其可以实现线程池,进程池,不必再自己使用管道传数据造成死锁的问题。并且这个模块具有线程池和进程池、管理并行编程任务、处理非确定性的执行流程、进程/线程同步等功能,但是平时用的最多的还是用来构建线程池和进程池。
在线程池中,主线程可以获取任意一个线程的状态以及返回结果,并且当一个线程完成后,主线程能立即得到结果。
功能
此模块由以下部分组成:
concurrent.futures.Executor: 这是一个虚拟基类,提供了异步执行的方法。
submit(function, argument): 调度函数(可调用的对象)的执行,将 argument 作为参数传入。
map(function, argument): 将 argument 作为参数执行函数,以 异步 的方式。
shutdown(Wait=True): 发出让执行者释放所有资源的信号。
concurrent.futures.Future: 其中包括函数的异步执行。Future对象是submit任务(即带有参数的functions)到executor的实例
done():比如t1.done()判断任务t1是否完成,没完成则返回False
cancel():比如t1.cancel(),取消线程执行,当线程正在执行和执行完毕后是没法取消执行的,但是如果一个线程没有启动的话,是可以取消t1线程执行的
套用模板
线程池用起来非常方便,基本上是照着模板往里面套即可
# coding:utf-8
from concurrent.futures import ThreadPoolExecutor
# 导入线程池模块
import threading
# 导入线程模块,作用是获取当前线程的名称
import os,time
def task(n):
print('%s:%s is running' %(threading.currentThread().getName(),os.getpid()))
# 打印当前线程名和运行的id号码
time.sleep(2)
return n**2
# 返回传入参数的二次幂
if __name__ == '__main__':
p=ThreadPoolExecutor()
#实例化线程池,设置线程池的数量,不填则默认为cpu的个数*5
l=[]
# 用来保存返回的数据,做计算总计
for i in range(10):
obj=p.submit(task,i)
# 这里的obj其实是futures的对象,使用obj.result()获取他的结果
# 传入的参数是要执行的函数和该函数接受的参数
# submit是非堵塞的
# 这里执行的方式是异步执行
# -----------------------------------
# # p.submit(task,i).result()即同步执行
# -----------------------------------
# 上面的方法使用range循环有个高级的写法,即map内置函数
# p = ThreadPoolExecutor()
# obj=p.map(task,range(10))
# p.shutdown()
# 这里的obj的值就是直接返回的所有计算结果,不属于futures对象
# -----------------------------------
l.append(obj)
# 把返回的结果保存在空的列表中,做总计算
p.shutdown()
# 所有计划运行完毕,关闭结束线程池
print('='*30)
print([obj.result() for obj in l])
#上面方法也可写成下面的方法
# with ThreadPoolExecutor() as p: #类似打开文件,可省去.shutdown()
# future_tasks = [p.submit(task, i) for i in range(10)]
# print('=' * 30)
# print([obj.result() for obj in future_tasks])
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
submit与map的区别
submit返回的是一个futures的对象,使用.result()才能获取他的运行结果
submit接受的对象是函数加一个固定的参数
map返回的是所有线程执行完毕后返回的结果
map接受的对象是函数加一个传入函数的集合列表
他们都能提前获取先执行完的结果
map比submit简单好用
map返回的结果是安装列表传入参数的顺序返回结果,submit返回结果是哪个线程先执行完就返回哪个线程的结果
获取部分执行完成的结果
想要获取到部分执行完成的结果使用
from concurrent.futures import as_completed
1
as_completed其实是一个生成器,他只会返回以及完成的线程结果
比如:
all_tasks = [p.submit(task,obj) for obj in l]
for f in as_completed(all_tasks):
data = f.result()
1
2
3
因为生成器的缘故,他会先把执行完毕的结果赋值给data,其实通过map方法也可以实现
for data in p.map(task,range(10)):
print data
1
2
这样也是可以优先把执行完毕的结果获取出来,map函数中其实也有yield函数。map和as_completed的区别在于,map执行是按照传入列表的元素一个一个按顺序返回的,并且返回的是直接的结果,as_completed返回的是一个futures的对象,使用.result()获取他的结果。as_completed返回的不是按照顺序返回的。
堵塞线程
多线程中使用jion堵塞线程,在线程池中也可以堵塞等待某一些子线程的线程池执行完毕后再执行主线程
from concurrent.futures import wait
1
wait用在你的线程池下面,比如:
all_tasks = [p.submit(task,obj) for obj in l]
wait(all_tasks)
1
2
这里只有等待all_tasks里面的线程执行完毕后才能继续执行,里面的可以加上参数等待线程池中只要有一个线程执行结束就执行后面的代码
wait(all_tasks,return_when=FIRST_COMPLETED)
1
判断是否执行完毕
task1 = p.submit(task,10)
print(task1.done())
如果执行完毕,返回True
1
2
3
取消执行线程
task1 = p.submit(task,10)
task1.cancel()
1
2
当线程正在执行和执行完毕后是没法取消执行的,但是如果一个线程没有启动的话,是可以取消t1线程执行的。
线程池提供的方法非常简单,易于使用,但是他背后的原理是很值得研究的。
线程池设计理念
线程池优秀的设计理念在于:他返回的结果并不是执行完毕后的结果,而是futures的对象,这个对象会在未来存储线程执行完毕的结果,这一点也是异步编程的核心。python为了提高与统一可维护性,多线程多进程和协程的异步编程都是采取同样的方式。
多进程
因为GIL的存在,没法利用到多核CPU的优势,这个时候多进程就出来了,它能利用多个CPU并发的优势实现并行。
多进程:消耗CPU操作,CPU密集计算
多线程:大量的IO操作
进程间切换的开销要大于线程间开销,对操作系统来说开多线程比开多进程来说消耗的资源少一些,不同的进程是由主进程完全复制后,每个进程独立隔离开,不像多线程对全局变量直接修改,因为是完全复制独立的,所以当主进程结束后,子进程任然执行。
另外多进程必须要注意的一点就是在运行的时候,必须要加上
if name == ‘main‘:
pass
1
2
进程池套用模板
与线程池用法几乎一致,进程池是基于multiprocessing实现的
from concurrent.futures import ProcessPoolExecutor
import os,time,random
def task(n):
print(‘%s is running’ %os.getpid())
time.sleep(2)
return n**2
if name == ‘main‘:
p=ProcessPoolExecutor() #不填则默认为cpu的个数
l=[]
start=time.time()
for i in range(10):
obj=p.submit(task,i) #submit()方法返回的是一个future实例,要得到结果需要用obj.result()
l.append(obj)
p.shutdown() #类似用from multiprocessing import Pool实现进程池中的close及join一起的作用
print('='*30)
# print([obj for obj in l])
print([obj.result() for obj in l])
print(time.time()-start)
#上面方法也可写成下面的方法
# start = time.time()
# with ProcessPoolExecutor() as p: #类似打开文件,可省去.shutdown()
# future_tasks = [p.submit(task, i) for i in range(10)]
# print('=' * 30)
# print([obj.result() for obj in future_tasks])
# print(time.time() - start)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
进程池相关的使用功能与线程池基于一致。
multiprocessing具体使用与数据通信
这些功能在我之前的笔记写的比较完整与友好,这里不重复造轮子写概念了,
具体传送Python multiprocess 多进程模块
参考链接
具体链接之线程池进程池
参考链接 1
参考链接 2
参考链接 3
————————————————
版权声明:本文为CSDN博主「浪子燕青啦啦啦」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/lzy98/article/details/88819425