线程池(Thread Pool)是一种基于池化思想管理线程的工具,经常出现在多线程服务器中,如MySQL,今天小编就来说说关于线程池作用?下面更多详细答案一起来看看吧!

线程池作用(线程池原理与实践)

线程池作用

JAVA 线程池原理与实践1.概述1.1.线程池是什么

线程池(Thread Pool)是一种基于池化思想管理线程的工具,经常出现在多线程服务器中,如MySQL。

创建线程本身开销大,反复创建并销毁,过多的占用内存。所以有大量线程创建考虑使用线程池。线程池不用反复创建线程达到线程的复用,更具配置合理利用cpu和内存减少了开销,性能会得到提高,还能统一管理任务。

比如服务器收到大量请求,每个请求都分配线程去处理,对服务器性能考验就比较大,如果创建5个以上线程考虑使用线程池。

线程过多会带来额外的开销,其中包括创建销毁线程的开销、调度线程的开销等等,同时也降低了计算机的整体性能。线程池维护多个线程,等待监督管理者分配可并发执行的任务。这种做法,一方面避免了处理任务时创建销毁线程开销的代价,另一方面避免了线程数量膨胀导致的过分调度问题,保证了对内核的充分利用。

本文描述线程池是JDK中提供的threadPoolExecutor类。使用线程池可以带来一系列好处:

1.2.线程池解决的问题是什么

线程池解决的核心问题就是资源管理问题。在并发环境下,系统不能够确定在任意时刻中,有多少任务需要执行,有多少资源需要投入。这种不确定性将带来以下若干问题:

在计算机领域中的表现为:统一管理IT资源,包括服务器、存储、和网络资源等等。通过共享资源,使用户在低投入中获益。除去线程池,还有其他比较典型的几种使用策略包括:

2.基本使用以及相关参数

ThreadPoolExecutor和ScheduledThreadPoolExecutor算是我们最常用的线程池类了,从上面我们可以看到这俩个最终都实现了Executor和ExecutorService这两个接口,实际上主要的接口定义都是在ExecutorService中。

2.1.ThreadPoolExecutor的构造方法

构造方法:

public ThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

参数说明:

workQueue通常情况下有如下选择:

LinkedBlockingQueue:无界队列,意味着无限制,其实是有限制,大小是int的最大值。也可以自定义大小。

ArrayBlockingQueue:有界队列,可以自定义大小,到了阈值就开启新线程(不会超过maximumPoolSize)。

SynchronousQueue:Executors.newCachedThreadPool();默认使用的队列。也不算是个队列,他不没有存储元素的能力。

一般都采取LinkedBlockingQueue,因为他也可以设置大小,可以取代ArrayBlockingQueue有界队列。

2.2.Executors线程池工具使用(不建议使用)

Executors创建返回ThreadPoolExecutor对象的方法共有三种:

2.2.1.Executors#newCachedThreadPool方法

public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }

CachedThreadPool是一个根据需要创建新线程的线程池

当一个任务提交时,corePoolSize为0不创建核心线程,SynchronousQueue是一个不存储元素的队列,可以理解为队里永远是满的,因此最终会创建非核心线程来执行任务。对于非核心线程空闲60s时将被回收。因为Integer.MAX_VALUE非常大,可以认为是可以无限创建线程的,在资源有限的情况下容易引起OOM异常

2.2.2.Executors#newSingleThreadExecutor方法

public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }

SingleThreadExecutor是单线程线程池,只有一个核心线程

当一个任务提交时,首先会创建一个核心线程来执行任务,如果超过核心线程的数量,将会放入队列中,因为LinkedBlockingQueue是长度为Integer.MAX_VALUE的队列,可以认为是无界队列,因此往队列中可以插入无限多的任务,在资源有限的时候容易引起OOM异常,同时因为无界队列,maximumPoolSize和keepAliveTime参数将无效,压根就不会创建非核心线程

2.2.3.Executors#newFixedThreadPool方法

public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }

FixedThreadPool是固定核心线程的线程池,固定核心线程数由用户传入

它和SingleThreadExecutor类似,唯一的区别就是核心线程数不同,并且由于使用的是LinkedBlockingQueue,在资源有限的时候容易引起OOM异常

2.3.线程池的执行流程

提交一个任务到线程池中,线程池的处理流程如下:

1、判断线程池里的核心线程是否都在执行任务,如果不是(核心线程空闲或者还有核心线程没有被创建)则创建一个新的工作线程来执行任务。如果核心线程都在执行任务,则进入下个流程。

2、线程池判断工作队列是否已满,如果工作队列没有满,则将新提交的任务存储在这个工作队列里。如果工作队列满了,则进入下个流程。

3、判断线程池里的线程是否都处于工作状态[线程数量是否达到最大值],如果没有,则创建一个新的工作线程来执行任务。如果已经满了,则交给饱和策略来处理这个任务。

2.4.线程池参数:handler拒绝策略

拒绝策略handler这个参数,其类型为RejectedExecutionHandler,当线程池达到最大值并且线程数也达到最大值时才会工作,当队列和线程池都满了,说明线程池处于饱和状态,那么必须对新提交的任务采用一种特殊的策略来进行处理。这个策略默认配置是AbortPolicy,表示无法处理新的任务而抛出异常。JAVA提供了4种策略:

3.线程池结合SpringBoot实战

SpringBoot使用线程池我们常见的有两种方式:

3.1.方式一:通过@Async注解调用

第一步:在Application启动类上面加上@EnableAsync

@SpringBootApplication @EnableAsync public class ThreadpoolApplication { public static void main(String[] args) { SpringApplication.run(ThreadpoolApplication.class, args); } }

第二步:在需要异步执行的方法上加上@Async注解

@Service public class AsyncTest { protected final Logger logger = LoggerFactory.getLogger(this.getClass()); @Async public void hello(String name){ //这里使用logger 方便查看执行的线程是什么 logger.info("异步线程启动 started." name); } }

第三步:测试类进行测试验证

@Autowired AsyncTest asyncTest; @Test void contextLoads() throws InterruptedException { asyncTest.hello("afsasfasf"); //一定要休眠 不然主线程关闭了,子线程还没有启动 Thread.sleep(1000); }

查看打印的日志:

INFO 2276 --- [ main] c.h.s.t.t.ThreadpoolApplicationTests : Started ThreadpoolApplicationTests in 3.003 seconds (JVM running for 5.342) INFO 2276 --- [ task-1] c.h.s.threadpool.threadpool.AsyncTest : 异步线程启动 started.afsasfasf

可以清楚的看到新开了一个task-1的线程执行任务。验证成功!!!

注意:@Async注解失效常景

3.2.方式二:使用自定义的线程池

在默认配置信息里面是没有线程池的拒绝策略设置的方法的,如果需要更换拒绝策略就需要自定义线程池,并且如果项目当中需要多个自定义的线程池,又要如何进行管理呢?

自定义Configuration 第一步:创建一个ThreadPoolConfig 先只配置一个线程池,并设置拒绝策略为CallerRunsPolicy

@Configuration public class ThreadPoolConfig { @Bean("taskExecutor") public Executor taskExecutor() { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); //设置线程池参数信息 taskExecutor.setCorePoolSize(10); taskExecutor.setMaxPoolSize(50); taskExecutor.setQueueCapacity(200); taskExecutor.setKeepAliveSeconds(60); taskExecutor.setThreadNamePrefix("myExecutor--"); taskExecutor.setWaitForTasksToCompleteOnShutdown(true); taskExecutor.setAwaitTerminationSeconds(60); //修改拒绝策略为使用当前线程执行 taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //初始化线程池 taskExecutor.initialize(); return taskExecutor; } }

然后执行之前写的测试代码发现,使用的线程池已经变成自定义的线程池了。

INFO 12740 --- [ myExecutor--2] c.h.s.t.t.ThreadpoolApplicationTests : threadPoolTaskExecutor 创建线程 INFO 12740 --- [ myExecutor--1] c.h.s.threadpool.threadpool.AsyncTest : 异步线程启动 started.async注解创建

第二步:如果配置有多个线程池,该如何指定线程池呢?

@Configuration public class ThreadPoolConfig { @Bean("taskExecutor") public Executor taskExecutor() { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); //设置线程池参数信息 taskExecutor.setCorePoolSize(10); taskExecutor.setMaxPoolSize(50); taskExecutor.setQueueCapacity(200); taskExecutor.setKeepAliveSeconds(60); taskExecutor.setThreadNamePrefix("myExecutor--"); taskExecutor.setWaitForTasksToCompleteOnShutdown(true); taskExecutor.setAwaitTerminationSeconds(60); //修改拒绝策略为使用当前线程执行 taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //初始化线程池 taskExecutor.initialize(); return taskExecutor; } @Bean("poolExecutor") public Executor poolExecutor() { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); //设置线程池参数信息 taskExecutor.setCorePoolSize(10); taskExecutor.setMaxPoolSize(50); taskExecutor.setQueueCapacity(200); taskExecutor.setKeepAliveSeconds(60); taskExecutor.setThreadNamePrefix("myExecutor2--"); taskExecutor.setWaitForTasksToCompleteOnShutdown(true); taskExecutor.setAwaitTerminationSeconds(60); //修改拒绝策略为使用当前线程执行 taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //初始化线程池 taskExecutor.initialize(); return taskExecutor; } @Bean("taskPoolExecutor") public Executor taskPoolExecutor() { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); //设置线程池参数信息 taskExecutor.setCorePoolSize(10); taskExecutor.setMaxPoolSize(50); taskExecutor.setQueueCapacity(200); taskExecutor.setKeepAliveSeconds(60); taskExecutor.setThreadNamePrefix("myExecutor3--"); taskExecutor.setWaitForTasksToCompleteOnShutdown(true); taskExecutor.setAwaitTerminationSeconds(60); //修改拒绝策略为使用当前线程执行 taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //初始化线程池 taskExecutor.initialize(); return taskExecutor; } }

执行测试类,直接报错说找到多个类,不知道加载哪个类:

No qualifying bean of type 'org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor' available: expected single matching bean but found 3: taskExecutor,taskPoolExecutor

由于测试类当中是这样自动注入的:

@Autowired ThreadPoolTaskExecutor threadPoolTaskExecutor;

考虑到@Autowired 以及@Resource两个注入时的存在多个类如何匹配问题,然后发现只要我们在注入时指定具体的bean就会调用对应的线程池!!!

即修改测试类如下:

@Autowired AsyncTest asyncTest; @Autowired ThreadPoolTaskExecutor poolExecutor; //会去匹配 @Bean("poolExecutor") 这个线程池 @Test void contextLoads() throws InterruptedException { asyncTest.hello("async注解创建"); //一定要休眠 不然主线程关闭了,子线程还没有启动 poolExecutor.submit(new Thread(()->{ logger.info("threadPoolTaskExecutor 创建线程"); })); Thread.sleep(1000); }

最后得到如下信息:

INFO 13636 --- [ myExecutor2--1] c.h.s.t.t.ThreadpoolApplicationTests : threadPoolTaskExecutor 创建线程 INFO 13636 --- [ myExecutor--1] c.h.s.threadpool.threadpool.AsyncTest : 异步线程启动 started.async注解创建

注意:如果是使用的@Async注解,只需要在注解里面指定bean的名称就可以切换到对应的线程池去了。如下所示:

@Async("taskPoolExecutor") public void hello(String name){ logger.info("异步线程启动 started." name); }

注意:如果有多个线程池,但是在@Async注解里面没有指定的话,会默认加载第一个配置的线程池

3.3.submit和executor区别

execute和submit都属于线程池的方法,execute只能提交Runnable类型的任务,而submit既能提交Runnable类型任务也能提交Callable类型任务。

execute会直接抛出任务执行时的异常,submit会吃掉异常,可通过Future的get方法[会阻塞]将任务执行时的异常重新抛出。

execute所属顶层接口是Executor,submit所属顶层接口是ExecutorService,实现类ThreadPoolExecutor重写了execute方法,抽象类AbstractExecutorService重写了submit方法。

submit和execute由于参数不同有四种实现形式,如下所示,本文主要研究这四种形式在各自使用场景下的区别和联系

这种提交的方式会返回一个Future对象,这个Future对象代表这线程的执行结果 当主线程调用Future的get方法的时候会获取到从线程中返回的结果数据。 如果在线程的执行过程中发生了异常,get会获取到异常的信息。 <T> Future<T> submit(Callable<T> task); 当线程正常结束的时候调用Future的get方法会返回result对象,当线程抛出异常的时候会获取到对应的异常的信息。 <T> Future<T> submit(Runnable task, T result); 提交一个Runable接口的对象,这样当调用get方法的时候,如果线程执行成功会直接返回null,如果线程执行异常会返回异常的信息 Future<?> submit(Runnable task); void execute(Runnable command);

execute提交的方式只能提交一个Runnable的对象,且该方法的返回值是void,也即是提交后如果线程运行后,和主线程就脱离了关系了,当然可以设置一些变量来获取到线程的运行结果。并且当线程的执行过程中抛出了异常通常来说主线程也无法获取到异常的信息的,只有通过ThreadFactory主动设置线程的异常处理类才能感知到提交的线程中的异常信息。

4.线程池原理

对于线程池的复用原理,可以简单的用一句话概括:创建指定数量的线程并开启,判断当前是否有任务执行,如果有则执行任务。再通俗易懂一些:创建指定数量的线程并运行,重写run方法,循环从任务队列中取Runnable对象,执行Runnable对象的run方法。

接下来开始手写线程池吧,注意是简易线程池,跟JDK自带的线程池无法相提并论,在这里我省略了判断当前线程数有没有大于核心线程数的步骤,简化成直接从队列中取任务,对于理解原理来说已然足矣,代码如下:

public class MyExecutorService { /** * 一直保持运行的线程 */ private List<WorkThread> workThreads; /* * 任务队列容器 */ private BlockingDeque<Runnable> taskRunables; /* * 线程池当前是否停止 */ private volatile boolean isWorking = true; public MyExecutorService(int workThreads, int taskRunables) { this.workThreads = new ArrayList<>(); this.taskRunables = new LinkedBlockingDeque<>(taskRunables); //直接运行核心线程 for (int i = 0; i < workThreads; i ) { WorkThread workThread = new WorkThread(); workThread.start(); this.workThreads.add(workThread); } } /** * WorkThread累,线程池的任务类,类比JDK的worker */ class WorkThread extends Thread { @Override public void run() { while (isWorking || taskRunables.size() != 0) { //获取任务 Runnable task = taskRunables.poll(); if (task != null) { task.run(); } } } } //执行execute,jdk中会存在各种判断,这里省略了 public void execute(Runnable runnable) { //把任务加入队列 taskRunables.offer(runnable); } //停止线程池 public void shutdown() { this.isWorking = false; } }

测试

//测试自定义的线程池 public static void main(String[] args) { MyExecutorService myExecutorService = new MyExecutorService(3, 6); //运行8次 for (int i = 0; i < 8; i ) { myExecutorService.execute(() -> { System.out.println(Thread.currentThread().getName() "task begin"); }); } myExecutorService.shutdown(); }

通过以上分析并手写线程池,我们应该已经基本理解了线程池的复用机制原理,实际上JDK的实现机制远比我们手写的要复杂的多,主要有以下两点,可以让我们进一步加深理解:

当有新任务来的时候,首先判断当前的线程数有没有超过核心线程数,如果没超过则直接新建一个线程来执行新的任务,如果超过了则判断缓存队列有没有满,没满则将新任务放进缓存队列中,如果队列已满并且线程池中的线程数已经达到了指定的最大线程数,那就根据相应的策略拒绝任务,默认为抛异常。

当缓存队列中的任务都执行完毕后,线程池中的线程数如果大于核心线程数并且已经超过了指定的存活时间(存活时间通过队列的poll方法传入,如果指定时间内没有获取到任务,则break退出,线程运行结束),就销毁多出来的线程,直到线程池中的线程数等于核心线程数。此时剩余的线程会一直处于阻塞状态,等待新的任务到来。

有兴趣可以看这块的源码,大致的思想就是:

首先,线程池会有一个管理任务的队列,这个任务队列里存放的就是各种任务,线程池会一直不停循环的去查看消息队里有没有接到任务,如果没有,则继续循环,如果有了则开始创建线程,如果给这个线程池设定的容量是10个线程,那么当有任务的时候就会调用创建线程的函数方法去根据当前任务总数量依次创建线程(这里创建线程的函数方法都是你提前些好了的),线程中会写好循环获取任务队列里任务的逻辑、判断是否销毁该线程的逻辑、进入等待的逻辑,这样线程一旦创建出来就会循环的去查询任务队列里的任务,拿到任务后就执行,执行任务完毕后判断是否销毁该线程,如果不销毁就进入等待(sleep),等待时间过后继续查询消息是否有任务,如此循环,直到逻辑判断需要销毁该线程为止(一般都是根据设定时间去判断是否销毁,例如在线程创建的时候设置一个计时器去控制,如果180秒都没有接到新的任务,则销毁该线程) 。

,