作者:指尖上的榴莲

www.jianshu.com/p/704a6c5d337c

一.概述

线程池,顾名思义就是存放线程的池子,池子里存放了很多可以复用的线程。

如果不用类似线程池的容器,每当我们需要执行用户任务的时候都去创建新的线程,任务执行完之后线程就被回收了,这样频繁地创建和销毁线程会浪费大量的系统资源。

因此,线程池通过线程复用机制,并对线程进行统一管理,具有以下优点:

ThreadPoolExecutor是线程池框架的一个核心类,本文通过对threadPoolExecutor源码的分析(基于JDK 1.8),来深入分析线程池的实现原理。

二.ThreadPoolExecutor类的属性

先从ThreadPoolExecutor类中的字段开始:

线程池原理与算法(深入分析线程池的实现原理)(1)

在ThreadPoolExecutor类的这些属性中,线程池状态是控制线程池生命周期至关重要的属性,这里就以线程池状态为出发点进行研究。

通过上面的源码可知,线程池的运行状态总共有5种,其值和含义分别如下:

然而,线程池中并没有使用单独的变量来表示线程池的运行状态,而是使用一个AtomicInteger类型的变量ctl来表示线程池的控制状态,其将线程池运行状态与工作线程的数量打包在一个整型中,用高3位来表示线程池的运行状态,低29位来表示线程池中工作线程的数量,对ctl的操作主要参考以下几个函数:

线程池原理与算法(深入分析线程池的实现原理)(2)

接下来,我们看一下线程池状态的所有转换情况,如下:

通常情况下,线程池有如下两种状态转换流程:

三.ThreadPoolExecutor类的构造方法

通常情况下,我们使用线程池的方式就是new一个ThreadPoolExecutor对象来生成一个线程池。接下来,先看ThreadPoolExecutor类的构造函数:

线程池原理与算法(深入分析线程池的实现原理)(3)

接下来,看下最后一个构造函数的具体实现:

线程池原理与算法(深入分析线程池的实现原理)(4)

下面解释下一下构造器中各个参数的含义:

1.corePoolSize

线程池中的核心线程数。当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize;如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行。

2.maximumPoolSize

线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于maximumPoolSize。

3.keepAliveTime

线程空闲时的存活时间。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,keepAliveTime参数也会起作用,直到线程池中的线程数为0。

4.unit

keepAliveTime参数的时间单位。

5.workQueue

任务缓存队列,用来存放等待执行的任务。如果当前线程数为corePoolSize,继续提交的任务就会被保存到任务缓存队列中,等待被执行。

一般来说,这里的BlockingQueue有以下三种选择:

6.threadFactory

线程工厂,创建新线程时使用的线程工厂。

7.handler

任务拒绝策略,当阻塞队列满了,且线程池中的线程数达到maximumPoolSize,如果继续提交任务,就会采取任务拒绝策略处理该任务,线程池提供了4种任务拒绝策略:

当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。

四.线程池的实现原理

1.提交任务

线程池框架提供了两种方式提交任务,submit()和execute(),通过submit()方法提交的任务可以返回任务执行的结果,通过execute()方法提交的任务不能获取任务执行的结果。

submit()方法的实现有以下三种:

public Future<?> submit(Runnable task); public <T> Future<T> submit(Runnable task, T result); public <T> Future<T> submit(Callable<T> task);

下面以第一个方法为例简单看一下submit()方法的实现:

public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; }

submit()方法是在ThreadPoolExecutor的父类AbstractExecutorService类实现的,最终还是调用的ThreadPoolExecutor类的execute()方法,下面着重看一下execute()方法的实现。

线程池原理与算法(深入分析线程池的实现原理)(5)

execute()方法的执行流程可以总结如下:

可以结合下面的两张图来理解线程池提交任务的执行流程。

线程池原理与算法(深入分析线程池的实现原理)(6)

2.创建线程

从execute()方法的实现可以看出,addWorker()方法主要负责创建新的线程并执行任务,代码实现如下:

线程池原理与算法(深入分析线程池的实现原理)(7)

线程池原理与算法(深入分析线程池的实现原理)(8)

因为代码(1)处的逻辑不利于理解,我们通过(1)的等价实现来理解:

if (rs>=SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) //等价实现 rs>=SHUTDOWN && (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty())

其含义为,满足下列条件之一则直接返回false,线程创建失败:

多说一句,若线程池处于 SHUTDOWN, firstTask 为 null,且 workQueue 非空,那么还得创建线程继续处理任务缓存队列中的任务。

总结一下,addWorker()方法完成了如下几件任务:

  1. 原子性的增加workerCount
  2. 将用户给定的任务封装成为一个worker,并将此worker添加进workers集合中
  3. 启动worker对应的线程
  4. 若线程启动失败,回滚worker的创建动作,即从workers中移除新添加的worker,并原子性的减少workerCount

3.工作线程的实现

从addWorker()方法的实现可以看出,工作线程的创建和启动都跟ThreadPoolExecutor中的内部类Worker有关。下面我们分析Worker类来看一下工作线程的实现。

Worker类继承自AQS类,具有锁的功能;实现了Runable接口,可以将自身作为一个任务在线程中执行。

private final class Worker extends AbstractQueuedSynchronizer implements Runnable

Worker的主要字段就下面三个,代码也比较简单。

//用来封装worker的线程,线程池中真正运行的线程,通过线程工厂创建而来 final Thread thread; //worker所对应的第一个任务,可能为空 Runnable firstTask; //记录当前线程完成的任务数 volatile long completedTasks;

Worker的构造函数如下。

Worker(Runnable firstTask) { //设置AQS的state为-1,在执行runWorker()方法之前阻止线程中断 setState(-1); //初始化第一个任务 this.firstTask = firstTask; //利用指定的线程工厂创建一个线程,注意,参数是Worker实例本身this //也就是当执行start方法启动线程thread时,真正执行的是Worker类的run方法 this.thread = getThreadFactory().newThread(this); }

Worker类继承了AQS类,重写了其相应的方法,实现了一个自定义的同步器,实现了不可重入锁。

线程池原理与算法(深入分析线程池的实现原理)(9)

Worker类还提供了一个中断线程thread的方法。

void interruptIfStarted() { Thread t; //AQS状态大于等于0,worker对应的线程不为null,且该线程没有被中断 if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } }

再来看一下Worker类的run()方法的实现,会发现run()方法最终调用了ThreadPoolExecutor类的runWorker()方法。

public void run() { runWorker(this); }

4.线程复用机制

通过上文可以知道,worker中的线程start 后,执行的是worker的run()方法,而run()方法最终会调用ThreadPoolExecutor类的runWorker()方法,runWorker()方法实现了线程池中的线程复用机制。下面我们来看一下runWorker()方法的实现。

线程池原理与算法(深入分析线程池的实现原理)(10)

runWorker()方法是线程池的核心,实现了线程池中的线程复用机制,来看一下

runWorker()方法都做了哪些工作:

  1. 运行第一个任务firstTask之后,循环调用getTask()方法获取任务,不断从任务缓存队列获取任务并执行;
  2. 获取到任务之后就对worker对象加锁,保证线程在执行任务的过程中不会被中断,任务执行完会释放锁;
  3. 在执行任务的前后,可以根据业务场景重写beforeExecute()和afterExecute()等Hook方法;
  4. 执行通过getTask()方法获取到的任务
  5. 线程执行结束后,调用processWorkerExit()方法执行结束线程的一些清理工作

从runWorker()方法的实现可以看出,runWorker()方法中主要调用了getTask()方法和processWorkerExit()方法,下面分别看一下这两个方法的实现。

getTask()的实现

getTask()方法用来不断地从任务缓存队列获取任务并交给线程执行,下面分析一下其实现。

线程池原理与算法(深入分析线程池的实现原理)(11)

接下来总结一下getTask()方法会在哪些情况下返回:

  1. 线程池处于RUNNING状态,阻塞队列不为空,返回成功获取的task对象
  2. 线程池处于SHUTDOWN状态,阻塞队列不为空,返回成功获取的task对象
  3. 线程池状态大于等于STOP,返回null,回收线程
  4. 线程池处于SHUTDOWN状态,并且阻塞队列为空,返回null,回收线程
  5. worker数量大于maximumPoolSize,返回null,回收线程
  6. 线程空闲时间超时,返回null,回收线程

processWorkerExit()的实现

processWorkerExit()方法负责执行结束线程的一些清理工作,下面分析一下其实现。

线程池原理与算法(深入分析线程池的实现原理)(12)

processWorkerExit()方法中主要调用了tryTerminate()方法,下面看一下tryTerminate()方法的实现。

线程池原理与算法(深入分析线程池的实现原理)(13)

tryTerminate()方法的作用是尝试终止线程池,它会在所有可能终止线程池的地方被调用,满足终止线程池的条件有两个:首先,线程池状态为STOP,或者为SHUTDOWN且任务缓存队列为空;其次,工作线程数量为0。

满足了上述两个条件之后,tryTerminate()方法获取全局锁,设置线程池运行状态为TIDYING,之后执行terminated()钩子方法,最后设置线程池状态为TERMINATED。

至此,线程池运行状态变为TERMINATED,工作线程数量为0,workers已清空,且workQueue也已清空,所有线程都执行结束,线程池的生命周期到此结束。

5.关闭线程池

关闭线程池有两个方法,shutdown()和shutdownNow(),下面分别看一下这两个方法的实现。

shutdown()的实现

shutdown()方法将线程池运行状态设置为SHUTDOWN,此时线程池不会接受新的任务,但会处理阻塞队列中的任务。

线程池原理与算法(深入分析线程池的实现原理)(14)

shutdown()方法首先会检查是否具有shutdown的权限,然后设置线程池的运行状态为SHUTDOWN,之后中断所有空闲的worker,再调用onShutdown()钩子方法,最后尝试终止线程池。

shutdown()方法调用了interruptIdleWorkers()方法中断所有空闲的worker,其实现如下。

线程池原理与算法(深入分析线程池的实现原理)(15)

shutdownNow()的实现

shutdownNow()方法将线程池运行状态设置为STOP,此时线程池不会接受新任务,也不会处理阻塞队列中的任务,并且中断正在运行的任务。

线程池原理与算法(深入分析线程池的实现原理)(16)

shutdownNow()方法与shutdown()方法相似,不同之处在于,前者设置线程池的运行状态为STOP,之后中断所有的worker(并非只是空闲的worker),尝试终止线程池之后,返回任务缓存队列中等待执行的任务列表。

shutdownNow()方法调用了interruptWorkers()方法中断所有的worker(并非只是空闲的worker),其实现如下。

线程池原理与算法(深入分析线程池的实现原理)(17)

五.总结

至此,我们已经阅读了线程池框架的核心类ThreadPoolExecutor类的大部分源码,由衷地赞叹这个类很多地方设计的巧妙之处:

其实,线程池的本质就是生产者消费者模式,线程池的调用者不断向线程池提交任务,线程池里面的工作线程不断获取这些任务并执行(从任务缓存队列获取任务或者直接执行任务)。

读完本文,相信大家对线程池的实现原理有了深刻的认识,比如向线程池提交一个任务之后线程池的执行流程,一个任务从被提交到被执行会经历哪些过程,一个工作线程从被创建到正常执行到执行结束的执行过程,等等。

对了,在这里说一下,我目前是在职Java开发,如果你现在正在学习Java,了解Java,渴望成为一名合格的Java开发工程师,在入门学习Java的过程当中缺乏基础入门的视频教程,可以关注并私信我:01。获取。我这里有最新的Java基础全套视频教程。

,