python线程池的实现原理
详解python中的线程与线程池线程
进程和线程
什么是进程?
进程就是正在运行的程序, 一个任务就是一个进程, 进程的主要工作是管理资源, 而不是实现功能
什么是线程?
线程的主要工作是去实现功能, 比如执行计算.
线程和进程的关系就像员工与老板的关系,
老板(进程) 提供资源 和 工作空间,
员工(线程) 负责去完成相应的任务
特点
一个进程至少由一个线程, 这一个必须存在的线程被称为主线程, 同时一个进程也可以有多个线程, 即多线程
当我们我们遇到一些需要重复执行的代码时, 就可以使用多线程分担一些任务, 进而加快运行速度
线程的实现
线程模块
python通过两个标准库_thread和threading, 提供对线程的支持 , threading对_thread进行了封装。
threading模块中提供了thread , lock , rlock , condition等组件。
因此在实际的使用中我们一般都是使用threading来实现多线程
线程包括子线程和主线程:
主线程 : 当一个程序启动时 , 就有一个线程开始运行 , 该线程通常叫做程序的主线程
子线程 : 因为程序是开始时就执行的 , 如果你需要再创建线程 , 那么创建的线程就是这个主线程的子线程
主线程的重要性体现在两方面 :
- 是产生其他子线程的线程
- 通常它必须最后完成执行, 比如执行各种关闭操作
thread类
常用参数说明
参数 | 说明 |
---|---|
target | 表示调用的对象, 即子线程要执行的任务, 可以是某个内置方法, 或是你自己写的函数 |
name | 子线程的名称 |
args | 传入target函数中的位置参数, 是一个元组, 参数后必须加逗号 |
常用实例方法
方法 | 作用 |
---|---|
thread.run(self) | 线程启动时运行的方法, 由该方法调用 target参数所指定的函数 |
thread.start(self) | 启动进程, start方法就是区帮你调用run方法 |
thread.terminate(self) | 强制终止线程 |
thread.join(self, timeout=none) | 阻塞调用, 主线程进行等待 |
thread.setdaemon(self, daemonic) | 将子线程设置为守护线程, 随主线程结束而结束 |
thread.getname(self, name) | 获取线程名 |
thread.setname(self, name) | 设置线程名 |
创建线程
在python中创建线程有两种方式, 实例thread类和继承重写thread类
实例thread类
|
import threading import time def run(name, s): # 线程要执行的任务 time.sleep(s) # 停两秒 print ( 'i am %s' % name) # 实例化线程类, 并传入函数及其参数, t1 = threading.thread(target = run, name = 'one' , args = ( 'one' , 5 )) t2 = threading.thread(target = run, name = 'two' , args = ( 'two' , 2 )) # 开始执行, 这两个线程会同步执行 t1.start() t2.start() print (t1.getname()) # 获取线程名 print (t2.getname()) # result: one two i am two # 运行2s后 i am one # 运行5s后 |
继承thread类
|
class mythread(threading.thread): # 继承threading中的thread类 # 线程所需的参数 def __init__( self , name, second): super ().__init__() self .name = name self .second = second # 重写run方法,表示线程所执行的任务,必须有 def run( self ): time.sleep( self .second) print ( 'i am %s' % self .name) # 创建线程实例 t1 = mythread( 'one' , 5 ) t2 = mythread( 'two' , 2 ) # 启动线程,实际上是调用了类中的run方法 t1.start() t2.start() t1.join() print (t1.getname()) print (t2.getname()) # result: i am two # 运行后2s i am one # 运行后5s one two |
常用方法
join()
阻塞调用程序 , 直到调用join () 方法的线程执行结束, 才会继续往下执行
|
# 开始执行, 这两个线程会同步执行 t1.start() t2.start() t1.join() # 等待t1线程执行完毕,再继续执行剩余的代码 print (t1.getname()) print (t2.getname()) # result: i am two i am one one two |
setdemon()
使用给线程设置守护模式: 子线程跟随主线程的结束而结束, 不管这个子线程任务是否完成. 而非守护模式的子线程只有在执行完成后, 主线程才会执行完成
setdaemon() 与 join() 基本上是相对的 , join会等子线程执行完毕 ; 而setdaemon则不会等
|
def run(name, s): # 线程要执行的函数 time.sleep(s) # 停两秒 print ( 'i am %s' % name) # 实例化线程类, 并传入函数及其参数 t1 = threading.thread(target = run, name = 'one' , args = ( 'one' , 5 )) t2 = threading.thread(target = run, name = 'two' , args = ( 'two' , 2 )) # 给t1设置守护模式, 使其随着主线程的结束而结束 t1.setdaemon(true) # 开始执行, 这两个线程会同步执行 t1.start() t2.start() # 主线程会等待未设置守护模式的线程t2执行完成 # result: i am two # 运行后2s |
线程间的通信
互斥锁
在同一个进程的多线程中 , 其中的变量对于所有线程来说都是共享的 , 因此 , 如果多个线程之间同时修改一个变量 , 那就乱套了 , 共享的数据就会有很大的风险 , 所以我们需要互斥锁 , 来锁住数据 , 防止篡改。
来看一个错误的示范:
|
a = 0 def incr(n): global a for i in range (n): a + = 1 # 这两个方法同时声明了变量a,并对其进行修改 def decr(n): global a for i in range (n): a - = 1 t_incr = threading.thread(target = incr, args = ( 1000000 ,)) t_decr = threading.thread(target = decr, args = ( 1000000 ,)) t_incr.start() t_decr.start() t_incr.join() t_decr.join() print (a) # 期望结果应该是0, 但是因为这里没有设置互斥锁, 所以两个方法是同时对同一个变量进行修改, 得到的的结果值是随机的 |
下面我们改一下上面的代码 , 两个方法加上互斥锁:
|
a = 0 lock = threading.lock() # 实例化互斥锁对象, 方便之后的调用 def incr(n): global a for i in range (n): lock.acquire() # 上锁的方法 a + = 1 lock.release() # 解锁的方法 # 要注意的是上锁的位置是, 出现修改操作的代码 def decr(n): global a for i in range (n): with lock: # 也可以直接使用with, 自动解锁 a - = 1 t_incr = threading.thread(target = incr, args = ( 1000000 ,)) t_decr = threading.thread(target = decr, args = ( 1000000 ,)) t_incr.start() t_decr.start() t_incr.join() t_decr.join() print (a) # result: 0 |
在容易出现抢夺资源的地方进行上锁 , 实现同一时间内 , 只有一个线程可以对对象进行操作
队列queue
常用方法
关键字 | 解释 |
---|---|
put(item) | 入队 , 将item放入队列中 , 在队列为满时插入值会发生阻塞(1) |
get() | 出队 , 从队列中移除并返回一个数据 , 在队列为空时获取值会发生阻塞 |
task_done() | 任务结束 , 意味着之前入队的一个任务已经完成。由队列的消费者线程调用 |
join() | 等待完成 , 阻塞调用线程,直到队列中的所有任务被处理掉。 |
empty() | 如果队列为空,返回true,反之返回false |
full() | 如果队列为满,返回true,反之返回false |
qsize() | 队列长度 , 返回当前队列的数据量 |
(1): 阻塞: 程序停在阻塞的位置 , 无法继续执行
导入和实例化
|
import queue q = queue.queue( 4 ) # 实例化队列对象, 并设置最大数据量 |
put() 和 get()
|
q.put( 'a' ) q.put( 'b' ) print (q.get()) # : a print (q.get()) # : b q.task_done() # get后必须要加task_done,确认get操作是否完成 |
|
q.put( 1 ) # 当前队列已满,再次put就会阻塞 print (q.full()) # 由于已经阻塞, 所以这段不会被执行 # put会在队列慢了点时候,在插入值会发生阻塞 # get会在队列里没有值的时候,会发生阻塞 |
empty()
|
print (q.empty()) # 判断队列是否为空: true q.put( 'test' ) print (q.empty()) # : false |
qsize()
|
print (q.qsize()) # 当前队列里有多少人: 1 |
full()
|
q.put( 1 ) q.put( 1 ) q.put( 1 ) print (q.full()) # : true |
join()
|
print ( 'testetsetset' ) q.join() # join会在队列非空时发生阻塞 print ( 'done' ) # 由于已经阻塞, 所以这段不会被执行 |
线程池
池的概念
线程池中实现准备好了一些可以重复使用的线程 , 等待接受任务并执行
主线程提交任务给 线程池 , 线程池中的每个线程会一次一个的接收任务并执行 , 直到主线程执行结束
主线程: 相当于生产者,只管向线程池提交任务。
并不关心线程池是如何执行任务的。
因此,并不关心是哪一个线程执行的这个任务。
线程池: 相当于消费者,负责接收任务,
并将任务分配到一个空闲的线程中去执行。
自定义线程池
|
import queue import threading import time class threadpool: # 自定义线程池 def __init__( self , n): # 主线程做 self .queue_obj = queue.queue() for i in range (n): threading.thread(target = self .worker, daemon = true).start() # 给子线程worker设置为守护模式 def worker( self ): # 子线程做,由于debug调试的只是主线程的代码,所以在调试时看不到子线程执行的代码 """线程对象,写while true 是为了能够一直执行任务。""" while true: # 让线程执行完一个任务之后不会死掉,主线程结束时,守护模式会让worker里的死循环停止 func = self .queue_obj.get() # get已经入队的任务, 这里会接收到主线程分配的func # 由于设置了守护模式,当队列为空时,不会一直阻塞在get这里 # 有了守护模式,worker会在主线程执行完毕后死掉 func() # 将队列里的任务拿出来调用 """ 这里func与task_done的顺序非常重要,如果func放在task_done后面的话会出现只执行两次就结束。 """ self .queue_obj.task_done() # task_done 会刷新计数器 # 线程池里有一个类似计数器的机制,用来记录put的次数(+1),每一次task_done都会回拨一次记录的次数(-1) # 当回拨完计数器为0之后,就会执行join def apply_async( self , func): # 主线程做 """向队列中传入需要执行的函数对象""" self .queue_obj.put(func) # 将接收到的func入队 def join( self ): # 主线程做 """等待队列中的内容被取完""" self .queue_obj.join() # 队列里不为空就阻塞,为空就不阻塞 |
简单使用
|
def task1(): # 子线程做 time.sleep( 2 ) print ( 'task1 over' ) def task2(): # 子线程做 time.sleep( 3 ) print ( 'task2 over' ) p = threadpool( 2 ) # 如果在start开启线程之后没有传入任务对象,worker里的get会直接阻塞 p.apply_async(task1) p.apply_async(task2) print ( 'start' ) p.join() print ( 'done' ) # result: start task1 over task2 over done |
如果get发生阻塞意味着队列为空,意味着join不阻塞,意味着print('done')会执行,
意味着主线程没有任务在做,意味着主线程结束,意味着不等待设置了守护的线程执行任务,
意味着子线程会随着主线程的死亡而死亡,这就是为什么会设置守护模式。
如果没有设置守护模式意味着get发生阻塞,意味着子线程任务执行不完,意味着主线程一直要等子线程完成,
意味着程序一直都结束不了,意味着程序有问题
python内置线程池
原理
- 创建线程池
- 将任务扔进去
- 关闭线程池
- 等待线程任务执行完毕
'''手动实现线程池:
主要是配合队列来进行实现,我们定义好一个队列对象,然后将我们的任务对象put到我们的队列对象中,
然后使用多线程,让我们的线程去get队列种的对象,然后各自去执行自己get到的任务,
这样的话其实也就实现了线程池
'''
使用方法