线程池作用线程池5种状态

在什么情况下用到线程池(别告诉我你连线程池都不会用)(1)

Excutes为什么不推荐Executors

Executors工具类创建的线程池队列或线程默认为Integer.MAX_VALUE,容易堆积请求 阿里巴巴Java开发手册:

推荐使用ThreadPoolExecutor类根据实际需要自定义创建

ThreadPoolExecutor七大参数

ThreadPoolExecutor类主要有以下七个参数:

四大策略

拒绝策略就是当队列满时,线程如何去处理新来的任务。

AbortPolicy(中止策略)-默认CallerRunsPolicy(调用者运行策略)DiscardPolicy(丢弃策略)DiscardOldestPolicy(弃老策略)工作队列运行流程

在什么情况下用到线程池(别告诉我你连线程池都不会用)(2)

一个面试题

一个线程池 core 7; max 20 , queue: 50, 100并发进来怎么分配的?

答:先有7个能直接得到执行, 接下来把50个进入队列排队等候, 在多开13个继续执行。 现在 70 个被安排上了。 剩下 30 个默认执行饱和策略。

执行任务关闭线程池线程池出现异常会发生什么?最佳实践
  1. 提交线程的业务异常用try catch处理,保证线程不会异常退出
  2. 业务之外的异常我们不可预见的,创建线程池设置ThreadFactory的UncaughtExceptionHandler可以对未捕获的异常做保底处理,通过submit提交任务,可以吃掉异常并复用线程;想要捕获异常这时用future.get()

注:关于异常处理的相关案例,已在源码中,这里不做展示

实战1:结合CompletableFuture使用线程池步骤1:声明一个线程池bean

application.properties

//尽量做到每个业务使用自己配置的线程池 service1.thread.coreSize=10 service1.thread.maxSize=100 service1.thread.keepAliveTime=10

线程池属性类

/** * @Description: 线程池属性 * @Author: jianweil * @date: 2021/12/9 10:44 */ @ConfigurationProperties(prefix = "service1.thread") @Data public class ThreadPoolConfigProperties { private Integer coreSize; private Integer maxSize; private Integer keepAliveTime; }

线程池配置类

/** * @Description: 线程池配置类:根据不同业务定义不同的线程池配置 **/ @EnableConfigurationProperties(ThreadPoolConfigProperties.class) @Configuration public class MyService1ThreadConfig { @Bean public ThreadPoolExecutor threadPoolExecutor(ThreadPoolConfigProperties pool) { return new ThreadPoolExecutor( pool.getCoreSize(), pool.getMaxSize(), pool.getKeepAliveTime(), TimeUnit.SECONDS, new LinkedBlockingDeque<>(100000), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy() ); } }

步骤2:使用

注:本文所有源码已分享github

/** * @Description: 测试CompletableFuture * @Author: jianweil * @date: 2021/12/9 10:50 */ @SpringBootTest public class CompletableFutureTest { @Autowired private ThreadPoolExecutor threadPoolExecutor; /*** * 无返回值 * runAsync */ @Test public void main1() { System.out.println("main.................start....."); CompletableFuture.runAsync(() -> { System.out.println("当前线程:" Thread.currentThread().getId()); int i = 10 / 2; System.out.println("运行结果:" i); }, threadPoolExecutor); System.out.println("main.................end......"); } }

实战2:结合@Async使用线程池方式1:默认线程池步骤1:自定义一个能查看线程池参数的类

public class VisiableThreadPoolTaskExecutor extends ThreadPoolTaskExecutor { private static final Logger logger = LoggerFactory.getLogger(VisiableThreadPoolTaskExecutor.class); private void showThreadPoolInfo(String prefix) { ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor(); if (null == threadPoolExecutor) { return; } logger.info("{}, {},taskCount [{}], completedTaskCount [{}], activeCount [{}], queueSize [{}]", this.getThreadNamePrefix(), prefix, threadPoolExecutor.getTaskCount(), threadPoolExecutor.getCompletedTaskCount(), threadPoolExecutor.getActiveCount(), threadPoolExecutor.getQueue().size()); } @Override public void execute(Runnable task) { showThreadPoolInfo("1. do execute"); super.execute(task); } @Override public void execute(Runnable task, long startTimeout) { showThreadPoolInfo("2. do execute"); super.execute(task, startTimeout); } @Override public Future<?> submit(Runnable task) { showThreadPoolInfo("1. do submit"); return super.submit(task); } @Override public <T> Future<T> submit(Callable<T> task) { showThreadPoolInfo("2. do submit"); return super.submit(task); } @Override public ListenableFuture<?> submitListenable(Runnable task) { showThreadPoolInfo("1. do submitListenable"); return super.submitListenable(task); } @Override public <T> ListenableFuture<T> submitListenable(Callable<T> task) { showThreadPoolInfo("2. do submitListenable"); return super.submitListenable(task); } }

步骤2:实现AsyncConfigurer类

/** * @Description: 注解@async配置 * @Author: jianweil * @date: 2021/12/9 11:52 */ @Slf4j @EnableAsync @Configuration public class AsyncThreadConfig implements AsyncConfigurer { /** * 定义@Async默认的线程池 * ThreadPoolTaskExecutor不是完全被IOC容器管理的bean,可以在方法上加上@Bean注解交给容器管理,这样可以将taskExecutor.initialize()方法调用去掉,容器会自动调用 * * @return */ @Override public Executor getAsyncExecutor() { int processors = Runtime.getRuntime().availableProcessors(); //常用的执行器 //ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); //可以查看线程池参数的自定义执行器 ThreadPoolTaskExecutor taskExecutor = new VisiableThreadPoolTaskExecutor(); //核心线程数 taskExecutor.setCorePoolSize(1); taskExecutor.setMaxPoolSize(2); //线程队列最大线程数,默认:50 taskExecutor.setQueueCapacity(50); //线程名称前缀 taskExecutor.setThreadNamePrefix("default-ljw-"); taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //执行初始化(重要) taskExecutor.initialize(); return taskExecutor; } /** * 异步方法执行的过程中抛出的异常捕获 * * @return */ @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return (ex, method, params) -> log.error("线程池执行任务发送未知错误,执行方法:{}", method.getName(), ex.getMessage()); } }

步骤3: 使用

/** * 默认线程池 */ @Async public void defaultThread() throws Exception { long start = System.currentTimeMillis(); Thread.sleep(random.nextInt(1000)); long end = System.currentTimeMillis(); int i = 1 / 0; log.info("使用默认线程池,耗时:" (end - start) "毫秒"); }

方式2:指定线程池步骤1:声明一个线程池bean

/** * @Description: 注解@async配置 * @Author: jianweil * @date: 2021/12/9 11:52 */ @Slf4j @EnableAsync @Configuration public class AsyncThreadConfig implements AsyncConfigurer { @Bean("service2Executor") public Executor service2Executor() { //Java虚拟机可用的处理器数 int processors = Runtime.getRuntime().availableProcessors(); //定义线程池 ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); //可以查看线程池参数的自定义执行器 //ThreadPoolTaskExecutor taskExecutor = new VisiableThreadPoolTaskExecutor(); //核心线程数 taskExecutor.setCorePoolSize(processors); taskExecutor.setMaxPoolSize(100); //线程队列最大线程数,默认:100 taskExecutor.setQueueCapacity(100); //线程名称前缀 taskExecutor.setThreadNamePrefix("my-ljw-"); //线程池中线程最大空闲时间,默认:60,单位:秒 taskExecutor.setKeepAliveSeconds(60); //核心线程是否允许超时,默认:false taskExecutor.setAllowCoreThreadTimeOut(false); //IOC容器关闭时是否阻塞等待剩余的任务执行完成,默认:false(必须设置setAwaitTerminationSeconds) taskExecutor.setWaitForTasksToCompleteOnShutdown(false); //阻塞IOC容器关闭的时间,默认:10秒(必须设置setWaitForTasksToCompleteOnShutdown) taskExecutor.setAwaitTerminationSeconds(10); /** * 拒绝策略,默认是AbortPolicy * AbortPolicy:丢弃任务并抛出RejectedExecutionException异常 * DiscardPolicy:丢弃任务但不抛出异常 * DiscardOldestPolicy:丢弃最旧的处理程序,然后重试,如果执行器关闭,这时丢弃任务 * CallerRunsPolicy:执行器执行任务失败,则在策略回调方法中执行任务,如果执行器关闭,这时丢弃任务 */ taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); return taskExecutor; } }

步骤2: 使用

/** * 指定线程池service2Executor * * @throws Exception */ @Async("service2Executor") public void service2Executor() throws Exception { long start = System.currentTimeMillis(); Thread.sleep(random.nextInt(1000)); long end = System.currentTimeMillis(); log.info("使用线程池service2Executor,耗时:" (end - start) "毫秒"); }

注:异步任务返回值为void,不能获取的返回值的

计算线程数量适合框架类

例如netty,dubbo这种底层通讯框架通常会参考进行设置

实际情况往往复杂的多,并不会按照这个进行设置

IO密集型类型进阶算法

在我们的业务开发中,基本上都是IO密集型,因为往往都会去操作数据库,访问redis,es等存储型组件,涉及到磁盘IO,网络IO。

一个4C8G的机器上部署了一个MQ消费者,在RocketMQ的实现中,消费端也是用一个线程池来消费线程的,那这个线程数要怎么设置呢?

那我们怎么判断需要增加更多线程呢?

线程数规划的公式(推荐)

《Java 并发编程实战》介绍了一个线程数计算的公式:

在什么情况下用到线程池(别告诉我你连线程池都不会用)(3)

如果希望程序跑到CPU的目标利用率,需要的线程数公式为:

在什么情况下用到线程池(别告诉我你连线程池都不会用)(4)

如果我期望目标利用率为90%(多核90),那么需要的线程数为:

在什么情况下用到线程池(别告诉我你连线程池都不会用)(5)

把公式变个形,还可以通过线程数来计算CPU利用率:

在什么情况下用到线程池(别告诉我你连线程池都不会用)(6)

虽然公式很好,但在真实的程序中,一般很难获得准确的等待时间和计算时间,因为程序很复杂,不只是“计算” 。一段代码中会有很多的内存读写,计算,I/O 等复合操作,精确的获取这两个指标很难,所以光靠公式计算线程数过于理想化。

真实程序中的线程数是没有固定答案,先设定预期,比如我期望的CPU利用率在多少,负载在多少,GC频率多少之类的指标后,再通过测试不断的调整到一个合理的线程数

获取CPU核心数

Runtime.getRuntime().availableProcessors()//获取逻辑核心数,如6核心12线程,那么返回的是12

# 总核数 = 物理CPU个数 X 每颗物理CPU的核数 # 总逻辑CPU数 = 物理CPU个数 X 每颗物理CPU的核数 X 超线程数 # 查看物理CPU个数 cat /proc/cpuinfo| grep "physical id"| sort| uniq| wc -l # 查看每个物理CPU中core的个数(即核数) cat /proc/cpuinfo| grep "cpu cores"| uniq # 查看逻辑CPU的个数 cat /proc/cpuinfo| grep "processor"| wc -l

作者:小伙子vae链接:https://juejin.cn/post/7042107393668284452来源:稀土掘金

,