Executor框架的概述
JDK1.5之前,我们如果想要使用Java线程来完成相关任务,一般涉及两个类,一个是Thread类,一个Thread对象在启动(start)之后会创建一个关联的本地操作系统线程,随后会自动回调run方法。另一个是Runnable接口,可以看作 run方法的抽象,代表线程任务。通过Runnable和Thread的配合可以编写我们自己的多线程逻辑。
可以看到,此时Java对于多线程编程的支持还是比较原始的,功能也并不多。因此,在JDK1.5的JUC包中,对Java的多线程应用做了一次全面的扩展,比如新lock锁、并发容器等,还有一个重要的扩展就是出现了Executor执行框架。
Executor执行框架将Java线程的应用做了更细致的功能划分,并且进行了功能的增强,大概包括三个部分:
- 线程任务
JDK1.5之前,只有Runnable代表线程任务,对于受检异常,必须手动在try catch中处理,不支持throws声明可能抛出的异常,不支持任务返回值。
JDK1.5的时候,出现了Callable接口,可以看作Runnable的增强:对于受检异常,可以不用在try catch中处理,支持throws声明可能抛出的异常,并且支持任务返回值。 - 执行器
JDK1.5之前,线程任务的执行需要我们手动创建Thread对象、传入任务、并调用start方法,一个任务对应一个线程,它们之间的关联非常紧密,这样对于线程任务的管理、线程资源的复用等功能几乎没有,或者只能自己手动实现,非常麻烦。
JDK1.5的时候,出现了Executor线程池。线程池作为任务执行器,我们只需要创建指定的线程池,随后将线程任务传入线程池中,由线程池来确定是将任务直接分配给池中的线程去执行、还是创建线程并执行、或者是加入任务队列等待等等逻辑,使用线程池之后我们不再需要手动创建线程去执行,并且可以实现线程的复用以及线程任务的管理等强大的功能。执行器(线程池)将任务与线程解耦! - 异步执行结果
JDK1.5之前,在线程任务启动之后,对于线程任务监控几乎没有,我们不知道任务有没有完成,也没办法定义任务的返回值等一系列信息。
JDK1.5的时候,出现了Future接口以及它的各种实现。这个接口体系代表了线程任务异步计算的结果,通常与Callable线程任务连用。利用了Future设计模式,在一个线程A执行线程任务的时候,我么可以在另一个线程B中异步的通过Future的实现的相关方法完成判断对应线程任务是否执行完毕、是否被取消、手动取消正在执行的线程任务、以及从执行完毕的线程任务中获取返回值等功能。
有了执行框架,我们只需创建线程任务、然后交给指定的线程池去执行,执行完毕之后等待获取返回结果即可,不再需要关注线程的创建、开启、执行、回收等基础性的中间工作,将任务与线程解耦,程序员更加的关注线程任务本身(这里是和业务相关的),有利于多线程程序的开发!
如果想要使用执行框架,只需要看相关api文档即可!那么我们有必要深入理解执行框架吗?当然时间充足的情况下是有必要的,只有我们知道了执行框架的原理之后,才能更好的使用它。执行框架属于JDK自带的基础框架,经历了时间和众多Java开发者的考验,不求能够手写,仅仅学习它的设计精华,包括各种设计模式,同时避开隐藏的坑,对于程序员个人的后续职业发展也是具有很大帮助的!
线程任务、执行器、执行结果这三部分,都可以围绕着Executor线程池展开,因此下面我们将从Executor入手,并且会穿插介绍Callable和Future的相关原理。
Executor线程池的概述
在上面关于Executor执行框架的概述中我们就说过,线程池作为连接线程任务和异步执行结果的执行器,其重要性不言而喻。
在Java框架设计中,一般涉及到资源相关的,并且资源具有创建消耗大、可复用的特点时,都采用了池化技术管理资源,形成一个“资源池”,池化技术可以带来以下一般性好处:对外部隐藏了资源的创建与释放的细节、实现资源的复用减少内存或者时间性能开销。常见Java中池化技术有:数据库连接池(管理数据库连接资源)、基本类型包装类中的缓存池(管理常用包装类对象)、Http连接池(管理http连接资源)、redis连接池(管理redis连接)等等,当然还包括马上要讲的线程池。
Executor线程池来自于JDK1.5的JUC包,使用线程池的目的或者好处如下:
实现线程资源的合理复用。线程资源属于操作系统核心资源之一,创建和销毁都需要占用系统资源和大量时间。使用线程池之后,不再需要开发者管理线程,线程的创建和销毁都交给线程池控制,实现线程的复用,减少线程资源的频繁的创建和销毁。
提升任务执行效率。当新来一个线程任务的时候,由于具有线程复用计数因此可以直接利用现有的线程去执行任务,不需要新建线程,这样一定程度上提升了执行效率。
可以对线程和线程任务实现实时监控和管理。比如目前活动线程数、曾经的最大线程数、已完成的任务数量等功能;比如控制最大线程数,在线程任务执行前-执行完毕后-线程池停止后具有可选的回调方法、移除某个线程任务、立即停止线程池等功能,他们都可以通过线程池的相关方法调用来实现。
JDK的线程池可扩展性极强,我们既可以利用定义好的线程池,也可以自定义线程池,很多其他框架组件也都是使用或者扩展了JDK线程池,比如ForkJoinPool分治框架(线程池框架的增强),guava的线程池MoreExecutors就是基于JDK线程池做的扩展,权限框架Shiro的PropertiesRealm属性文件认证类,JAVA RESTFUL请求服务框架Jersey,甚至单元测试框架junit等等框架都是用到了原生Executor线程池,下面来看看JDK的Executor线程池的主要原理吧!
Executor线程池的基本结构
从JDK1.5到JDK1.8,线程池的成员已经扩展的非常多了,下面仅仅列举了常用的关键的类和接口的uml关系:
下面先简单介绍核心接口以及实现类:
- Executor
作为线程池的顶级执行接口,也是一个函数式接口。只有一个execute方法,用于执行已提交的 Runnable 任务对象。
它不仅仅是一个接口,更是代表着一种将任务与每个任务将如何运行的机制(包括线程的创建、使用、调度等)分离开来的思想。使用者只需要提交任务,不需要创建线程,执行的细节被封装到Executor中,任务的执行方法可以根据实现者自由选择,可以实现为异步(使用新线程执行任务)、也可以是同步的(在调用者的线程中立即运行已提交的任务)。 - ExecutorService
继承并扩展了Executor接口的执行服务接口。
新增了可为跟踪一个或多个异步任务执行状况而生成 Future 的方法,比如submit方法,作为execute方法的扩展。新增了可以关闭线程池的方法,比如shutdown和shutdownNow方法。新增了批量执行任务的方法,比如 invokeAny 和 invokeAll方法。 - AbstractExecutorService
实现了ExecutorService的抽象类,提供 ExecutorService 执行方法的默认实现。比如对ExecutorService返回Future,实现为返回RunnableFuture。另一个作用是作为骨干实现最大限度地减少ExecutorService的实现类的代码。 - ThreadPoolExecutor
继承了ExecutorService的普通类,这是JDK线程池的核心实现。
它的构造器提供了各种可配置参数,比如线程数量、任务队列、拒绝策略等,方便我们自定义自己的线程池,以及各种钩子 (hook) 方法,方便追踪线程任务的执行,这是我们学习的重点,这里不做详细介绍。 - ScheduledThreadPoolExecutor
继承了ThreadPoolExecutor的普通类,可以看作功能的扩展或增强。
它能够将线程任务延迟指定时间后执行,或者间隔固定时间多次执行。功能与Timer类似,但ScheduledThreadPoolExecutor功能更强大、更灵活。Timer对应的是单个后台线程,而ScheduledThreadPoolExecutor可以在构造函数中指定多个对应的后台线程数。Timer中一个任务出现异常之后会影响其他任务的执行,但是ScheduledThreadPoolExecutor不会。Timer中一个任务耗时较常会影响其他任务的执行,ScheduledThreadPoolExecutor不会。 - Executors
独立出来的一个普通类(没有继承和实现关系,采用组合/聚合关系,图上没有注明),作为一个线程池工厂,提供各种实用方法。
提供了各种预定义线程池的实现,比如CachedThreadPool、FixedThreadPool等;提供了将Runnable包装、转换为Callable的方法;提供默认的ThreadFactory线程工厂的实现等功能。
下面我们主要学习ThreadPoolExecutor、ScheduledThreadPoolExecutor、Executors这三个类。另外JDK1.7的时候线程池新增了ForkJoinPool分治框架,这是对线程池的增强,后面的文章我们会讲解ForkJoinPool的源码!
ThreadPoolExecutor 线程池实现
ThreadPoolExecutor的概述
public class ThreadPoolExecutor
extends AbstractExecutorService
JDK线程池的关键实现,我们常用的Executors中的预定义线程池就有这个类的实例,当然也可以通过该类的构造器来创建一个自定义的线程池,提供任务执行,线程调度,线程池管理等等服务,理解了这个类,其他线程池的实现原理就很好理解了。
CallerRunsPolicy、AbortPolicy、DiscardPolicy、DiscardOldestPolicy是四个拒绝策略类。对于正在执行的线程数大于等于maxmumPoolSize以及workQueue容量已满时提交的任务,或者线程池正在关闭时的新提交的任务,线程池将会执行拒绝策略,任务会交给RejectedExecutionHandler来处理。
Worker是对线程池中工作线程的包装,用于对于传递的任务或者任务队列中的任务进行执行、中断控制等操作。这其中还涉及到AQS,即Worker实现了简单的不可重入锁。
ThreadPoolExecutor的主要属性
使用一个ctl原子变量来来同时记录线程池的运行状态(runState,简称rs)和线程池中线程数量(workerCount,简称wc)。int类型转换为二进制之后的最高三位保存线程池的状态,低29位保存线程数量。刚初始化ctl的时候,rs为RUNNING状态,wc为0。
另外还具有一个ReentrantLock独占锁,当改变线程池状态,比如添加工作线程、停止线程池,或者访问线程池共享参数信息比如当前线程数量的时候,因为这涉及到多个工作线程之间的共享信息比如线程池状态、工作线程数量等参数的同步,需要获取mainLock独占锁才能进行操作。
其他的属性将会在后面的构造器部分一一详解。
/**
* 用来同时记录线程池的运行状态(runState,简称rs)和线程池中线程数量(workerCount,简称wc)。ctl的值使用AtomicInteger原子类包装,能够保证数据是线程安全的。
* int类型转换为二进制之后的最高三位保存线程池的状态,低29位保存线程数量。刚初始化ctl的时候,rs为RUNNING状态,wc为0
* ReentrantReadWriteLock也是使用一个state变量保存写锁和读锁的获取信息,ConcurrentHashMap中的甚至使用一个lockState保存三种锁状态
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
/**
* 线程数量掩码位数,int类型长度-3后的剩余位数,即wc所占位数为29
*/
private static final int COUNT_BITS = Integer.SIZE - 3;
/**
* 为了能正确保存线程数量,线程池的数量线程被限制为29位的最大值,即最大(2^29)-1个,而不是(2^31)-1个
*/
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
//ThreadPoolExecutor中定义了线程池的状态,存储在ctl的高三位中,一共有五种
/**
* RUNNING状态,11100000000000000000000000000000
*/
private static final int RUNNING = -1 << COUNT_BITS;
/**
* SHUTDOWN状态:00000000000000000000000000000000
*/
private static final int SHUTDOWN = 0 << COUNT_BITS;
/**
* STOP状态:00100000000000000000000000000000
*/
private static final int STOP = 1 << COUNT_BITS;
/**
* TIDYING状态:01000000000000000000000000000000
*/
private static final int TIDYING = 2 << COUNT_BITS;
/**
* TERMINATED状态:01100000000000000000000000000000
*/
private static final int TERMINATED = 3 << COUNT_BITS;
//通过对ctl的拆解、组合,获取相关的数据
/**
* 获取ctl的高3位,线程池运行状态
*
* @param c 此时的ctl值
* @return ctl的高3位的int值
*/
private static int runStateOf(int c) {
//将c的低29位置为0并返回结果
return c & ~CAPACITY;
}
/**
* 获取ctl的低29位,线程数量
*
* @param c 此时的ctl值
* @return ctl的低29位的int值
*/
private static int workerCountOf(int c) {
//将c的高3位置为0并返回结果
return c & CAPACITY;
}
/**
* 组合rs和wc,计算ctl新值
*
* @param rs 运行状态
* @param wc 线程数量
* @return 新ctl的int值
*/
private static int ctlOf(int rs, int wc) {
//两者与运算
return rs | wc;
}
/**
* 线程任务队列,是一个阻塞队列
* 使用isEmpty来检测队列是否为空,而不是通过poll的返回值
*/
private final BlockingQueue<Runnable> workQueue;
/**
* 当改变线程池状态,比如添加工作线程、停止线程池,或者访问线程池共享参数信息比如当前线程数量的时候
* 因为这涉及到多个线程之间的共享信息比如线程池状态、工作线程数量等参数的同步,需要获取mainLock独占锁才行
*/
private final ReentrantLock mainLock = new ReentrantLock();
/**
* 包含全部工作线程的set集合
* 只有在持有mainLock锁的时候才能访问
*/
private final HashSet<Worker> workers = new HashSet<Worker>();
/**
* 主要是为了支持awaitTermination方法,外部线程调用awaitTermination方法之后
* 会判断线程池是否是TERMINATED状态,即终止状态,如果不是则调用线程在termination条件变量中等待,直到超时或者线程完毕
*/
private final Condition termination = mainLock.newCondition();
/**
* 记录到目前为止线程池中的拥有的最大线程数量
* 只有在持有mainLock锁的时候才能访问
*/
private int largestPoolSize;
/**
* 线程池已完成的任务数量,只有在某个Worker工作线程终止时才会更新
* 只有在持有mainLock锁的时候才能访问
*/
private long completedTaskCount;
/**
* ThreadPoolExecutor中的工作线程统一使用线程工厂来创建
* threadFactory用于保存传入的线程工厂实现,具有volatile的特性
* Executors中给出了默认实现,我们可以直接使用:Executors.defaultThreadFactory()
*/
private volatile ThreadFactory threadFactory;
/**
* 对于提交任务数超过maxmumPoolSize+workQueue之和时超出的任务,或者线程池正在关闭时的新提交的任务执行的拒绝策略,
* 任务会交给RejectedExecutionHandler的handler来处理,具有volatile的特性
* ThreadPoolExecutor中提供了默认实现:AbortPolicy、CallerRunsPolicy、DiscardPolicy、DiscardOldestPolicy
*/
private volatile RejectedExecutionHandler handler;
/**
* 空闲的工作线程的等待超时时间,具有volatile的特性
* 当存在的工作线程数量大于指定核心线程数量时,那么多余的线程会使用此超时时间,超过该时间没有工作则关闭线程
* 或者如果允许CoreThreadTimeOut,那核心线程也会使用此超时时间,超过该时间没有任务则关闭线程;否则,核心将永远等待新的任务。
*/
private volatile long keepAliveTime;
/**
* 是否允许核心线程应用空闲超时时间,具有volatile的特性
* 如果为false,那么即使核心线程空闲也会永远保持活动状态(不会被销毁)
* 如果为true,那么核心线程将会应用 keepAliveTime,在指定时间内等待工作,超时则被销毁(设置成功的要求是超时时间大于0)。
*/
private volatile boolean allowCoreThreadTimeOut;
/**
* 线程池核心线程数量,具有volatile的特性
* 除非设置了allowCoreThreadTimeOut=true,那么核心线程永远不会被销毁
*/
private volatile int corePoolSize;
/**
* 线程池最大线程数量,具有volatile的特性
* 不能超过(2^29)-1
*/
private volatile int maximumPoolSize;
//下面的属性涉及到Java安全模型:https://developer.ibm.com/zh/articles/j-lo-javasecurity/
//一般人接触不到
/**
* 用于校验是否具有shutdown 和 shutdownNow 关闭线程池的操作权限
*/
private static final RuntimePermission shutdownPerm =
new RuntimePermission("modifyThread");
/**
* 授权上下文环境对象
* 在GC标记ThreadPoolExecutor对象并调用finalize方法时调用,用于释放资源
*/
private final AccessControlContext acc;
ctl相关方法
围绕着ctl有一系列的简单的方法,它们被其他大量方法所调用,这里先介绍出来!
runStateOf(int c):获取ctl的高3位,即获取线程池运行状态值;
workerCountOf(int c):获取ctl的低29位,即获取线程数量值;
ctlOf(int rs, int wc):组合rs和wc,计算ctl新值;
runStateLessThan(int c, int s):c的运行状态值是否小于指定状态值s;
runStateAtLeast(int c, int s):c的运行状态值是否大于等于指定状态值s;
isRunning(int c):c的运行状态是否是RUNNING;
compareAndIncrementWorkerCount(int expect):尝试CAS的将ctl的WorkerCount线程数量部分自增1;
compareAndDecrementWorkerCount(int expect):尝试CAS的将ctl的WorkerCount线程数量部分自减1;
decrementWorkerCount():循环尝试CAS的将ctl的WorkerCount线程数量部分自减1,直到成功为止。只有在addWorkerFailed、processWorkerExit以及getTask方法中调用。
//通过对ctl的拆解、组合,获取相关的数据
/**
* 获取ctl的高3位,线程池运行状态
*
* @param c 此时的ctl值
* @return ctl的高3位的int值
*/
private static int runStateOf(int c) {
//将c的低29位置为0并返回结果
return c & ~CAPACITY;
}
/**
* 获取ctl的低29位,线程数量
*
* @param c 此时的ctl值
* @return ctl的低29位的int值
*/
private static int workerCountOf(int c) {
//将c的高3位置为0并返回结果
return c & CAPACITY;
}
/**
* 组合rs和wc,计算ctl新值
*
* @param rs 运行状态
* @param wc 线程数量
* @return 新ctl的int值
*/
private static int ctlOf(int rs, int wc) {
//两者与运算
return rs | wc;
}
//不需要拆解ctl进行数据比较或者更改
/**
* 运行状态值是否小于指定状态值
*
* @param c 此时的ctl
* @param s 指定状态值
* @return true 是 false 否
*/
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
/**
* 运行状态值是否大于等于指定状态值
*
* @param c 此时的ctl
* @param s 指定状态值
* @return true 是 false 否
*/
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
/**
* 运行状态是否是RUNNING
*
* @param c 此时的ctl
* @return true 是 false 否
*/
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
/**
* 尝试CAS的将ctl的WorkerCount线程数量部分自增1
*
* @param expect 预期的ctl值
* @return true 成功 false 失败
*/
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
/**
* 尝试CAS的将ctl的WorkerCount线程数量部分自减1
*
* @param expect 预期的ctl值
* @return true 成功 false 失败
*/
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
/**
* 循环尝试CAS的将ctl的WorkerCount线程数量部分自减1,直到成功为止
* 只有在addWorkerFailed、processWorkerExit以及getTask部分调用
*/
private void decrementWorkerCount() {
do {
} while (!compareAndDecrementWorkerCount(ctl.get()));
}
线程池的状态
在上面的属性介绍中,我们知道线程池有5种状态:RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED。
RUNNING = -1 << COUNT_BITS,转换为二进制就是11100000000000000000000000000000
SHUTDOWN = 0 << COUNT_BITS,转换为二进制就是00000000000000000000000000000000
STOP = 1 << COUNT_BITS,转换为二进制就是00100000000000000000000000000000
TIDYING = 2 << COUNT_BITS,转换为二进制就是01000000000000000000000000000000
TERMINATED = 3 << COUNT_BITS,转换为二进制就是01100000000000000000000000000000
可以发现,运行状态的大小关系为:RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED,这在状态转换的时候非常有用,这样可以通过大小判断状态关系。
类似于线程的状态,线程池的状态也可以转换。但是又有不同,线程状态可以循环转换、相互转换,而一旦发生线程池的状态的转换,那么该转换不可逆。下面来看看线程池状态的转换规则:
详细说明:
状态名 | 说明 | 转换 |
---|---|---|
RUNNING | 线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进行处理。 | 新建的线程池的初始状态就是RUNNING,并且线程池中的工作线程数量为0。 |
SHUTDOWN | 线程池处在SHUTDOWN状态时,不接收新任务,但内部正在执行的任务和队列里等待的任务,会执行完,随后会清理全部工作线程。 | RUNNING状态的线程池,调用shutdown方法,或者隐式调用了finalize方法(里面有shutdown方法时线程池状态将变成SHUTDOWN。 |
STOP | 线程池处在STOP状态时,不接收新任务,不处理已添加的任务(丢弃),并且会中断正在处理的任务,随后会清理全部工作线程。 | RUNNING or SHUTDOWN状态的线程池,调用shutdownNow方法,线程池状态将变成 STOP。 |
TIDYING | 所有的任务已执行完或被终止或被丢弃,ctl记录的workerCount工作线程数量为0,线程池会变为TIDYING状态。接着会执行钩子函数terminated()。 | SHUTDOWN状态的线程池,当任务队列为空并且线程池工作线程数workerCount为0时,线程池状态就会由 SHUTDOWN 自动转换为 TIDYING状态。STOP状态的线程池,线程池中工作线程数workerCount为0时,线程池状态就会由STOP自动转换为TIDYING状态 |
TERMINATED | 钩子函数terminated()执行完毕,就变成TERMINATED状态,线程池彻底终止。 | TIDYING状态的线程池,在接着执行完terminated()之后,线程池状态就会由TIDYING自动转换为 TERMINATED。 |
ThreadPoolExecutor的构造器
ThreadPoolExecutor的构造器是创建线程池的入口,JDK提供了四个构造函数。其中参数较少的的三个构造函数内部都是调用参数最多的那一个构造函数。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue< Runnable > workQueue)
使用给定的初始参数和默认的线程工厂及默认的拒绝策略创建新的 ThreadPoolExecutor。
/**
* 使用给定的初始参数和默认的线程工厂及默认的拒绝策略创建新的 ThreadPoolExecutor。
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
//内部调用最多参数的构造器
//线程工厂传递的Executors的默认实现:Executors.defaultThreadFactory()
//拒绝策略传递的默认实现:defaultHandler
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue< Runnable > workQueue,ThreadFactory threadFactory)
使用给定的初始参数和默认的拒绝策略创建新的 ThreadPoolExecutor。
/**
* 使用给定的初始参数和默认的拒绝策略创建新的 ThreadPoolExecutor。
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
//内部调用最多参数的构造器
//拒绝策略传递的默认实现:defaultHandler
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue< Runnable > workQueue,RejectedExecutionHandler handler)
使用给定的初始参数和默认的线程工厂创建新的 ThreadPoolExecutor。
/**
* 使用给定的初始参数和默认的线程工厂创建新的 ThreadPoolExecutor。
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
//内部调用最多参数的构造器
//线程工厂传递的Executors的默认实现:Executors.defaultThreadFactory()
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue< Runnable > workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)
最后一个构造器,使用给定的初始参数创建新的 ThreadPoolExecutor,一共有7个参数。
/**
1. 使用给定的初始参数创建新的 ThreadPoolExecutor。
2. 3. @param corePoolSize 核心线程数
3. @param maximumPoolSize 最大线程数
4. @param keepAliveTime 空闲线程等待超时时间
5. @param unit 超时时间单位
6. @param workQueue 阻塞任务队列
7. @param threadFactory 线程工厂
8. @param handler 拒绝策略
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
//一系列参数校验
/*
* 如果核心线程数小于0
* 或者 如果最大线程数小于等于0
* 或者 如果最大线程数小于核心线程数
* 或者 如果空闲线程等待超时时间小于0
*
* 满足上面一项,都将抛出IllegalArgumentException异常
*/
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
/*
* 如果阻塞任务队列为null
* 或者 如果线程工厂为null
* 或者 如果拒绝策略为null
*
* 满足上面一项,都将抛出NullPointerException异常
*/
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
//初始化用于安全管理器的上下文参数
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
//初始化核心线程数
this.corePoolSize = corePoolSize;
//初始化最大线程数
this.maximumPoolSize = maximumPoolSize;
//初始化阻塞任务队列
this.workQueue = workQueue;
//初始化空闲线程等待超时时间
this.keepAliveTime = unit.toNanos(keepAliveTime);
//初始化线程工厂
this.threadFactory = threadFactory;
//初始化拒绝策略
this.handler = handler;
}
单从构造器来说,实际上很简单,首先对参数进行校验,随后对一些全局属性进行初始化。下面我们主要来解释构造函数中的参数的含义,首先简单的认识这些属性:
9. corePoolSize:线程池核心线程数。不能小于0。
- 当提交一个任务到线程池时,如果此时线程池的线程数量小于核心线程数,那么线程池会新创建一个线程来执行任务,即使此时存在空闲线程也不例外。默认情况创建0个核心线程,如果调用了线程池的prestartAllCoreThreads()方法,线程池会立即创建并启动所有核心线程。
- maximumPoolSize:线程池最大线程数。不能小于corePoolSize,不能小于等于0。
- 当workQueue(任务队列)放不下线程任务,并且已创建的线程数小于最大线程数,则线程池会再次创建新的线程执行任务。值得注意的是,如果使用了无界的任务队列(任务队列没有上限大小)这个参数就没什么效果。
- keepAliveTime:空闲线程等待超时时间;unit:keepAliveTime时间单位
- 当线程数量超过corePoolSize时,多余的空闲线程等待超时时间,即如果指定时间返回没有任务执行,那么该线程将被回收,直到数量减少到corePoolSize为止。
- 如果允许CoreThreadTimeOut(前提是keepAliveTime大于0),那核心线程也会使用此超时时间,超过该时间没有任务则关闭线程;否则,核心线程将永远等待新的任务。
- workQueue:阻塞任务队列。
- 当线程任务添加的速度超过所有核心线程执行速度时,新来的来不及执行的线程任务将被存放到workQueue阻塞任务队列中。
- 任务队列一定是阻塞队列,常见的有以下四种,实际上有很多种:
- ArrayBlockingQueue:有界阻塞任务队列,构造函数一定要传入具体队列容量。
- LinkedBlockingQueu:通常作为无界阻塞任务队列(构造函数不传大小会默认为Integer.MAX_VALUE ),当有大量任务提交时,容易造成内存耗尽。
- SynchronousQueue:一个没有容量的阻塞队列,会将任务同步交付给工作线程。
- PriorityBlockingQueue : 具有优先级的无界阻塞任务队列。
- threadFactor:线程工厂。
- 线程工厂用于创建工作线程,默认线程工厂:Executors.defaultThreadFactory。
- handler:拒绝策略。
- 对于正在执行的线程数等于maxmumPoolSize以及workQueue容量已满时提交的任务,或者线程池正在关闭时的新提交的任务,线程池将会执行拒绝策略,即这些任务都直接被非线程池线程处理了。
- ThreadPoolExecutor中提供了4种实现:
- AbortPolicy:调用者的线程直接抛出异常,作为默认拒绝策略;
- CallerRunsPolicy:用调用者的线程执行任务;
- DiscardOldestPolicy:抛弃队列中最久的任务;
- DiscardPolicy:抛弃当前任务;
corePoolSize、workQueue,maximumPoolSize ,keepAliveTime,unit之间关系
默认情况下,新的线程池不会启动任何线程。线程数量小于corePoolSize时,新提交任务都将通过线程工厂创建一个新线程去执行,即使此时线程池中存在空闲的线程。
当创建的线程数量达到corePoolSize时,新提交的任务会判断如果有空闲的线程那么就让空闲线程去执行,没有空闲线程时新提交的任务将被放入workQueue中,当有线程执行完当前任务,就会从任务队列中拉取任务继续执行。
当workQueue已满,并且maximumPoolSize>corePoolSize时,此时新提交任务又会通过线程工厂创建新线程去执行。
对于线程数大于等于maxmumPoolSize以及workQueue容量已满时新提交的任务,或者线程池正在关闭时的新提交的任务,线程池将会执行拒绝策略,即这些任务都直接被非线程池线程处理了。
如果线程池中存在超过corePoolSize的线程数并且存在空闲线程。如果空闲线程在keepAliveTime(unit表示时间单位)时间范围内都没有工作时,将清理空闲线程,减少资源浪费,直到线程数量被清理减少至核心线程数为止,预留一定数量的核心资源。而通过调用allowCoreThreadTimeOut(true)方法(设置成功的要求是超时时间大于0),核心线程也将应用超时时间规则,即此时如果没有新任务,那么所有的线程都将可能被清理。
corePoolSize、maximumPoolSize、keepAliveTime+unit、ThreadFactory、RejectedExecutionHandler都可以在线程池启动(创建)之后动态调整!通过这些参数,线程池可以动态的调整线程数量,我们也可以创建属于自己的特殊的线程池。
unit表示keepAliveTime的时间单位,在TimeUnit类中有7种静态属性:
TimeUnit.DAYS; //天
TimeUnit.HOURS; //小时
TimeUnit.MINUTES; //分钟
TimeUnit.SECONDS; //秒
TimeUnit.MILLISECONDS; //毫秒
TimeUnit.MICROSECONDS; //微妙
TimeUnit.NANOSECONDS; //纳秒
ThreadFactory 线程工厂
线程池统一通过ThreadFactory创建新线程,可以说是工厂模式的应用。默认使用Executors.defaultThreadFactory工厂,该工厂创建的线程全部位于同一个ThreadGroup中,并且具有pool-N-thread-M的线程命名(N表示线程池工厂编号,M表示一个工厂创建的线程编号,都是自增的)和非守护进程状态。
通过提供不同的ThreadFactory,您可以更改线程的名称,线程组,优先级,守护进程状态等。如果ThreadCactory在通过从new Thread返回null询问时未能创建线程,则执行程序将继续,但可能无法执行任何任务。
也可以通过实现ThreadFactory自定义线程工厂,案例如下:
public class ThreadPool {
private static ExecutorService pool;
public static void main(String[] args) {
//自定义线程池,最大线程数为3
pool = new ThreadPoolExecutor(3, 3, 1000, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(10),
//自定义线程工厂,lambda
r -> {
System.out.println("线程" + r.hashCode() + "创建");
//线程命名
return new Thread(r, "threadPool-" + r.hashCode());
}, new ThreadPoolExecutor.CallerRunsPolicy());
for (int i = 0; i < 10; i++) {
pool.execute(new ThreadTask());
}
pool.shutdown();
}
}
class ThreadTask implements Runnable {
@Override
public void run() {
//输出执行线程的名称
System.out.println("ThreadName:" + Thread.currentThread().getName());
}
}
上面是通过匿名内部类的lambda形式创建ThreadFactory实例的,其他框架也提供了多种现成的方式:
- spring自带的的CustomizableThreadFactory。可以通过构造器和方法设置参数。
- guava的ThreadFactoryBuilder。可以通过方法的链式调用设置参数。
- commons-lang3的BasicThreadFactory。可以通过方法的链式调用设置参数
workQueue任务队列
任务队列用于存放提交的来不及执行的任务,一定是一个阻塞队列BlockingQueue,JDK自带的阻塞队列如下:
类名 | 队列名 |
---|---|
ArrayBlockingQueue | 有界阻塞队列 |
LinkedBlockingQueue | 无界阻塞队列 |
LinkedBlockingDeque | 无界阻塞双端队列 |
SynchronousQueue | 没有容量的阻塞队列 |
DelayQueue | 支持延时操作的无界阻塞队列 |
PriorityBlockingQueue | 任务具有优先级的无界阻塞队列 |
LinkedTransferQueue | JDK1.7的新无界阻塞队列 |
根据常用阻塞队列的类型,任务队列一般分为直接提交队列、有界任务队列、无界任务队列、优先任务队列,当然还有其他的类型。
直接提交队列
阻塞队列设置为SynchronousQueue。前面讲过,SynchronousQueue是一个特殊的BlockingQueue,它内部没有容量,每一个入队操作都需要匹配一个等待的出队操作或者等待被后来的出队操作匹配才能返回,出队操作同理。JUC—三万字的SynchronousQueue源码深度解析。
若有新的任务需要执行时,线程池会创建新的线程,直到创建的线程数量达到corePoolSize时,如果使用SynchronousQueue队列,新任务不会被保存在任务队列中,总是会马上被执行。如果此时线程数量小于maximumPoolSize,则尝试创建新的线程执行;如果达到maximumPoolSize设置的最大值,则传递给执行拒绝策略去执行。
采用SynchronousQueue作为任务队列的线程池不能缓存任务,一个任务要么被执行要么被拒绝策略处理,这就是“直接提交”的由来。
public class SynchronousQueueThreadPool {
public static void main(String[] args) {
//maximumPoolSize设置为2,拒绝策略为AbortPolicy策略,直接抛出异常
ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS,
//采用SynchronousQueue作为任务队列,不能缓存任务
new SynchronousQueue<>(),
Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
//核心线程应用超时等待机制
pool.allowCoreThreadTimeOut(true);
//提交三个任务,明显会超出最大线程数量
for (int i = 0; i < 3; i++) {
pool.execute(new SynchronousQueueThreadTask());
}
}
}
class SynchronousQueueThreadTask implements Runnable {
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
}
结果:
pool-1-thread-2
pool-1-thread-1
Exception in thread “main” java.util.concurrent.RejectedExecutionException: Task com.thread.test.threadpool.ThreadTaskSyn@2f0e140b rejected from java.util.concurrent.ThreadPoolExecutor@7440e464[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at com.thread.test.threadpool.SynchronousQueueThreadPool.main(SynchronousQueueThreadPool.java:17)
有界任务队列
阻塞队列设置为ArrayBlockingQueue。前面讲过,ArrayBlockingQueue是一个特殊的BlockingQueue,它必须设置容量,作为有界任务队列。JUC—ArrayBlockingQueue源码深度解析。
若有新的任务需要执行时,线程池会创建新的线程,直到创建的线程数量达到corePoolSize时,如果使用ArrayBlockingQueue队列,则会将新的任务加入到等待队列中,如果达到队列的容量,则继续添加的任务会被处理。如果此时线程数量小于maximumPoolSize,则尝试创建新的线程执行;如果达到maximumPoolSize设置的最大值,则传递给执行拒绝策略去执行。
采用ArrayBlockingQueue作为任务队列的线程池可以缓存任务,是较为常见的任务队列。
无界任务队列
阻塞队列设置为LinkedBlockingQueue。前面讲过,LinkedBlockingQueue是一个特殊的BlockingQueue,它虽然可以设置容量,但是不设置容量就作为无界任务队列。JUC—LinkedBlockingQueue源码深度解析。
若有新的任务需要执行时,线程池会创建新的线程,直到创建的线程数量达到corePoolSize时,如果使用LinkedBlockingQueue队列,并且没有指定容量,那么可以无限制的保存添加的新任务(实际上最大容量为 Integer.MAX_VALUE,但是基本上达不到就会抛出内存溢出异常)。
采用LinkedBlockingQueue作为任务队列的线程池可以无限缓存任务,这时,设置maximumPoolSize参数是无效的,当线程池的线程数达到corePoolSize后就不会再增加了,此时需要注意内存资源耗尽的问题。
另外,由于LinkedBlockingQueue也可以设置容量,因此也可以作为有界任务队列,并且由于它采用了两把锁,性能好于采用一把锁的ArrayBlockingQueue。
优先任务队列
阻塞队列设置为PriorityBlockingQueue。前面讲过,PriorityBlockingQueue是一个特殊的BlockingQueue,它可以为任务设置优先级,优先最高的任务将会先出队列被执行,它要求任务可比较大小。JUC—两万字的PriorityBlockingQueue源码深度解析。
使用PriorityBlockingQueue队列,通常设置corePoolSize为0,这样新来的任务将会直接进入PriorityBlockingQueue,如果没有指定容量,那么可以无限制的保存添加的新任务(由于底层是数组,实际上最大容量为 Integer.MAX_VALUE,但是基本上达不到就会抛出内存溢出异常)。
采用PriorityBlockingQueue作为任务队列的线程池可以无限缓存任务,如果没有设置容量,maximumPoolSize参数是无效的,当线程池的线程数达到corePoolSize后就不会再增加了,此时需要注意内存资源耗尽的问题。
public class PriorityBlockingQueueThreadPool {
public static void main(String[] args) {
//一个线程池,使用优先级的PriorityBlockingQueue阻塞队列作为任务队列
ExecutorService pool = new ThreadPoolExecutor(0, 1, 1000, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
/*
* 循环从 priority=0开始添加任务,即最先添加的任务优先级最低
* 查看输出执行情况,可以发现最后添加的任务(优先级最高)最先被执行
*/
for (int i = 0; i < 20; i++) {
pool.execute(new ThreadTask(i));
}
pool.shutdown();
}
/**
* 任务实现,同时作为Comparable的实现类
*/
static class ThreadTask implements Runnable, Comparable<ThreadTask> {
private int priority;
ThreadTask(int priority) {
this.priority = priority;
}
/**
* 当前对象和其他对象做比较,当前priority大就返回-1,当前priority小就返回1,
*
* @param o 被比较的线程任务
* @return 返回-1就表示当前任务出队列优先级更高;返回1就表示当前任务出队列优先级更低;即priority值越大,出队列的优先级越高;
*/
@Override
public int compareTo(ThreadTask o) {
return this.priority > o.priority ? -1 : 1;
}
@Override
public void run() {
System.out.println("priority:" + this.priority + ",ThreadName:" + Thread.currentThread().getName());
}
}
}
RejectedExecutionHandler拒绝策略
对于正在执行的线程数大于等于maxmumPoolSize以及workQueue容量已满时提交的任务,或者线程池正在关闭时的新提交的任务,线程池将会执行拒绝策略,即这些任务都直接被非线程池线程处理了。
JDK内置的拒绝策略实现有4种,都位于ThreadPoolExecutor内部作为内部类。需要注意的是拒绝策略是由调用execute或者submit方法的线程去执行的,而不是线程池的线程去执行。
AbortPolicy 抛出异常
调用线程直接抛出异常,阻止系统正常运行。线程池的默认策略。
/**
* 默认的拒绝策略,可以看到就是AbortPolicy策略
*/
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
/**
* 调用者抛出RejectedExecutionException异常的拒绝策略
* {@code RejectedExecutionException}.
*/
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() {
}
/**
* 总是抛出RejectedExecutionException.
*
* @param r 任务
* @param e 线程池
* @throws RejectedExecutionException 异常
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
CallerRunsPolicy调用者执行
只要线程池未关闭,该策略就直接在调用者线程中,运行当前被丢弃的任务,如果线程池关闭,任务将被静默丢弃。需要注意可能会阻塞main线程(调用线程)
/**
* 只要线程池未关闭,该策略就直接在调用者线程中,运行当前被丢弃的任务,
* 如果线程池关闭(SHUTDOWN以及之后的状态),任务将被静默丢弃。
* 需要注意可能会阻塞main线程
*/
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() {
}
/**
* 在调用方的线程中执行任务 r,除非执行器已关闭(SHUTDOWN以及之后的状态),在这种情况下,任务被丢弃。
*
* @param r 任务
* @param e 线程池
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
//判断线程池是否没有被关闭
if (!e.isShutdown()) {
//直接通过调用线程运行run方法
r.run();
}
}
}
案例:
/**
* @author lx
*/
public class CallerRunsPolicyTest {
public static void main(String[] args) {
// 创建线程池。和新线程和最大线程数都为1,
ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
//采用直接提交任务队列,不能缓存任务
new SynchronousQueue<>(),
//拒绝策略为"CallerRunsPolicy"
new ThreadPoolExecutor.CallerRunsPolicy());
//新建10个任务,并将它们添加到线程池中。
//从结果可以看到,部分任务走了拒绝策略:由main线程(调用线程)运行,因此,需要一定要注意注意可能会阻塞main线程(调用线程)
for (int i = 0; i < 10; i++) {
Runnable myrun = new ThreadTask("task-" + i);
pool.execute(myrun);
}
System.out.println("------main线程执行到这里");
// 关闭线程池
pool.shutdown();
}
static class ThreadTask implements Runnable {
private String name;
public ThreadTask(String name) {
this.name = name;
}
@Override
public void run() {
try {
Thread.sleep(1000);
System.out.println(this.name + " is running." + Thread.currentThread().getName());
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
某次结果:
task-1 is running.main
task-0 is running.pool-1-thread-1
task-2 is running.main
task-3 is running.pool-1-thread-1
task-4 is running.main
task-5 is running.pool-1-thread-1
task-6 is running.main
task-7 is running.pool-1-thread-1
task-8 is running.main
------main线程执行到这里
task-9 is running.pool-1-thread-1
DiscardOldestPolicy丢弃最老任务
如果线程池被关闭,那么直接丢弃任务。否则丢弃队列中最老的一个任务,也就是即将被执行的一个任务,并尝试再次提交当前任务。这种拒绝策略也有可能阻塞调用线程(队列一直是满的)。
/**
* 如果线程池被关闭,那么直接丢弃任务。
* 否则丢弃队列中最老的一个任务,也就是即将被执行的一个任务,并尝试再次提交当前任务
*/
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() {
}
/**
* 如果线程池被关闭,那么直接丢弃任务。
* 否则丢弃队列中最老的一个任务,也就是即将被执行的一个任务,并尝试再次提交当前任务
*
* @param r 任务
* @param e 线程池
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
//判断线程池是否没有被关闭
if (!e.isShutdown()) {
//将队头任务移除队列丢弃
e.getQueue().poll();
//尝试再次提交任务
e.execute(r);
}
}
}
DiscardPolicy丢弃任务
丢弃无法处理的任务,不予任何处理。如果允许任务丢失,这是最好的一种方案。
/**
* 丢弃无法处理的任务,不予任何处理。如果允许任务丢失,这是最好的一种方案。
*/
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() {
}
/**
* 丢弃无法处理的任务,不予任何处理。如果允许任务丢失,这是最好的一种方案。
*
* @param r 任务
* @param e 线程池
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
//什么也不做
}
}
自定义拒绝策略
以上内置拒绝策略均实现了RejectedExecutionHandler接口,若以上策略仍无法满足实际需要,完全可以自己实现RejectedExecutionHandler接口,并且实现rejectedExecution方法就可以了。具体的逻辑就在rejectedExecution方法里去定义就OK了。这是一种设计模式—策略模式!
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
execute核心提交方法
public void execute(Runnable command)
传递一个Runnable任务对象,然后由线程池对它进行异步执行。没有办法检查Runnable是否执行完毕。如果无法接收要执行的任务,则由 RejectedExecutionHandler 决定是否抛出RejectedExecutionException;如果任务为null,那么抛出NullPointerException。
execute方法是线程池的绝对核心方法,很多其他内部方法都是为该方法服务的,涉及到的流程和代码非常复杂。
execute方法的详细步骤如下,看的时候建议结合更详细的代码注释一起看:
- 首先就是command任务的null校验,如果为null直接抛出NullPointerException;
- 调用workerCountOf计算运行线程数量,如果小于corePoolSize,即目前在行线程数小于核心线程数:
- 调用addWorker方法尝试启动新线程去执行command任务,使用corePoolSize作为线程数量上限。如果任务提交给新线程成功,那么直接返回true。
- 到这一步,肯定是addWorker方法返回false。表示可能线程池被关闭了,或者线程数量达到了corePoolSize,或者任务队列满了且工作线程数量达到了最大值maximumPoolSize等等情况。
- 到这一步,可能线程池被关闭了,或者线程数量达到了corePoolSize,或者任务队列满了且工作线程数量达到了最大值maximumPoolSize等等情况。判断线程池是否是RUNNING状态,以及尝试加入任务队列。如果线程池还是RUNNING状态并且成功加入任务队列:
- 再次检查线程池是否不是RUNNING状态,以及尝试将任务移除任务队列。若果线程池不是RUNNING状态,并且将任务移除任务队列成功,那么对任务执行拒绝策略。
- 否则,表示线程池还是RUNNING状态,或者线程池不是RUNNING状态,但是任务移除队列失败。判断当前工作线程数是否为0,如果是,尝试启动新线程,从队列中获取任务,使用maximumPoolSize作为线程数量上限,这一步为了维持线程池的活性。 因为有可能在新任务offer加入队列的时候,其他工作线程都因为超时或者SHOTDOWN而被清理,此时仍然需要新开线程对加入进来的任务进行完成。
- 否则,表示线程池不是RUNNING状态,或者线程池是RUNNING状态但是加入任务队列失败,即任务队列满了。尝试启动新线程去执行command任务,使用maximumPoolSize作为线程数量上限。如果失败,则表示线程池不是RUNNING状态,或者线程池是RUNNING状态但是任务队列满了且工作线程数量达到了最大值maximumPoolSize,那么执行拒绝策略。
上面的判断都没有加锁,因此状态可能都是瞬时性的判断,不保证一直有效。
/**
* 传递一个Runnable任务对象,然后由线程池对它进行异步执行。没有办法检查Runnable是否执行完毕。
* <p>
* 如果无法接收要执行的任务,则由 RejectedExecutionHandler 决定是否抛出 RejectedExecutionException;
* 如果任务为null,那么抛出NullPointerException。
*
* @param command 要执行的任务
* @throws RejectedExecutionException 如果无法接收要执行的任务,则由 RejectedExecutionHandler 决定是否抛出 RejectedExecutionException
* @throws NullPointerException 如果任务为null
*/
public void execute(Runnable command) {
//首先就是command任务的null校验,如果为null直接抛出NullPointerException
if (command == null)
throw new NullPointerException();
//获取ctl的值c
int c = ctl.get();
/*1 调用workerCountOf计算线程数量,如果小于corePoolSize*/
if (workerCountOf(c) < corePoolSize) {
//尝试启动新线程去执行command任务,使用corePoolSize作为线程数量上限
if (addWorker(command, true))
return;
//到这里还没有返回,那么表示addWorker失败。
//可能线程池被关闭了,或者线程数量达到了corePoolSize,或者任务队列满了且工作线程数量达到了最大值maximumPoolSize等情况
//重新获取ctl的值c
c = ctl.get();
}
/*
* 到这一步,可能线程池被关闭了,或者线程数量达到了corePoolSize,
* 或者任务队列满了且工作线程数量达到了最大值maximumPoolSize等等情况。
*
* 2 如果线程池还是RUNNING状态,并且成功加入任务队列
*/
if (isRunning(c) && workQueue.offer(command)) {
//获取ctl的值c
int recheck = ctl.get();
/*再次检查,如果线程池不是RUNNING状态,并且将任务移除队列成功*/
if (!isRunning(recheck) && remove(command))
//执行拒绝策略
reject(command);
/*
* 否则,表示线程池还是RUNNING状态,或者线程池不是RUNNING状态,但是任务移除队列失败。
* 判断如果当前工作线程数为0
*/
else if (workerCountOf(recheck) == 0)
//尝试启动新线程,从队列中获取任务,使用maximumPoolSize作为线程数量上限,无论成功还是失败
//这一步为了维持线程池的活性。
addWorker(null, false);
}
/*
* 3 否则,表示线程池不是RUNNING状态,或者线程池是RUNNING状态但是加入任务队列失败,即任务队列满了
*
* 尝试启动新线程去执行command任务,使用maximumPoolSize作为线程数量上限
* 如果失败,则表示线程池不是RUNNING状态,或者线程池是RUNNING状态但是任务队列满了且工作线程数量达到了最大值maximumPoolSize
*/
else if (!addWorker(command, false))
//执行拒绝策略
reject(command);
}
/**
1. 该方法用于执行拒绝策略
*/
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
addWorker尝试添加新线程
addWorker尝试新建一个工作线程(Worker)并启动去执行任务,同样作为线程池的核心方法。大概步骤为:检查当前线程池状态是否为运行态RUNNING以及线程数小于给定的最大线程数边界值,如果不满足,那么返回false,如果满足,那么表示可以新增Worker,首先CAS的增加ctl的线程计数。随后尝试新增一个Worker,通过线程工厂创建一个线程,将参数中的任务作为第一个任务执行并返回true。如果线程工厂无法创建线程或者返回null,那么将会回滚此前的操作,比如减少线程计数,移除创建的Worker等,最后返回false。
addWorker的详细步骤如下,看的时候建议结合更详细的代码注释一起看:
开启一个死循环,相当于自旋,使用retry标记。检查当前线程池状态是否为运行态RUNNING以及线程数小于给定的最大线程数边界值,如果不满足,那么返回false,如果满足,那么表示可以新增Worker,那么首先CAS的增加ctl的线程计数,成功后退出循环。
获取此时最新的ctl值c,通过c获取此时线程池的状态rs。
线程池状态校验:如果线程状态的值大于等于SHUTDOWN,那么表示线程池不是运行状态,并且(rs等于SHUTDOWN 并且 firstTask为null 并且 workQueue不为空)这三个条件有一个成立,那么直接返回false,adderWorker失败。即只有RUNNING状态,或者SHUTDOWN状态并且不是新添加任务的请求并且任务队列不为空,满足这两种情况的一种,才进行下一步;
到这一步,说明通过判断线程池状态校验通过。内部再开启一个死循环:首先校验线程数量,如果不符合规则,那么直接返回false。随后尝试预先CAS的将WorkerCount线程数量自增1,成功之后退出整个双层大循环,继续下一步。失败之后会重新获取ctl的值,然后判断线程状态是否改变,如果状态改变了那么进入下一次外层循环,再次进行状态校验如果状态没有改变,那么那么进入下一次内层循环,不断地循环重试CAS操作即可。
获取此时最新的线程数量wc。
线程数量校验:如果wc大于等于CAPACITY(最大线程数),或者wc大于等于匹配最大值边界(如果core参数为true就是corePoolSize,否则就是maximumPoolSize)。满足两种条件的一个,那么直接返回false,表示线程数量不符合要求。
到这一步,表示满足线程数量要求。那么尝试预先CAS的将WorkerCount线程数量自增1,即ctl值自增1,CAS操作成功之后,直接break retry跳出外层循环,这一步就算完成了,进入下一步尝试新增Worker。
到这一步返回跳出循环,表示CAS失败,重新获取最新的ctl值c。
继续判断如果线程池运行状态不等于rs,线程池的状态改变了,我们知道线程池的状态是不可逆的,那么continue retry结束内层循环,结束本次外层循环,继续下一次外层循环,将可能在下一次的外层循环中因为线程状态的原因而返回false 并退出。
到这一步还没跳出循环,表示CAS失败,并且线程池状态也没有改变,那么重新开始下一次内层循环重试CAS即可。
到这一步,表示线程池状态和线程数量校验通过,并且已经预先新增了WorkerCount线程数量的值,下面是尝试新增Worker并启动线程的逻辑。
workerStarted表示新增的工作线程是否已启动的标志位,初始化为false,表示未启动。workerAdded表示新增的工作线程是否已加入workers集合,初始化为false,表示未加入。w表示需要新增的Worker对象,初始化为null。
开启一个try代码块:
新建一个Worker赋给w,传入firstTask参数,作为将要执行的第一个任务,在构造器中还会通过线程工厂初始化一个新线程。
获取w内部的新线程t。如果t不为null:
下面的步骤涉及到workers集合的改动以及新线程的执行,甚至其他参数比如largestPoolSize的改动,需要获取mainLock锁,成功之后进行下一步并开启一个try代码块:
获取此时的线程池运行状态值rs。
首先进行的是再次的检查线程池状态,因为可能在最上面的循环之后-获取锁之前,线程池的状态发生改变,比如被停止。,如果rs小于SHUTDOWN,即属于RUNNING状态,或者rs属于SHUTDOWN状态,并且firstTask为null(不是新增任务的请求)满足这两个条件中的一个,才可以真正的开启线程:
继续校验线程t是否是活动状态,因为如果线程已经处于活动状态,表示已经执行了start()方法,即已经开始执行了run方法。那么这个线程就不能再执行新任务,不符合要求,由调用方直接抛出IllegalThreadStateException异常。
上面的校验通过,那么将新建的Worker加入workers的set集合,这是blockingQueue自己的方法。
获取workers的数量s,就表示目前工作线程数量,如果s大于largestPoolSize,即大于历史最大线程数量,那么largestPoolSize更新为s。
workerAdded置为true,表示新增的工作线程已加入workers集合。
最终,无论上面的代码是成功了还是发生了异常,都需要在finally中解锁mainLock。
解锁成功之后的代码。如果workerAdded为true,即新增的工作线程已加入workers集合,说明操作完全没问题,一切正常:
那么启动线程t,线程t将会执行Worker中的run方法。workerStarted置为true,表示新增的工作线程已启动。
最终,无论上面的try代码是成功了还是发生了异常,都走finally语句:
如果workerStarted为false,即新增的工作线程最终没能启动成功,那么调用addWorkerFailed对此前可能做出的改变进行“回滚”。否则finally中什么也不做。
返回新增的工作线程已启动的标志workerStarted,如果已启动线程,表示新增工作线程成功,否则表示新增工作线程失败。
/**
1. 检查当前线程池状态是否为运行态RUNNING以及线程数小于给定的最大线程数边界值,如果不满足,那么返回false,如果满足,那么表示可以新增Worker,
2. 首先CAS的增加ctl的线程计数。随后尝试新增一个Worker,通过线程工厂创建一个线程,将参数中的任务作为第一个任务执行并返回true。
3. 如果线程工厂无法创建线程或者返回null,那么将会回滚此前的操作,比如减少线程计数,移除创建的Worker等,最后返回false。
4. 5. @param firstTask 新线程应首先运行的任务,如果没有,则为null,将会从队列中获取任务
6. 当线程数量少于corePoolSize时,总会启动一个线程来执行新添加的任务,或者当队列已满时,同样可能需要新启动要给线程来执行新添加的任务
7. @param core 如果为true,那么使用corePoolSize作为线程数量边界大小,否则使用maximumPoolSize作为线程数量边界大小
8. 这里不是用具体的参数值而是boolean类型,是因为,比较大小的时候需要随时获取最新值
9. @return 添加Worker成功,返回true;否则返回false
*/
private boolean addWorker(Runnable firstTask, boolean core) {
/*
* 1 开启一个死循环,相当于自旋
* 检查当前线程池状态是否为运行态RUNNING以及线程数小于给定的最大线程数边界值,
* 如果不满足,那么返回false,如果满足,那么表示可以新增Worker,那么首先CAS的增加ctl的线程计数,成功后退出循环
*/
retry:
for (; ; ) {
//获取此时的ctl值
int c = ctl.get();
//通过ctl获取此时线程池的状态rs
int rs = runStateOf(c);
// Check if queue empty only if necessary.
/*
* 如果线程状态的值大于等于SHUTDOWN,那么表示线程池不是运行态
* 并且 !(rs等于SHUTDOWN 并且 firstTask为null 并且 workQueue不为空),即这三个中至少有一个不成立
* 那么直接返回false,adderWorker失败
*
* 分析一下:
* 1 首先是要线程状态不是RUNNING运行态了,那么就是后面的状态比如SHUTDOWN、STOP等,此时才有可能直接返回,运行态是不会在这里返回的
* 2 随后,如果线程状态rs等于SHUTDOWN,那么不一定返回,因为此时任务队列可能存在任务,此时可以新增线程去执行;如果大于SHUTDOWN,那么一定返回,即所有放弃任务
* 继续,如果firstTask为null,那么不一定返回,因为此时可能存在新添加到队列的任务,需要去执行;如果不为null,那么一定是新添加的任务,那么不接受该任务,直接返回
* 继续,如果workQueue.isEmpty()返回false,那么不一定返回,因为此时可能存在新添加到队列的任务,需要去执行;如果为空,那么同样直接返回,因为线程池停止且没有任务可执行了
*
* 即,如果非RUNNING状态,并且是SHUTDOWN状态且firstTask为null(不是新添的任务)并且任务队列不为空
* 那么对于SHUTDOWN状态,此时需要线程去执行已经添加到任务队列中的任务,因此不会在这里返回;如果是其他状态,那么直接返回false即可
*/
if (rs >= SHUTDOWN &&
!(rs == SHUTDOWN &&
firstTask == null &&
!workQueue.isEmpty()))
return false;
/*
* 到这一步,说明通过判断线程状态不需要返回,但是这里的判断没有加锁,因此这是不一定的,继续校验
*
* 内部再开启一个循环:
* 首先校验线程数量,如果不符合规则,那么直接返回false
* 随后尝试预先CAS的将WorkerCount线程数量自增1,成功之后退出整个双层大循环,继续下一步
* 失败之后会重新获取ctl的值,然后判断线程状态是否改变,如果状态改变了那么进入下一次外层循环,再次进行状态校验
* 如果状态没有改变,那么那么进入下一次内层循环,不断地循环重试CAS操作即可
*/
for (; ; ) {
//获取此时的线程数量wc
int wc = workerCountOf(c);
/*
* 如果wc大于等于CAPACITY最大线程数
* 或者 wc大于等于匹配最大值边界(如果core参数为true就是corePoolSize,否则就是maximumPoolSize)
* 那么直接返回false,表示线程数量不符合要求
*
* 分析一下:
* 1 如果线程数量大于等于最大线程数量,那么肯定直接返回false,这表示线程数量达到了最大值,不能够添加新线程了
* 2 如果线程数量大于等于边界线程数量,那么同样直接返回false,这表示不符合最初调用addWorker的情景条件,不能够添加新线程了
* 在调用addWorker方法新增线程之前,如果线程数小于corePoolSize,那么core参数为true;如果如果线程数大于等于corePoolSize,那么core参数为false
* 如果core参数为true,通常希望在新增线程之后线程数要小于等于corePoolSize;如果core参数为false,那么要求在新增线程之后线程数要小于等于maximumPoolSize
* 用于将两种情况区分开来,同时在其他地方,比如核心线程预启动方法中非常有用
*/
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//到这一步,表示满足线程数量要求
//那么尝试预先CAS的将WorkerCount线程数量自增1,即ctl值自增1
if (compareAndIncrementWorkerCount(c))
//CAS操作成功之后,直接break retry跳出外层循环,进入下一步,新增Worker
break retry;
//到这里,表示CAS失败,重新获取最新的ctl值c
c = ctl.get(); // Re-read ctl
//如果线程池运行状态不等于rs,线程池的状态改变了,我们直到线程池的状态是不可逆的,那么
//continue retry结束内层循环,结束本次外层循环,继续下一次外层循环,将可能在下一次的外层循环中因为线程状态的原因而返回false 并退出
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
//到这里,表示CAS失败,并且线程池状态也没有改变,那么重新开始下一次内层循环重试CAS即可
}
}
//到这一步,表示线程池状态和线程数量校验通过,并且已经预先新增了WorkerCount线程数量的值,下面是尝试新增Worker的逻辑
//workerStarted表示新增的工作线程是否已启动的标志位,初始化为false,表示未启动
boolean workerStarted = false;
//workerAdded表示新增的工作线程是否已加入workers集合,初始化为false,表示未加入
boolean workerAdded = false;
//w表示需要新增的Worker对象,初始化为null
Worker w = null;
try {
//新建一个Worker,传入firstTask参数,作为将要执行的第一个任务
//在构造器中还会通过线程工厂初始化一个新线程
w = new Worker(firstTask);
//获取w内部的新线程t
final Thread t = w.thread;
//如果t不为null
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
//下面的步骤涉及到workers集合的改动以及新线程的执行,甚至其他参数比如largestPoolSize的改动,需要加锁
//获取mainLock锁
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
//获取锁之后首先进行的是再次的检查,因为可能在最上面的循环之后-获取锁之前,线程池的状态发生改变,比如被停止
//获取此时的线程池运行状态值rs
int rs = runStateOf(ctl.get());
/*
* 如果rs小于SHUTDOWN,即属于RUNNING状态
* 或者 rs属于SHUTDOWN状态,并且firstTask为null(不是新增任务的请求)
*
* 满足这两个条件中的一个,才可以真正的开启线程
*/
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
/*
* 继续校验线程t是否是活动状态,因为如果线程已经处于活动状态,表示已经执行了start()方法,即已经开始执行了run方法
* 那么这个线程就不能再执行新任务,不符合要求,由调用方直接抛出IllegalThreadStateException异常
* 这里也要求线程工厂返回的线程,仅仅是一个Thread对象即可,不能够start启动而帮倒忙!
*/
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//上面的校验通过,那么将新建的Worker加入workers的set集合,这是blockingQueue自己的方法
workers.add(w);
//获取workers的数量s,就表示目前工作线程数量
int s = workers.size();
//如果s大于largestPoolSize,即大于历史最大线程数量
if (s > largestPoolSize)
//那么largestPoolSize更新为s
largestPoolSize = s;
//workerAdded置为true,表示新增的工作线程已加入workers集合
workerAdded = true;
}
} finally {
//最终,无论上面的代码是成功了还是发生了异常,都需要解锁
mainLock.unlock();
}
//解锁成功之后,如果新增的工作线程已加入workers集合,说明操作完全没问题,一切正常
if (workerAdded) {
//那么启动线程t,这是线程t将会执行Worker中的run方法
t.start();
//workerStarted置为true,表示新增的工作线程已启动
workerStarted = true;
}
}
} finally {
//最终,无论上面的代码是成功了还是发生了异常,都走finally语句块
//如果workerStarted为false,即新增的工作线程最终没能启动成功
if (!workerStarted)
//那么调用addWorkerFailed对此前可能做出的改变进行“回滚”
addWorkerFailed(w);
}
//返回新增的工作线程已启动的标志,如果已启动,表示新增工作线程成功,否则表示新增工作线程失败
return workerStarted;
}
addWorkerFailed添加Worker失败处理
addWorkerFailed方法仅仅在addWorker方法中被调用,表示添加Worker失败时的回滚操作,传递的参数就是代表新增Worker的w变量。
addWorkerFailed的详细步骤:
获取mainLock锁;
如果w不为null,即Worker被添加到了workers中,那么从workers中移除w;
调用decrementWorkerCount,循环尝试CAS的将ctl的WorkerCount线程数量部分自减1,直到成功为止。
此时线程池可能不是RUNNING状态,那么调用tryTerminate方法尝试将线程池状态设置为TERMINATED彻底结束线程池。
最终解锁。
/**
1. 仅仅在addWorker方法中被调用,处理添加Worker失败时的回滚:
2. 1 如果Worker被添加到了workers中,那么移除Worker
3. 2 workerCount自减1
4. 3 此时线程池可能不是RUNNING或者SHUTDOWN状态,那么尝试将线程池状态设置为TERMINATED彻底结束线程池
5. 6. @param w 新建的Worker
*/
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
//获取mainLock锁
mainLock.lock();
try {
//如果w不为null
if (w != null)
//从workers中移除w,这是BlockingQueue的方法
workers.remove(w);
//循环尝试CAS的将ctl的WorkerCount线程数量部分自减1,直到成功为止。
decrementWorkerCount();
//此时线程池可能不是RUNNING状态,那么尝试将线程池状态设置为TERMINATED彻底结束线程池
tryTerminate();
} finally {
mainLock.unlock();
}
}
tryTerminate尝试终止线程池
tryTerminate方法在ThreadPoolExecutor的shutdown、shutdownNow、addWorkerFailed、remove、purge这几个方法中被调用,这几个方法被调用时,一般都是涉及到线程池状态改变、移除workers集合中的线程、移除任务队列中的任务的情况,该方法用于尝试终止线程池(将线程池状态将尝试转换为TERMINATED)。
tryTerminate的详细步骤如下,看的时候建议结合更详细的代码注释一起看:
开启一个死循环,尝试终止线程池:
获取此时最新的ctl值c;
线程池状态校验:如果处于RUNNING状态,不能终止线程池;或者如果运行状态值大于等于TIDYING,不需要该请求去终止线程池;或者如果运行状态值等于SHUTDOWN,并且workQueue任务队列不为空,不能终止线程池。以上三个条件满足一种,即可立即返回。
到这里,表示:线程池位于STOP状态,或者线程池位于SHUTDOWN状态且workQueue任务队列为空,上面两种状态满足一种,表示都可以尝试关闭线程池,但是也不一定,需要继续判断。
如果workerCount线程计数不为0,那么不能终止线程池。此这里可能是SHUTDOWN状态,因为调用shutdown方法仅仅在interruptIdleWorkers方法中中断空闲的线程,可能存在正在运行的线程没有被中断;当然也有可能是STOP状态,即如果还有线程还没有启动(state为-1),那么该线程也不会在shutdownNow的interruptWorkers方法中被中断。
执行tryTerminate方法时,运行状态的线程(获取w锁)可能处于空闲状态了(释放w锁),此时主要是需要调用interruptIdleWorkers (ONLY_ONE)尝试对于空闲状态的线程传播SHUTDOWN、STOP状态,让它们都被中断并移除。随后返回。
到这里表示工作线程数为0,那么表示可以尝试更改线程池状态,进一步停止线程池。首先获取mainLock锁,因为下面需要唤醒在termination上等待的外部线程。
开启一个try代码块:
尝试将ctl的值CAS的从c更改为TIDYING状态的值,即转换为TERMINATED状态。如果CAS成功:
在另一个try块中执行terminated()钩子方法,该方法是空实现,可以由子类重写。
无论terminated是否抛出异常,最终都执行finally块:
将ctl的值CAS的从TIDYING更改为TERMINATED状态的值,即转换为TERMINATED状态,表示线程池被彻底停止。
唤醒所有调用awaitTermination并且还是处于等待状态的外部线程,通知线程池已经彻底关闭。
方法返回。
最终需要在finally块中解锁。
到这一步还没有返回,表示CAS操作更改TIDYING失败了,可能是其他线程操作成功了,因此继续下一次循环。如果其他线程操作成功,那么本次的tryTerminate操作将会在下一次循环中,因为判断线程池状态不满足而退出。这里我们能够看出来,线程池状态转换为TIDYING以及TERMINATED的过程是连续的,只会在一个线程的同一次tryTerminate方法调用中完成。
/**
* 以下两种情况,线程池状态将尝试转换为TERMINATED
* 1 SHUTDOWN状态的线程,当任务队列为空并且线程池工作线程数workerCount为0时
* 2 STOP状态的线程池,线程池中工作线程数workerCount为0时
* <p>
* 该方法不是私有的,因为子类ScheduledThreadPoolExecutor也会调用该方法
*/
final void tryTerminate() {
/*开启一个死循环,尝试终止线程池*/
for (; ; ) {
//获取此时最新的ctl值c
int c = ctl.get();
/*
* 如果处于RUNNING状态,不能终止线程池
* 或者 如果运行状态值大于等于TIDYING,不需要该请求去终止线程池
* 或者 如果运行状态值等于SHUTDOWN,并且workQueue任务队列不为空,不能终止线程池
*
* 以上三个条件满足一种,即可立即返回
*/
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
return;
/*
* 到这里,表示:
* 1 线程池位于STOP状态
* 2 或者线程池位于SHUTDOWN状态,且workQueue任务队列为空
*
* 上面两种状态满足一种,表示都可以尝试关闭线程池,但是也不一定,需要继续判断
*
* 如果线程数不为0,那么不能终止线程池
* 这里可能是SHUTDOWN状态,因为调用shutdown方法仅仅在interruptIdleWorkers方法中中断空闲的线程,可能存在正在运行的线程没有被中断
* 当然也有可能是STOP状态,即如果还有线程还没有启动(state为-1),那么该线程也不会在shutdownNow的interruptWorkers方法中被中断
* 执行tryTerminate方法时,运行状态的线程(获取w锁)可能处于空闲状态了(释放w锁),此时主要是需要对于空闲状态的线程传播SHUTDOWN、STOP状态,让它们中断
*/
if (workerCountOf(c) != 0) { // Eligible to terminate
/*
* 那么调用interruptIdleWorkers(ONLY_ONE)尝试中断最多一个线程
*
* tryTerminate在ThreadPoolExecutor的shutdown、shutdownNow、addWorkerFailed、remove、purge这几个方法中被调用
* 这几个方法被调用时,一般都是需要改变线程池状态、移除workers集合的线程、移除任务队列中的任务的情况
* 这里是补偿式的尝试中断此时又处于空闲状态的线程,用来方便的传播SHUTDOWN、STOP信号,因为被中断的线程会在processWorkerExit方法中继续调用该方法
* 防止当前所有线程都在等待任务,中断任何任意一个的线程就还可以确保自SHUTDOWN状态开始以来新到达的Worker最终也会退出。
* 为了保证所有线程中断,仅需要中断一个空闲的线程即可,后续中断状态会进行传播式的调用
*/
interruptIdleWorkers(ONLY_ONE);
//返回
return;
}
final ReentrantLock mainLock = this.mainLock;
//到这里表示工作线程数为0,那么表示可以更改线程池状态,进一步停止线程池。
//首先获取mainLock锁,因为下面需要唤醒在termination上等待的外部线程
mainLock.lock();
try {
//尝试将ctl的值CAS的从c更改为TIDYING状态的值
//即删除全部wc工作线程计数部分的值,转换为TERMINATED状态
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
//如果成功
try {
//执行terminated()钩子方法,该方法是空实现,可以由子类重写
terminated();
} finally {
//无论terminated是否抛出异常,最终,都将ctl的值CAS的从TIDYING更改为TERMINATED状态的值
//即转换为TERMINATED状态
ctl.set(ctlOf(TERMINATED, 0));
//唤醒因为调用awaitTermination并且还是处于等待状态的外部线程,通知线程池已经彻底关闭
termination.signalAll();
}
//返回
return;
}
} finally {
//解锁
mainLock.unlock();
}
// else retry on failed CAS
/*
* 到这一步,表示CAS操作ctl.compareAndSet(c, ctlOf(TIDYING, 0)失败,可能是其他线程操作成功了,因此继续下一次循环
* 如果其他线程操作成功,那么本次的tryTerminate操作将会在下一次循环中,因为判断线程池状态不满足而退出
* 这里我们能够看出来,线程池状态转换为TIDYING以及TERMINATED的过程是连续的,只会在一个线程的同一次tryTerminate方法调用中完成
*/
}
}
interruptIdleWorkers中断空闲线程
尝试中断空闲的线程,即没有获取Worker锁的线程,比如在等待任务的线程,以便它们在后续运行中可以检测到线程池停止状态(比如shutdown()或者shutdownNow()),或者配置被更改的信息(比如setMaximumPoolSize())。
shutdown()方法或者设置线程池信息的方法比如setCorePoolSize()、setMaximumPoolSize()或者tryTerminate等方法中会调用。
代码很简单,主要是理解onlyOne参数的意思,如果为true,那么最多中断一个Woker;如果为false,那么尝试中断所有空闲线程。代码中循环workers集合,如果当前Worker没有被中断,并且尝试获取Worker锁成功(此前没有获取Worker锁),那么就是中断该线程,随后根据onlyOne参数确定是否跳出集合。
/**
* 尝试中断空闲的线程,即没有获取Worker锁的线程,比如在等待任务的线程
* 以便它们在后续运行中可以检测到线程池停止状态(比如shutdown()或者shutdownNow())
* 或者配置被更改的信息(比如setMaximumPoolSize())
*
* @param onlyOne 如果为true,那么最多中断一个Woker。
* 在tryTerminate方法中启动了终止状态但是还存在Worker的时候会调用并传递true
* 补偿式的尝试中断此时处于空闲状态的线程,用来方便的传播SHUTDOWN、STOP信号,
* 因为被中断的线程会在processWorkerExit方法中继续调用该方法
* 防止当前所有线程都在等待任务,中断任何任意一个的线程就还可以确保自SHUTDOWN状态开始以来新到达的Worker最终也会退出。
* 为了保证所有线程中断,仅需要中断一个空闲的线程即可,后续中断状态会进行传播式的调用
* <p>
* 如果为false,那么尝试中断所有空闲线程
* 在shutdown()或者设置线程池信息的方法比如setCorePoolSize()、setMaximumPoolSize()等方法中会调用
*/
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
//获取mainLock锁
mainLock.lock();
try {
/*遍历workers集合*/
for (Worker w : workers) {
//获取w的线程t
Thread t = w.thread;
//如果t没有没中断,并且尝试获取Worker锁成功
if (!t.isInterrupted() && w.tryLock()) {
try {
//如果获取锁成功,那么表示此线程就是空闲线程,那么线程t被中断
t.interrupt();
} catch (SecurityException ignore) {
} finally {
//释放Worker锁
w.unlock();
}
}
//如果onlyOne为true,那么就跳出循环了,否则就一直循环遍历所有Worker
if (onlyOne)
break;
}
} finally {
//释放mainLock锁
mainLock.unlock();
}
}
Worker线程包装类
线程池中的一个工作线程就被包装为一个Worker的对象,实现了Runnable 接口,可以看作线程任务。
Worker还继承了AbstractQueuedSynchronizer(AQS),自己实现了简单的不可重入的独占锁,state=0 表示锁未被获取状态, state=l 表示锁己经被获取的状态。这里的实现的独占“锁”,实际上就是用来控制该线程是否可以被中断的,获取锁的线程可以看作是正在工作中的线程,没有获取到锁的线程可以看作空闲线程。
不可重入是为了保护正在执行任务的线程(已经获取到了锁)对应的的w锁不会被被其他比如setCorePoolSize、setMaximumPoolSize、shutdown等线程池控制方法再次获取并中断。
另外刚创建Worker时state状态则被设置为-l,这是为了避免该新增的Worker的线程在运行runWorker()方法前就被中断(包括shutdown和shutdownNow),即还没处理过任务是不允许被中断的。
它的构造器中会调用newThread方法,该方法需要传递一个线程任务给线程工厂,然后线程工厂将返回一个线程,该线程启动后会执行这个线程任务。可以看到这里传递的就是this,这个this代指该Worker对象本身,因为Worker实现了Runnable,因此实际上返回的thread在start启动之后,会执行对应Worker的run方法。
关于run方法以及与锁相关的方法介绍请看代码注释:
/**
1. 线程池中的一个工作线程就被包装为一个Worker的对象
2. 实现了Runnable 接口,可以看作线程任务。
3. <p>
4. 继承了AbstractQueuedSynchronizer(AQS),自己实现了简单的不可重入的独占锁,state=0 表示锁未被获取状态, state=l 表示锁己经被获取的状态
5. 这里的实现的独占“锁”,实际上就是用来控制该线程是否可以被中断的,获取锁的线程可以看作是正在工作中的线程,没有获取到锁的线程可以看作空闲线程
6. 不可重入是为了保护正在执行任务的线程(已经获取到了锁)对应的的w锁不会被被其他比如setCorePoolSize、setMaximumPoolSize、shutdown等线程池控制方法再次获取并中断
7. 另外刚创建Worker时state状态则被设置为-l,这是为了避免该新增的Worker的线程在运行runWorker()方法前就被中断(包括shutdown和shutdownNow),即还没处理过任务是不允许被中断的
*/
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable {
/**
* 此类永远不会序列化,提供的serialVersionUID是用来来抑制javac警告的。
*/
private static final long serialVersionUID = 6138294804551838833L;
/**
* 此Worker对应的工作线程,由线程工厂创建
*/
final Thread thread;
/**
* 要运行的初始任务,可能为null
*/
Runnable firstTask;
/**
* 用来统计该Worker对应的工作线程完成的任务数
*/
volatile long completedTasks;
/**
* 通过给定的第一个任务和线程工厂创建一个Worker
*
* @param firstTask 第一个任务,如果没有就是null
*/
Worker(Runnable firstTask) {
//首先将state的值设置为-1,用于禁止中断,直到runWorker调方法被调用(执行了unlock,将state变成)
setState(-1); // inhibit interrupts until runWorker
//firstTask赋值
this.firstTask = firstTask;
//从线程工厂获取工作线程
//newThread方法需要传递一个线程任务给线程工厂,然后线程工厂将返回一个线程,该线程启动后会执行这个线程任务
//可以看到这里传递的就是this,这个this代指某个Worker对象本身,因为Worker实现了Runnable,
//因此实际上返回的thread在start启动之后,会执行对应Worker的run方法
this.thread = getThreadFactory().newThread(this);
}
/**
* 重写的run方法
*/
public void run() {
//调用runWorker方法,参数传递Worker对象本身
runWorker(this);
}
//与锁相关的方法
/**
* 解锁的方法
* 仅仅是将获取锁的线程变量置为null,随后将state置为0而已,不会进行任何校验
*/
public void unlock() {
//调用了release方法,这里传递的参数1实际上没什么用
release(1);
}
/**
* 重写的tryRelease方法,将会被release方法调用,获取锁
* <p>
* 和一般锁实现不一样,仅仅是将获取锁的线程变量置为null,随后将state置为0而已
*
* @param unused 参数,这里没用到
* @return 永远返回true
*/
protected boolean tryRelease(int unused) {
//将获取锁的线程变量置为null
setExclusiveOwnerThread(null);
//将state置为0
setState(0);
//返回true
return true;
}
/**
* 是否被锁住了,state为-1或者1都会返回true
*
* @return true 是 fale 否
*/
public boolean isLocked() {
return isHeldExclusively();
}
/**
* 是否被锁住了,state为-1或者1都会返回true
*
* @return true 是 fale 否
*/
protected boolean isHeldExclusively() {
return getState() != 0;
}
/**
* 获取锁
*/
public void lock() {
//调用acquire
acquire(1);
}
/**
* 尝试获取锁
*
* @return true 成功 false 失败
*/
public boolean tryLock() {
//调用tryAcquire尝试一次
return tryAcquire(1);
}
/**
* 重写的tryAcquire方法,将会被acquire方法调用,尝试获取锁
* 尝试CAS的将state的值从0改为1,CAS成功表示获取到了锁
*
* @param unused 参数,这里没用到
* @return true 获取成功 false 获取失败
*/
protected boolean tryAcquire(int unused) {
//尝试CAS的将state的值从0改为1,CAS成功表示获取到了锁
if (compareAndSetState(0, 1)) {
//成功之后将当前线程设置为获得锁的线程
setExclusiveOwnerThread(Thread.currentThread());
//返回ture
return true;
}
//返回fasle
return false;
}
/**
* 仅仅被interruptWorkers->shutdownNow方法调用,用于中断线程
* 如果当前Worker的线程t已经执行runWorker方法(即state不为-1),并且还没有没中断,那么中断t
*/
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
runWorker执行工作
runWorker方法是Worker的run方法中调用的方法,实际上就是由工作线程执行任务的核心逻辑,到这里,这个方法的执行者将从execute的调用线程变成工作线程。
大概逻辑如下:
工作线程将会从此前的firstTask任务开始执行,如果没有,那么从任务队列通过getTask拉取任务来执行。
如果firstTask和getTask都返回null,那么表示该任务线程将会退出,对应的Worker将被清理。
在执行每一个任务之前,会获取对应的Worker锁,获取了w锁就表示该线程处于工作状态,其他时候比如getTask就表示该线程处于空闲状态。此时该Worker锁不可以被比如setCorePoolSize、setMaximumPoolSize、shutdown等其他控制操作获取,即该工作线程此时不能被中断,但是shutdownNow方法除外。
每次都会调用任务前置方法beforeExecute,如果重写了该方法并且抛出了异常,那么任务不会再被执行,并且该线程及其Worker将被清理;每次任务被执行后,都会执行afterExecute后置方法,如果重写了该方法并且抛出了异常,那么该线程及其Worker将被清理。
runWorker的详细步骤如下,看的时候建议结合更详细的代码注释一起看:
获取当前工作线程wt,获取第一个要执行任务w.firstTask赋给的task变量,随后w.firstTask置null,释放引用;
调用unlock“解锁”。这的解锁操作仅仅是将state设置为0,到此该线程将支持被interrupt中断;
completedAbruptly表示线程执行过程中是否发生异常的标志位,初始化为true;
开启一个try代码块:
开启一个循环,下面就是线程不断循环处理任务的逻辑。循环条件是:task不为null,或者调用getTask方法,将的返回值赋给task,此task不为null,那么可以继续循环处理获取的任务;否则退出循环,该线程以及Worker将会被清理:
获取对应Worker锁,就是将state从0变成1,获取了锁就表示该线程处于工作状态,其他时候比如getTask就表示该线程处于空闲状态。此时该Worker锁不可以被比如setCorePoolSize、setMaximumPoolSize、shutdown等其他控制操作获取,即该工作线程此时不能被中断,但是shutdownNow方法除外。
进行一个判断的表达式操作。如果线程池状态值大于等于STOP,此时工作线程应该被无条件被中断,表达式结果为true;线程池状态值小于STOP,但是线程被中断了,那么恢复被中断的线程,将中断标志位改为false,表达式结果为false。
如果表达式为true,将当前线程中断,仅仅是设置中断标志位而已。
在一个try块中执行任务。首先执行前置方法beforeExecute,默认空实现,自定义的子类可以实现自己的逻辑。如果抛出了异常,那么该任务将不会被执行,,该线程以及Worker将会被清理。
使用thrown变量记录任务执行过程中抛出的异常。
在一个try块中, 调用task.run,到这里才是真正的执行任务。途中捕获各种异常赋给thrown。
无论task.run有没有抛出异常,都会在finally中执行后置方法afterExecute,默认空实现,自定义的子类可以实现自己的逻辑。如果抛出了异常,那么该任务将不会被执行,,该线程以及Worker将会被清理。
无论上面的代码有没有抛出异常,都会执行finally语句块:
将task变量置null,表示该任务执行完毕,无论是真的成功了还是失败了。
此Worker记录的完成任务数量completedTasks自增1
如果上面没有抛出异常,那么到此一个任务执行完毕,继续下一次循环,从任务队列中拉取任务执行!
到这一步跳出了循环,表示task为null,并且通过getTask重任务队列获取的任务也为null,即没有任务可执行了,循环结束,这是正常行为,将completedAbruptly置为false。
执行finally语句块的情况:可能是执行过程中抛出了异常,或者getTask返回null,而getTask返回null则可能是各种各样的情况,比如超时、线程池被关闭等。此时该线程和Worker将被清理!
调用processWorkerExit方法,传递Worker对象以及是否发生异常的标志位。processWorkerExit方法会将该Worker移除workers集合,并且根据completedAbruptly决定是否新建Worker添加到workers集合。
/**
* 工作线程执行任务的核心逻辑,大概就是循环进行任务处理,以及Worker销毁的操作
*
* @param w 工作线程对应的包装类
*/
final void runWorker(Worker w) {
//获取当前工作线程wt
Thread wt = Thread.currentThread();
//获取第一个要执行的task记录下来
Runnable task = w.firstTask;
//w.firstTask置空,释放引用
w.firstTask = null;
//解锁,这的解锁操作仅仅是将state设置为0,到此该线程将支持被interrupt中断
w.unlock(); // allow interrupts
//线程执行过程中是否发生异常的标志位,初始化为true
boolean completedAbruptly = true;
try {
/*
* 开启一个循环,这就是线程处理任务的逻辑
* 如果task不为null
* 或者task为null,通过getTask从任务队列获取的任务不为null
* 以上条件满足一个,即可继续循环,否则结束循环
* */
while (task != null || (task = getTask()) != null) {
//获取对应Worker锁,就是将state从0变成1,此时该Worker锁不可以被比如setCorePoolSize、setMaximumPoolSize等其他控制操作获取
//获取了w锁就表示该线程处于工作状态,其他时候比如getTask就表示该线程处于空闲状态
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
/*
* 两个表达式:
* 1 (runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
* 如果 此时运行状态值大于等于指定状态值STOP,那么第一个表达式满足,由于短路法不会执行那个第二个表达式
* 或者 调用Thread.interrupted()判断线程确实被中断了,如果确实被中断了则该方法还会清除中断标志位,并且此时运行状态值大于等于指定状态值STOP
* 2 !wt.isInterrupted()
* 当前工作线程没有被中断
*
* 上面两个表达式都满足,表示由于线程池状态值大于等于STOP,此时工作线程应该被无条件被中断,将当前线程中断
* 另外,如果不满足无条件中断的要求(线程池状态值小于STOP),但是线程被中断了,还会恢复被中断的线程,将中断标志位改为false
*/
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
//将当前线程中断,仅仅是设置中断标志位而已。
wt.interrupt();
try {
//任务开始执行的前置方法,默认空实现,自定义的子类可以实现自己的逻辑
//如果抛出了异常,那么该任务将不会被执行,该线程以及Worker将会被清理
beforeExecute(wt, task);
//thrown记录任务执行过程中抛出的异常
Throwable thrown = null;
try {
//这里才是真正的执行任务
task.run();
} catch (RuntimeException x) {
thrown = x;
throw x;
} catch (Error x) {
thrown = x;
throw x;
} catch (Throwable x) {
thrown = x;
throw new Error(x);
} finally {
//任务执行完毕的后置方法,默认空实现,自定义的子类可以实现自己的逻辑
//传递执行的任务以及抛出的异常,这个方法也可能抛出新的异常,这将导致该线程停止
afterExecute(task, thrown);
}
} finally {
//无论上面有没有抛出异常,都会执行finally语句块
//将task变量置空,表示该任务执行完毕,无论是真的成功了还是失败了。
task = null;
//此Worker记录的完成任务数量自增1
w.completedTasks++;
//解锁,将state置为0
w.unlock();
}
}
//到这一步,表示task为null,并且通过getTask重任务队列获取的任务也为null,即没有任务可执行了
//这是正常行为,completedAbruptly置为false
completedAbruptly = false;
} finally {
//无论上面有没有抛出异常,都会执行finally语句块。可能是执行过程中抛出了异常,或者getTask返回null
//getTask返回null,则可能是各种各样的情况,比如超时、比如线程池被关闭等
//调用processWorkerExit方法,传递Worker对象以及是否发生异常的标志位
//processWorkerExit方法会将该Worker移除workers集合,并且根据completedAbruptly决定是否新建Worker添加到workers集合
processWorkerExit(w, completedAbruptly);
}
}
getTask拉取任务
getTask方法的主要作用是从阻塞队列中拉取任务,是由工作线程调用的。将会在一个循环中拉取并返回一个任务,由于是阻塞队列,在没有任务时该方法会可能会超时阻塞或者一直阻塞。以下情况将会直接返回null:
具有超过maximumPoolSize的线程数量,可能在运行时动态的设置了maximumPoolSize,将maximumPoolSize调小了,此时需要丢弃部分工作线程;
线程池处于STOP及其之后的状态,无论还有没有任务,无条件清除全部工作线程;
线程池处于SHUTDOWN状态,且 workQueue 为空,队列中的任务已执行完毕,清除工作线程;
如果线程数大于corePoolSize,则对超过的线程在keepAliveTime超时之后还没获取到任务就会返回null,如果设置了allowCoreThreadTimeOut为true(设置成功的要求是超时时间大于0),那么对全部线程应用超时时间,这里返回null用于清除多余的工作线程,控制线程数量。
getTask的详细步骤如下,看的时候建议结合更详细的代码注释一起看:
timedOut变量代表上一次的poll()操作是否超时的标志,初始化为fale,表示未超时;
开启一个死循环,相当于自旋,从任务队列中拉取任务:
获取此时的ctl值c,获取此时的运行状态rs;
检查线程池状态: 如果线程池状态值大于等于SHUTDOWN,并且(线程池状态值大于等于STOP,或者任务队列为空)。即如果线程池处于STOP及其之后的状态,或者线程池处于SHUTDOWN状态,且 workQueue 为空。那么表示线程池被关闭,且线程不可以继续执行下去。
循环尝试CAS的将ctl的WorkerCount线程数量部分自减1,直到成功为止。
返回null,getTask方法结束,随后该Worker会被清理。
到这一步,表示线程池状态符合要求,但是状态是可能随时变化的。获取线程数量wc。
timed变量表示对工作线程是否应用超时等待的标志。如果allowCoreThreadTimeOut为true(设置成功的要求是超时时间大于0),表示所有线程都应用超时等待,或者wc大于核心线程数量,那么可以对超过corePoolSize的线程应用超时等待。以上情况满足一种,timed即为true,否则为false。
校验线程数量以及是否超时: (如果wc大于最大线程数,表示在线程池运行时动态的将maximumPoolSize调小了,或者上一次的poll()操作已经超时,并且可以对工作线程应用超时等待),并且(如果wc大于1,或者任务队列为空)。
这两种情况都满足,表示就可能会返回null。尝试CAS的将ctl的WorkerCount线程数量部分自减1;成功之后返回null,随后该Worker会被清理,getTask方法结束。
CAS失败之后,继续下一次循环,即这种关闭不是需要立即完成的。因为没有使用锁,可能有多个线程同时超时,此时需要控制因为超时返回的线程数量满足要求,比如某一批CAS成功的超时线程返回之后,其他CAS失败的超时线程在下一次循环时就可能因为数量不达标不会应用超时了。这里也是为什么本次的超时操作要等待到下一循环次才可能返回的原因,因为每次循环都会获取最新的wc和timed值。
到这一步,表示线程池的状态、线程数量、以及超时时间满足要求,开始真正的拉取任务。开启一个try代码块:
判断timed是否为true,如果为true,那么需要对线程应用超时等待,调用workQueue的超时poll方法,在超时时间范围内等待获取并移除任务(队头),如果超时时间没有获取道任务,那么返回null;如果为false,那么不需要对线程应用超时等待,调用workQueue的take方法,获取并移除任务(队头),没有获取到任务将会一直等待。返回值使用r变量接收。
到这一步,表示拉取的方法返回了,如果返回值r不为null,表示拉取到了任务,那么返回该任务,getTask方法结束。
到这一步还没有返回,表示没有拉取到任务,属于等待超时,那么timedOut设置为true。继续下一次循环,下一次循环中将可能会返回null,也可能不会。
使用catch捕获try块中抛出的InterruptedException。如果捕获到了,那么timedOut设置为false,此时不算拉取超时,继续下一次循环。在shutdown或者其他调整核心参数的方法关闭空闲线程的时候,就是设置中断标志,会将等待的线程唤醒,下一次循环中该空闲线程可能会由于线程状态的原因而返回。如果发生了其他异常,那么由于不能捕获异常而,直接退出该方法,getTask方法结束,随后该Worker会被清理。
/**
* 尝试循环从任务阻塞队列拉取新任务的方法,以下情况将会直接返回null:
* 1 具有超过maximumPoolSize的线程数量,可能在运行时动态的设置了maximumPoolSize,将maximumPoolSize调小了,此时需要丢弃部分工作线程
* 2 线程池处于STOP及其之后的状态,无论还有没有任务,无条件清除全部工作线程
* 3 线程池处于SHUTDOWN状态,且 workQueue 为空,队列中的任务已执行完毕,清除工作线程
* 4 如果线程数大于corePoolSize,则对超过的线程在keepAliveTime超时之后还没获取到任务就会返回null,如果设置了allowCoreThreadTimeOut为true(设置成功的要求是超时时间大于0),那么对全部线程应用超时时间
* 用于清除多余的工作线程,控制线程数量。
*
* @return 任务,或者null,表示该工作线程以及Worker都需要被清理,并且workerCount线程计数会自减1
*/
private Runnable getTask() {
//上一次的poll()操作是否超时的标志,初始化为fale,表示未超时
boolean timedOut = false; // Did the last poll() time out?
/*开启一个死循环,相当于自旋,从任务队列中拉取任务*/
for (; ; ) {
//获取此时的ctl值c
int c = ctl.get();
//获取此时的运行状态rs
int rs = runStateOf(c);
// Check if queue empty only if necessary.
/*
* 如果线程池状态值大于等于SHUTDOWN,并且(线程池状态值大于等于STOP,或者任务队列为空)
* 即如果线程池处于STOP及其之后的状态,或者线程池处于SHUTDOWN状态,且 workQueue 为空
* 那么表示线程池被关闭,且线程不可以继续执行下去
*/
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
//循环尝试CAS的将ctl的WorkerCount线程数量部分自减1,直到成功为止。
decrementWorkerCount();
//返回null,随后该Worker会被清理
return null;
}
//到这一步,表示线程池状态符合要求,但是状态是可能随时变化的
//获取线程数量wc
int wc = workerCountOf(c);
// Are workers subject to culling?
/*
* timed表示对工作线程是否应用超时等待的标志
* 如果allowCoreThreadTimeOut为true,表示所有线程都应用超时等待
* 或者 wc大于核心线程数量,那么可以对超过corePoolSize的线程应用超时等待
* 以上情况满足一种,timed即为true,否则为false
*/
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
/*
* 如果wc大于最大线程数,表示在线程池运行时动态的将maximumPoolSize调小了,或者上一次的poll()操作已经超时,并且可以对工作线程应用超时等待
* 并且 wc大于1,或者任务队列为空
*
* 这两种情况都满足,表示就可能会返回null
*
* 这里也能看出来,如果此时wc为1,即当前线程是唯一的工作线程,并且此时任务队列还不为null
* 那么当前线程不会因为超时而返回,因为还需要至少一条线程去执行任务队列中的任务
*/
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
//尝试CAS的将ctl的WorkerCount线程数量部分自减1;
if (compareAndDecrementWorkerCount(c))
//成功之后返回null,随后该Worker会被清理
return null;
/*
* CAS失败之后,继续下一次循环,即这种关闭不是需要立即完成的。
* 因为没有使用锁,可能有多个线程同时超时,此时需要控制因为超时返回的线程数量满足要求.
* 比如某一批CAS成功的超时线程返回之后,其他CAS失败的超时线程在下一次循环时就可能因为数量不达标不会应用超时了。
* 这里也是为什么本次的超时操作要等待到下一循环次才可能返回的原因,因为每次循环都会获取最新的wc和timed值.
*/
continue;
}
//到这一步,表示线程池的状态、线程数量、以及超时时间满足要求,开始拉取任务
try {
/*
* 判断timed是否为true
* 如果为true,那么需要对线程应用超时等待,调用workQueue的超时poll方法,
* 在超时时间范围内等待获取并移除任务(队头),如果超时时间没有获取道任务,那么返回null
* 如果为false,那么不需要对线程应用超时等待,调用workQueue的take方法,
* 获取并移除任务(队头),没有获取到任务将会一直等待
*
* 返回值使用r变量接收
*/
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
//到这一步,表示拉取的方法返回了,如果返回值r不为null,表示拉取到了任务,那么返回任务
if (r != null)
return r;
//到这一步还没有返回,表示没有拉取到任务,属于等待超时,那么timedOut设置为true
//继续下一次循环,下一次循环中将可能会返回null,也可能不会。
timedOut = true;
} catch (InterruptedException retry) {
//如果在拉取过程中等待时被中断,那么会抛出InterruptedException异常,这里将捕获这个异常
//此时不算拉取超时,timedOut设置为false,继续下一次循环。在shutdown方法关闭空闲线程的时候,就是设置中断标志,会将等待的线程唤醒,下一次循环中该空闲线程可能会由于线程状态的原因而返回
timedOut = false;
//发生的其他异常,则直接退出该方法
}
}
}
processWorkerExit清理/补充Worker
当在runWorker的任务执行过程中抛出异常,或者getTask返回null,getTask返回null,则可能是各种各样的情况,比如超时、比如线程池被关闭等,那么这个Worker被丢弃,此时需要调用processWorkerExit方法对于无用的Worker进行清理,该方法仅仅会被Worker对象的线程调用,即也是由工作线程调用的。
processWorkerExit方法需要传递Worker对象以及是否发生异常的标志位completedAbruptly。该方法会将参数Worker移除workers集合,并且根据线程池状态以及completedAbruptly决定是否补充新Worker。
在移除Worker之后,如果此时线程池状态为RUNNING或者SHUTDOWN状态,并且是因为抛出异常而被停止,或者不是应为抛出异常而被停止,但此时的线程数量小于最小线程数min。这两情况都会补充一个新Worker。 这就是我们常说的:线程池中的线程抛出异常之后,会自动补充开启一个线程的原理,现在我们也能明白不是无条件开启的。
processWorkerExit的详细步骤如下,看的时候建议结合更详细的代码注释一起看:
如果completedAbruptly为true,表示因为异常才调用该方法,此时workerCount还没有来得及自减1,这里需要补上:
循环尝试CAS的将ctl的WorkerCount线程数量部分自减1,直到成功为止。
获取mainLock锁;
更新线程池已完成的任务数量,当前的值加上终止线程记录的任务执行数量,只有在某个Worker工作线程终止时才会更新。
从workers集合中移除该Worker。
最终解锁。
此时线程池可能不是处于RUNNING状态,调用tryTerminate尝试彻底终止线程池。
获取此时的ctl值c。如果线程池状态值小于等于STOP,即处于RUNNING或者SHUTDOWN状态,那么可能需要补充线程。
继续判断如果completedAbruptly为false,表示不是因为抛出异常被停止。
使用min变量表示线程池需要保持的最小线程数。如果设置了allowCoreThreadTimeOut为true(设置成功的要求是超时时间大于0),那么表示对所有线程应用超时,那么min=0;如果allowCoreThreadTimeOut为fasle,那么表示对超过核心线程数量的线程线程应用超时,那么min=corePoolSize。
如果min为0,并且任务队列不等于空,那么最小线程数应该为1,因为此时需要至少一个线程去执行任务队列中的任务。min设置为1。
如果线程数量大于等于最小线程数min,那么不需要补充线程,直接return结束方法;否则表示不需要补充线程,进入下一步。
到这一步,表示:处于RUNNING或者SHUTDOWN状态,并且是因为抛出异常而被停止,或者处于RUNNING或者SHUTDOWN状态,并且此时的线程数量小于最小线程数min。这两种情况满足一种,即需要补充一个Worker,那么调用addWorker方法尝试新增一个Worker。
/**
* 对于无用的Worker进行清理,该方法仅仅会被Worker对象的线程调用
* 将参数Worker移除workers集合,并且根据线程池状态以及completedAbruptly决定是否补充新Worker。
*
* @param w Worker
* @param completedAbruptly 如果此Worker因抛出异常而停止,则为true,否则为false
*/
private void processWorkerExit(Worker w, boolean completedAbruptly) {
//如果completedAbruptly为true,表示因为异常才调用该方法
// 此时workerCount还没有来得及自减1,这里需要补上
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
//循环尝试CAS的将ctl的WorkerCount线程数量部分自减1,直到成功为止。
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
//获取mainLock锁
mainLock.lock();
try {
//更新线程池已完成的任务数量,当前的值加上终止线程记录的任务执行数量,只有在某个Worker工作线程终止时才会更新
completedTaskCount += w.completedTasks;
//从workers中移除该Worker
workers.remove(w);
} finally {
//最终解锁
mainLock.unlock();
}
//此时线程池可能不是处于RUNNING状态,调用tryTerminate尝试彻底终止线程池
tryTerminate();
//获取此时的ctl值c
int c = ctl.get();
/*
* 如果c的状态值小于等于STOP,即处于RUNNING或者SHUTDOWN状态,那么可能需要补充线程。
*/
if (runStateLessThan(c, STOP)) {
//如果completedAbruptly为false,表示不是因为抛出异常被停止
if (!completedAbruptly) {
//min表示线程池需要保持的最小线程数
// 如果allowCoreThreadTimeOut为true(设置成功的要求是超时时间大于0),那么表示对所有线程应用超时,那么min=0
// 如果allowCoreThreadTimeOut为fasle,那么表示对超过核心线程数量的线程线程应用超时,那么min=corePoolSize
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
//如果min为0,并且任务队列不等于空,那么最小线程数应该为1,因为此时需要之手啊一个线程去执行任务队列中的任务
if (min == 0 && !workQueue.isEmpty())
//min设置为1
min = 1;
//如果此时的线程数量大于等于最小线程数min,那么不需要补充线程,直接return结束方法
//否则表示不需要补充线程,进入下一步
if (workerCountOf(c) >= min)
return; // replacement not needed
}
/*
* 到这一步,表示:
* 1 处于RUNNING或者SHUTDOWN状态,并且是因为抛出异常而被停止
* 2 或者 处于RUNNING或者SHUTDOWN状态,并且此时的线程数量小于最小线程数min
*
* 这两种情况满足一种,即需要补充一个Worker,那么调用addWorker尝试新增一个Worker
*/
addWorker(null, false);
}
}
submit提交方法
除了execute方法之外,ThreadPoolExecutor 还有其他三个单提交的方法,它们的内部都是调用的execute方法。
Future submit(Runnable)
public Future< ? > submit(Runnable task)
task要求一个 Runnable 实现类,返回一个 Future 对象。这个 Future 对象可以用来检查 Runnable 是否已经执行完毕。该 Future 的 get 方法在任务成功完成时将会返回 null。
如果任务无法安排执行,那么可能抛出RejectedExecutionException。如果该任务为 null,那么抛出NullPointerException。
/**
* 位于父类AbstractExecutorService中的实现
*
* @throws RejectedExecutionException 如果任务无法安排执行
* @throws NullPointerException 如果该任务为 null
*/
public Future<?> submit(Runnable task) {
//首先就是task任务的null校验,如果为null直接抛出NullPointerException
if (task == null) throw new NullPointerException();
//将Runnable类型的task任务包装为RunnableFuture类型的ftask任务,返回null
RunnableFuture<Void> ftask = newTaskFor(task, null);
//调用execute方法传递ftask,RunnableFuture是Runnable的子接口,因此可以传递
execute(ftask);
return ftask;
}
/**
* 位于父类AbstractExecutorService中的实现
* 为给定可运行任务返回一个 RunnableFuture。
* 该RunnableFuture具有默认的返回值,并为底层任务提供取消操作。
*
* @param runnable 被包装的任务
* @param value 将要返回的默认值
* @param <T> 返回值类型
* @return 为给定可运行任务返回一个 RunnableFuture。
*/
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
//调用FutureTask的构造器
return new FutureTask<T>(runnable, value);
}
后面我们再介绍FutureTask是如何对Runnable以及Callable任务进行包装的,以及FutureTask相关方法的实现。
Future submit(Runnable, T)
public < T > Future< T > submit(Runnable task,T result)
task要求一个 Runnable 实现类,并且需要一个给定的返回值result,将返回一个 Future 对象,这个 Future 对象除了可以用来检查 Runnable 是否已经执行完毕,还可以调用get()方法获取执行结果,get()方法会阻塞当前线程直到任务完成,而使用get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。
如果任务无法安排执行,那么可能抛出RejectedExecutionException。如果该任务为 null,那么抛出NullPointerException。
/**
* 位于父类AbstractExecutorService中的实现
* 为给定可运行任务返回一个 RunnableFuture。该RunnableFuture具有指定默认的返回值result,并为底层任务提供取消等操作。
* 同时调用execute执行这个RunnableFuture实例。
*
* @throws RejectedExecutionException 如果任务无法安排执行
* @throws NullPointerException 如果该任务为 null
*/
public <T> Future<T> submit(Runnable task, T result) {
//首先就是task任务的null校验,如果为null直接抛出NullPointerException
if (task == null) throw new NullPointerException();
//将Runnable类型的task任务包装为RunnableFuture类型的ftask任务,get返回指定值
RunnableFuture<T> ftask = newTaskFor(task, result);
//调用execute方法传递ftask,RunnableFuture是Runnable的子接口,因此可以传递
execute(ftask);
return ftask;
}
Future submit(Callable< T >)
public < T > Future< T > submit(Callable< T > task)
task要求一个 Callable实现类。返回一个 Future 对象,这个 Future 对象除了可以用来检查 Runnable 是否已经执行完毕,还可以调用get()方法获取Callable中call方法的执行结果,get()方法会阻塞当前线程直到任务完成,而使用get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。
如果任务无法安排执行,那么可能抛出RejectedExecutionException。如果该任务为 null,那么抛出NullPointerException。
/**
* 位于父类AbstractExecutorService中的实现
* 为给定可运行任务返回一个 RunnableFuture。该RunnableFuture的返回值result就是传递的Callable的call方法的返回值,并为底层任务提供取消等操作。
* 同时调用execute执行这个RunnableFuture实例。
*
* @throws RejectedExecutionException 如果任务无法安排执行
* @throws NullPointerException 如果该任务为 null
*/
public <T> Future<T> submit(Callable<T> task) {
//首先就是task任务的null校验,如果为null直接抛出NullPointerException
if (task == null) throw new NullPointerException();
//将Runnable类型的task任务包装为RunnableFuture类型的ftask任务,get返回指定值
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
/**
* 为给定可运行任务返回一个 RunnableFuture。该RunnableFuture的返回值result就是传递的Callable的call方法的返回值,并为底层任务提供取消等操作。
*
* @param callable 被包装的任务
* @param <T> Callable返回值类型
* @return 为给定可运行任务返回一个 RunnableFuture。
*/
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
//调用FutureTask的构造器
return new FutureTask<T>(callable);
}
FutureTask的原理
FutureTask的概述
public class FutureTask< V >
extends Object
implements RunnableFuture< V >
在最开始我们就简单介绍过,FutureTask作为可取消的异步计算。类提供了对 Future 的基本实现,具有开始和取消计算的方法、查询计算是否完成的方法和获取计算结果的方法等方法。重要的get()方法仅在计算完成时才能获取结果;如果计算尚未完成,则阻塞 get 方法。一旦计算完成,就不能再重新开始或取消计算。
FutureTask可以与线程池连用,也可以单独与Thread使用,因此完全可以与线程池独立开来讲解。
上面的submit方法内部调用了execute方法,传递的参数是一个FutureTask实例,类型为RunnableFuture,RunnableFuture是Runnable的子接口,因此可以传递。
工作线程在runWorker方法中调用task.run()方法执行任务。此时我们可以知道,对于submit系列方法,并不是直接运行传递的task的run或者call方法,而是运行该task的包装对象FutureTask的run方法。 可以猜测到FutureTask的run方法对于传递进来的原始的Runnable或者Callable任务的run或者call方法做了包装。现在我们来看看FutureTask到底是如何对Runnable或者Callable任务进行包装的。
先看一个简单的使用案例:
FutureTask objectFutureTask = new FutureTask(() -> {
Thread.sleep(3000);
return 1;
});
//与线程连用
Thread thread = new Thread(objectFutureTask);
thread.start();
//约3秒钟之后会获取返回值
System.out.println(objectFutureTask.get());
FutureTask的重要属性
FutureTask作为一个任务,具有7种不同的运行状态,每种运行状态使用单独的int类型表示,并且它们有大小关系,这类似于ThreadPoolExecutor的状态的作用,可以通过比较大小来判断任务执行情况:NEW< COMPLETING< NORMAL< EXCEPTIONAL< CANCELLED< INTERRUPTING< INTERRUPTED。比如get方法就用到了大小关系。
内部保存的任务类型只有一个Callable类型,也就是说,实际上构造器传递进来的Runnable任务也会通过Executors.callable(runnable, result)工厂方法转换为Callable类型的任务。
具有一个Object类型的outcome保存要从get方法返回的数据或者要抛出的异常。
还有一个WaitNode类型的单链表,用于保存等待该FutureTask的返回值的线程。
/**
* 此任务的运行状态
* 运行状态仅在set、setException、cancel方法中转换
* 可能的状态转换如下:
* NEW -> COMPLETING -> NORMAL 表示正常执行完毕
* NEW -> COMPLETING -> EXCEPTIONAL 表示执行任务时发生异常
* NEW -> CANCELLED 表示任务被取消,cancel(false)。如果此任务尚未启动,则此任务将永不运行;如果任务已经启动,则不影响运行
* NEW -> INTERRUPTING -> INTERRUPTED 表示任务被取消,cancel(false)。如果此任务尚未启动,则此任务将永不运行;如果任务已经启动,则停止执行(中断任务线程)
*/
private volatile int state;
/**
* new新建任务,或者正在执行任务call()方法的状态
*/
private static final int NEW = 0;
/**
* 任务call()方法被调用完毕之后(无论成功还是抛出异常)的状态,
* 或者调用set、setException方法CAS成功之后的状态
* 此时outcome还没有记录执行结果
* 这是一个中间状态
*/
private static final int COMPLETING = 1;
/**
* 任务call()方法被调用成功之后的最终状态
* 或者调用set方法直接设置结果成功之后的最终状态
* 此时outcome已经记录执行结果
* 该状态不需要立即感知
*/
private static final int NORMAL = 2;
/**
* 任务call()方法被调用抛出异常之后的最终状态
* 或者调用setException方法直接设置异常信息成功之后的最终状态
* 此时outcome已经记录执行结果
* 该状态不需要立即感知
*/
private static final int EXCEPTIONAL = 3;
/**
* 任务还未执行或者正在执行(NEW状态)时,调用cancel(false)尝试取消任务操作成功的最终状态
*/
private static final int CANCELLED = 4;
/**
* 任务还未执行或者正在执行(NEW状态)时,调用cancel(true)尝试取消任务操作成功的状态
* 这是一个中间状态
*/
private static final int INTERRUPTING = 5;
/**
* 调用cancel(true)设置INTERRUPTING状态成功之后会继续设置的最终状态
* 如果此时任务正在执行,那么中断该任务运行线程之后设置的状态
* 该状态不需要立即感知
*/
private static final int INTERRUPTED = 6;
/**
* 保存的构造器传递进来的任务,可以看到就是Callable类型
* 实际上构造器传递进来的Runnable任务也会通过Executors.callable(runnable, result)工厂方法转换为Callable类型的任务
* 在运行完毕之后(无论是成功还是异常)将会置为null
*/
private Callable<V> callable;
/**
* 要返回的结果,或者要从get()方法中抛出的异常
* 默认为null,set或者setException方法调用成功之后被设置值
*/
private Object outcome;
/**
* 运行Callable任务的线程
* 默认为null,在run方法被成功调用之后会被赋值初始化
*/
private volatile Thread runner;
/**
* 等待该FutureTask的返回值的线程,是一个单链表队列,waiters存储头部元素
*/
private volatile WaitNode waiters;
/**
* 简单的单链表实现内部类,用于存放在get方法上等待的线程
*/
static final class WaitNode {
/**
* 等待的线程
*/
volatile Thread thread;
/**
* 后继
*/
volatile WaitNode next;
/**
* 构造器
*/
WaitNode() {
thread = Thread.currentThread();
}
}
FutureTask的构造器
public FutureTask(Callable< V > callable)
创建一个 FutureTask,一旦运行就执行给定的Callable任务,并且任务完成时可通过get方法返回call()的结果。
public FutureTask(Runnable runnable, V result)
创建一个 FutureTask,一旦运行就执行给定的Runnable任务,并且任务完成时可通过get方法返回指定的结果。
sumbit方法就是通过调用FutureTask的构造器返回FutureTask实例的。可以看到传递进来的Runnable任务也会通过Executors.callable(runnable, result)工厂方法转换为Callable类型的任务,这是一种设计模式—适配器模式。而FutureTask对callable任务的包装也是一种适配器模式——转换为Runnable类型。
构造器仅仅是初始化callable属性,以及FutureTask状态为NEW。
/**
* 创建一个 FutureTask,一旦运行就执行给定的Callable任务,并且任务完成时可通过get方法返回call()的结果。
*
* @param callable callable任务
* @throws NullPointerException 如果 callable 为 null。
*/
public FutureTask(Callable<V> callable) {
//callable的null校验
if (callable == null)
throw new NullPointerException();
//初始化callable属性
this.callable = callable;
//初始化FutureTask状态为NEW
this.state = NEW; // ensure visibility of callable
}
/**
* 创建一个 FutureTask,一旦运行就执行给定的Runnable任务,并且任务完成时可通过get方法返回指定的结果。
*
* @param runnable runnable 任务
* @param result 任务完成时指定返回的结果
* @throws NullPointerException 如果 runnable 为 null。
*/
public FutureTask(Runnable runnable, V result) {
//调用Executors.callable将runnable和指定返回值包装并返回一个callable
//这里的源码我们在 Executors章节部分讲解
this.callable = Executors.callable(runnable, result);
//初始化FutureTask状态为NEW
this.state = NEW; // ensure visibility of callable
}
run核心方法
无论是单独和一个Thread线程使用还是和线程池使用,内部的线程运行的不再是被包装的任务的run或者call方法,而是FutureTask的run。这个方法是FutureTask的核心方法,只能被执行任务的线程调用,其他线程是无法调用的。
run的详细步骤如下,看的时候建议结合更详细的代码注释一起看:
校验任务状态state: 如果如果state不是NEW状态,或者state是NEW状态,但是CAS的将runner从null设置为当前调用线程失败。以上两个条件满足一个就立即返回,任务不会被执行,即要求任务在此前没有被执行过才能开始执行。
到这一步,说明任务没有被执行过,此时执行该任务的线程被记录在runner中。在cancel(true)方法中,如果runner不为null,那么任务就被算作正在执行过程中了,开启一个try块,执行任务:
保存要执行的任务c,校验任务: 如果c不为null,并且state还是为NEW,那么可以执行真正的任务。这里还需要校验一下state,因为在上次校验到此之间可能存在其他线程取消了任务,如果任务在真正开始前就被取消了,那么直接就不执行真正的任务。
result变量用来保存执行结果,ran变量用来保存任务是否执行成功。
在一个try块中,调用c.call()方法,这里才是执行真正的任务,这个call方法中的代码就是我们自己编写的代码。如果call执行成功,那么result接收其返回值,ran置为true,表示执行成功。
使用catch捕获执行过程中的任何异常,如果call方法抛出异常,那么result置为null,ran置为false,表示执行失败。随后调用setException方法,设置异常的返回值,并唤醒因为get阻塞的线程。
如果ran为true,即任务执行成功。调用set方法,用于设置正常的返回值,并唤醒因为get阻塞的线程。
任务执行完毕或者没有执行,都会进入finally代码块中:
将runner设置为null,到此表示任务执行完毕。在任务的校验以及执行过程中runner必须一直非null,用来保证run()方法不会被并发的调用。
重新获取此时的state状态s,因为在上面的任务的校验以及执行过程中可能被cancel方法取消,state会被改变。
如果状态s大于等于INTERRUPTING,表示任务执行过程中其他线程执行了cancel(true)成功,那么本次任务执行的setException以及set方法都执行失败,但是此执行线程不一定被中断了,任务也不一定被取消了。可能执行线程在执行任务call()方法完毕时,cancel的线程更改了state值为INTERRUPTING,这将导致执行线程的setException或者set失败,随后执行线程将runner置空,那么cancel线程在后续代码中由于runner为null将不会中断执行线程;但是也有可能cancel线程先获取到了runner,此时runner还不为null,但是执行线程实际上将任务执行完了,此时还是会中断线程。
此时调用handlePossibleCancellationInterrupt处理可能的中断,等待任务状态变成INTERRUPTED状态,即等待执行线程被中断或者不被中断。
/**
* FutureTask的核心方法,被执行线程调用,用来执行任务
*/
public void run() {
/*
* 如果state不是NEW状态
* 或者 state是NEW状态,但是CAS的将runner从null设置为当前调用线程失败
*
* 以上两个条件满足一个就立即返回,任务不会被执行,即要求任务在此前没有被执行过才能开始执行
*/
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
/*
* 到这一步,说明任务没有被执行过,此时执行该任务的线程被记录在runner中
* 在cancel(true)方法中,如果runner不为null,那么任务就被算作正在执行过程中了
* 开启一个try块,执行任务
*/
try {
//保存要执行的任务c
Callable<V> c = callable;
//如果c不为null,并且state还是为NEW,那么可以执行真正的任务
//这里还需要校验一下,因为在上次校验到此之间可能存在其他线程取消了任务
//如果被取消了,那么不会执行底层真正的任务
if (c != null && state == NEW) {
//result用来保存执行结果
V result;
//ran用来保存任务是否执行成功
boolean ran;
try {
//调用c.call()方法,这里才是执行真正的任务,这个call方法中的代码就是我们自己编写的代码
//如果call执行成功,那么result接收其返回值
result = c.call();
//ran置为true,表示执行成功
ran = true;
} catch (Throwable ex) {
//如果call方法抛出异常,那么result置为null
result = null;
//ran置为false,表示执行失败
ran = false;
//调用setException方法,用于设置异常的返回值,并唤醒因为get阻塞的线程
setException(ex);
}
//如果ran为true,即任务执行成功
if (ran)
//调用set方法,用于设置正常的返回值,并唤醒因为get阻塞的线程
set(result);
}
} finally {
//执行完毕后在finally中,将runner设置为null,到此表示任务执行完毕
//在任务执行过程中runner必须一直非null,用来保证run()方法不会被并发的调用
runner = null;
//重新获取此时的state,因为在任务执行过程中可能被中断,state会被改变
int s = state;
/*
* 如果状态大于等于INTERRUPTING,表示任务执行过程中其他线程执行了cancel(true)成功
* 那么本次任务执行的setException以及set方法都执行失败,但是此执行线程不一定被中断了
* 可能执行线程在执行任务call()方法完毕时,cancel的线程更改了state值为INTERRUPTING,这将导致执行线程的setException或者set失败
*
* 随后执行线程将runner置空,那么cancel线程在后续代码中由于runner为null将不会中断执行线程
* 但是也有可能cancel线程先获取到了runner,此时runner还不为null,但是执行线程实际上将任务执行完了,此时还是会中断线程
*/
if (s >= INTERRUPTING)
//处理中断,等待任务状态变成INTERRUPTED状态,即等待执行线程被中断或者不被中断
handlePossibleCancellationInterrupt(s);
}
}
/**
* 处理由于调用cancel(true)方法而在任务执行过程中可能的中断
* 等待任务状态变成INTERRUPTED状态,即等待执行线程被中断或者不被中断
*
* @param s 状态
*/
private void handlePossibleCancellationInterrupt(int s) {
//如果状态为INTERRUPTING
if (s == INTERRUPTING)
//那么循环或去哦最新的状态,如果等于INTERRUPTING,那么让出cpu执行权
//让cancel(true)的线程尽快将状态设置为INTERRUPTED成功
while (state == INTERRUPTING)
Thread.yield(); // wait out pending interrupt
}
set、setException设置结果
run方法中的c.call()调用成功返回或者抛出异常的时候,将分别调用set和setException方法用于尝试CAS的设置state状态为COMPLETING,设置成功之后会继续设置get方法的返回结果outcome,接着lazyset的设置最终状态NORMAL或者EXCEPTIONAL,最后会调用finishCompletion方法唤醒因为调用get方法而阻塞在内部等待队列中的外部线程,并且最后清理callable任务引用。
/**
* 除非已经设置了此 Future已不是NEW状态,否则将其get方法的结果设置为给定的值。
* 在计算成功完成时通过 run 方法内部调用此方法。
*
* @param v c.call方法的返回值
*/
protected void set(V v) {
//尝试CAS的将state的值从NEW设置为COMPLETING,失败则退出方法
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
//CAS成功之后,就是所谓的COMPLETING状态
//随后将v的值赋给outcome,作为get方法的返回值
outcome = v;
//设置当前任务的状态为NORMAL ,也就是任务正常结束
/*
* 最终会设置成NORMAL状态,但是可能导致其他线程在之后的一小段时间内还是可以读到旧的值。
* 关于该方法的更多信息可以参考并发编程网翻译的一篇文章《AtomicLong.lazySet是如何工作的?》,
* 文章地址是“http://ifeve.com/how-does-atomiclong-lazyset-work/”。
* AtomicLong.lazySet方法的内部就是调用的putOrdered系列方法
*
* 这里没有使用CAS 是因为对于同一个任务只可能有一个线程运行到这里。
* 在这里使用putOrderedlnt,比使用CAS或者putLongvolatile的效率要高,
* 并且这里的场景不要求其他线程马上对设置的NORMAL状态值可见。
*/
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
//调用finishCompletion方法,唤醒因为调用get方法而阻塞在内部等待队列中的外部线程
//并且最后清理callable任务引用。
finishCompletion();
}
}
/**
* 除非已经设置了此 Future已不是NEW状态,否则将其get方法的结果设置为抛出一个ExecutionException,并将给定的 throwable 作为其原因。
* 在计算失败时通过 run 方法内部调用此方法。
*
* @param t c.call方法异常的原因
*/
protected void setException(Throwable t) {
//尝试CAS的将state的值从NEW设置为COMPLETING,失败则退出方法
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
//CAS成功之后,就是所谓的COMPLETING状态
//随后将t的值赋给outcome,作为get方法要抛出的异常
outcome = t;
/*
* 最终会设置成NORMAL状态,但是可能导致其他线程在之后的一小段时间内还是可以读到旧的值。
* 关于该方法的更多信息可以参考并发编程网翻译的一篇文章《AtomicLong.lazySet是如何工作的?》,
* 文章地址是“http://ifeve.com/how-does-atomiclong-lazyset-work/”。
* AtomicLong.lazySet方法的内部就是调用的putOrdered系列方法
*
* 这里没有使用CAS 是因为对于同一个任务只可能有一个线程运行到这里。
* 在这里使用putOrderedlnt,比使用CAS或者putLongvolatile的效率要高,
* 并且这里的场景不要求其他线程马上对设置的EXCEPTIONAL状态值可见。
*/
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
//调用finishCompletion方法,唤醒因为调用get方法而阻塞在内部等待队列中的外部线程
//并且最后清理callable任务引用。
finishCompletion();
}
}
cancel取消任务
public boolean cancel(boolean mayInterruptIfRunning)
试图取消对此任务的执行。如果任务已完成、或已取消,或者由于某些其他原因而无法取消,则此尝试将失败。
当调用 cancel 时,如果调用成功,而此任务尚未启动,则此任务将永不运行。如果任务已经启动,则 mayInterruptIfRunning 参数确定是否应该以试图停止任务的方式来中断执行此任务的线程,如果为true,表示应该中断执行此任务的线程,如果为false,表示允许正在运行的任务运行完成。
如果无法取消任务,则返回 false,表示它已经正常完成或者已被取消;否则返回 true,返回true也不代表任务方法没有执行完毕,有可能任务执行完了,只是不能获取返回值而已。因为中断线程不代表会中断任务,即使中断了线程,只是设置了中断标志位,任务有可能实际上还是执行完毕了的,只不过无法获得返回值(get抛出异常)。Java线程中断与停止线程详解以及案例演示。
cancel的详细步骤如下,看的时候建议结合更详细的代码注释一起看:
任务状态state校验: 如果state不等于NEW,说明该任务已完成或者取消;或者如果state等于NEW,但是尝试将CAS的将state的值从NEW改为INTERRUPTING(mayInterruptIfRunning为true)或者CANCELLED(mayInterruptIfRunning为false)失败,表示状态改变失败。这两种情况都表示取消任务失败,直接返回false。
到这一步,表示任务状态state已经被改为INTERRUPTING或者CANCELLED,此时如果在run方法中c.call方法运行完毕或者抛出异常,后续的set或者setException方法也不会成功。而如果run方法中的任务还没有开始执行,那么后面也就不会执行了。开启一个try代码块:
如果mayInterruptIfRunning为true,说明需要中断正在执行的线程。开启一个try代码块:
使用t记录此时的runner,注意这里只是记录了瞬时的runner的状态,有可能刚把runner记录下来,runner在run方法中就被置为null了。
如果t不为null,那么将t中断。
在finally代码块中*,会将状态lazySet的设置为INTERRUPTED状态。
在finally代码块中,调用finishCompletion方法,唤醒因为调用get方法而阻塞在内部等待队列中的外部线程,并且最后清理callable任务,即callable不能被再次调用。
最后返回true。
可以看到,cancel方法就有两条任务状态转换路线:
cancel(false): NEW——CANCELLED
cancel(true): NEW——INTERRUPTING——INTERRUPTED
/**
1. 尝试取消任务的执行
2. 3. @param mayInterruptIfRunning 如果应该中断正在执行此任务的线程,则为 true;否则允许正在运行的任务运行完成
4. @return 如果无法取消任务,则返回 false,这通常是由于它已经正常完成或者已取消;否则返回 true
5. 返回true也不代表任务方法没有执行完毕,有可能任务执行完了,只是不能获取返回值而已。
*/
public boolean cancel(boolean mayInterruptIfRunning) {
/*
* 1 如果state不等于NEW,说明该任务已完成或者取消
* 2 如果state等于NEW,但是尝试将CAS的将state的值从NEW改为INTERRUPTING(mayInterruptIfRunning为true)
* 或者CANCELLED(mayInterruptIfRunning为false)失败,表示状态改变失败
*
* 这两种情况都表示取消任务失败,直接返回false
*/
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
/*
* 到这一步,表示任务状态state已经被改为INTERRUPTING或者CANCELLED,
* 此时如果在run方法中c.call方法运行完毕或者抛出异常,后续的set或者setException方法也不会成功
* 而如果run方法中的任务还没有开始执行,那么后面也就不会执行了。
*/
try { // in case call to interrupt throws exception
//如果mayInterruptIfRunning为true,说明需要中断正在执行的线程
if (mayInterruptIfRunning) {
try {
//使用t记录此时的runner,注意这里只是记录了瞬时的runner的状态,有可能刚把runner记录下来,runner在run方法中就被置为null了
Thread t = runner;
/*
* 如果t不为null,那么将t中断。可以看到,所谓的尝试停止任务仅仅是尝试中断执行任务的线程
* 如果任务线程在调用 Object 类的 wait()、wait(long) 或 wait(long, int) 方法,或者该类的 join()、join(long)、join(long,int)、
* sleep(long) 或 sleep(long, int) 方法过程中受阻,则其中断状态将被清除,它还将收到一个 InterruptedException,这是才可能中断任务执行的可能
* 而这个interrupt()不会对正常运行的线程有任何影响,仅仅是设置了中断标志位为true,因此不一定会中断任务执行
*/
if (t != null)
t.interrupt();
} finally { // final state
/*
* 最终会设置成INTERRUPTED状态,但是可能导致其他线程在之后的一小段时间内还是可以读到旧的值。
* 关于该方法的更多信息可以参考并发编程网翻译的一篇文章《AtomicLong.lazySet是如何工作的?》,
* 文章地址是“http://ifeve.com/how-does-atomiclong-lazyset-work/”。
* AtomicLong.lazySet方法的内部就是调用的putOrdered系列方法
*
* 这里没有使用CAS 是因为对于同一个任务只可能有一个线程运行到这里。
* 在这里使用putOrderedlnt,比使用CAS或者putLongvolatile的效率要高,
* 并且这里的场景不要求其他线程马上对设置的INTERRUPTED状态值可见。
*/
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
//调用finishCompletion方法,唤醒因为调用get方法而阻塞在内部等待队列中的外部线程
//并且最后清理callable任务引用。
finishCompletion();
}
//返回true
return true;
}
finishCompletion唤醒等待线程
在set、setException、cancel方法的最后都会调用finishCompletion方法,这三个方法啊实际上都表示任务执行完成的状态,即正常执行、异常执行、取消执行,任务完成之后调用的finishCompletion方法就是为了唤醒因为调用get方法而阻塞在内部等待队列中的外部线程,并且最后清理callable任务引用。
大概步骤还是很简单的:
开启一个循环,q=waiter,如果q不为null,说明有等待的线程,那么进入循环:
尝试CAS的将waiter的指向从q变成null,即清除waiter的引用,好让GC回收,CAS成功之后:
再开启一个内层死循环,用于清理q队列:
获取q保存的阻塞的线程t;
如果t不为null,那么将q.thread引用置为null,unpark唤醒阻塞的t线程。
获取q的后继next,如果后继next为null,说明遍历到了队列尾部,那么break跳出内层循环。
到这一步还没有跳出,表示next不为null,那么q的后继设置为null,这样q结点就脱离了队列。
q设置为q的后继next,继续下一次循环,即继续向后推进移除结点唤醒线程;
跳出内层循环之后,继续break跳出外层循环;
CAS失败则继续下一次外层循环;
执行done方法,该方法是一个空实现,留给子类重写;
callable设置为null,释放引用。
/**
* 唤醒因为调用get方法而阻塞在内部等待队列中的外部线程,并且最后清理callable任务,即callable不能被再次调用。
* 在set、setException、cancel方法的最后都会调用
*/
private void finishCompletion() {
// assert state > COMPLETING;
/*
* 开启一个循环
* q=waiter,如果q不为null,说明有等待的线程,那么进入循环
*/
for (WaitNode q; (q = waiters) != null; ) {
//尝试CAS的将waiter的指向从q变成null,即清除这个引用
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
/*CAS成功之后,开启一个死循环清理q队列*/
for (; ; ) {
//获取q保存的阻塞的线程t
Thread t = q.thread;
//如果t不为null
if (t != null) {
//那么将q.thread引用置为null
q.thread = null;
//unpark唤醒阻塞的t线程
LockSupport.unpark(t);
}
//获取q的后继next
WaitNode next = q.next;
//如果后继next为null,说明遍历到了队列尾部,那么break跳出内层循环
if (next == null)
break;
//到这一步表示next不为null,那么q的后继设置为null,这样q结点就脱离了队列
q.next = null; // unlink to help gc
//q设置为q的后继next,继续下一次循环,即继续向后推进移除结点唤醒线程
q = next;
}
//跳出内层循环之后,继续break跳出外层循环
break;
}
}
//执行done方法,该方法是一个空实现,留给子类重写
done();
//callable设置为null,释放引用
callable = null; // to reduce footprint
}
get()等待获取结果
public V get()
获取任务执行的结果。如果任务没有执行完毕,那么一直等待直到等待被中断,或者被唤醒。
如果任务被取消,那么抛出CancellationException;如果当前的线程在等待时被中断,那么抛出InterruptedException;如果任务执行时抛出异常,那么抛出ExecutionException。
大概步骤就是:
获取state状态值s;
如果s的值小于等于COMPLETING,说明任务还没开始执行,或者正在执行,或者执行完毕但是还没有设置好返回值,那么需要等待:
调用awaitDone方法传递false、0,表示无限等待,返回任务的状态s。
调用report方法解析状态值s,并且做出相应的响应。
/**
1. 获取任务执行的结果。如果任务没有执行完毕,那么一直等待直到被中断。
2. 3. @throws CancellationException 如果任务被取消,那么抛出CancellationException
4. @throws InterruptedException 如果当前的线程在等待时被中断
5. @throws ExecutionException 如果任务执行时抛出异常
*/
public V get() throws InterruptedException, ExecutionException {
//获取state状态值s
int s = state;
//如果s的值小于等于COMPLETING,说明任务还没开始执行,或者正在执行,或者执行完毕但是还没有设置好返回值
//那么需要等待
if (s <= COMPLETING)
//调用awaitDone,传递false、0,表示无限等待,返回任务的状态
s = awaitDone(false, 0L);
//调用report方法解析状态值s,并且做出相应的响应
return report(s);
}
awaitDone等待结果
awaitDone用于等待最终结果,直到任务完毕正常返回(没有阻塞),或者阻塞被unpark唤醒,或者阻塞被中断,或者阻塞超时。
最终会返回一个任务状态,如果当前的线程阻塞被中断则直接抛出InterruptedException异常。
awaitDone的详细步骤如下,看的时候建议结合更详细的代码注释一起看:
如果是超时等待,那么计算超时时间点纳秒,否则为0,使用deadline保存结果;
q保存加入到waiters的结点,或者为null表示没有加入,q将会被加入到链表头部;
queued代表q结点是否已加入链表的标志,默认为false,表示未加入;
开启一个死循环等待,直到任务执行完毕,或者超时时间到,或者等待被中断:
如果当前线程被中断了,那么清除中断标志位,随后进入if代码块:
从waiters移除q结点;随后抛出InterruptedException异常。
获取此时的任务状态s。如果s大于COMPLETING,表示任务已完成:
如果q不等于null,那么q的thread置为null,表示不需要在finishCompletion中被唤醒,并且表示q结点是一个无效结点。
返回状态s。这里能够看出来,q结点没有被即时清理。waiters链表中结点的thread如果为null,那么表示这个结点是无效结点,可以被随时清理。因此无效结点将会在removeWaiter或者finishCompletion方法中被一起清理了。
否则,表示s小于等于COMPLETING。如果s等于COMPLETING,那么表示任务已完成,但是返回值或者最终状态还没有写入:
这里就不需要阻塞了,直接尝试yield让出CPU执行权,希望run线程早点执行完毕。
否则,表示s小于COMPLETING,那就是任务还没有执行或者执行过程中,这样就还不知道任务什么时候执行或者执行完,此时可能需要将线程包装为WaitNode结点加入链表等待。如果q为null:
那么新建一个WaitNode,thread属性在构造器中已经计算出来了。注意这里并没有直接加入链表,而是继续下一次循环,期待下一次循环就能返回或者yield也行,尽量避免阻塞的调用。
否则,到这里表示在前面的判断中没有返回或者yield,并且q不为null。如果queued为false,即q结点还没有加入链表:
那么将q的后继设置为此时的waiters结点,随后尝试CAS的将waiters的指向从q的后继指向q。简单的说就是CAS的加入链表成为链表头部元素,并且waiters持有该结点。注意这里加入之后没有直接阻塞,而是继续下一次循环,期待下一次循环就能返回或者yield也行,尽量避免阻塞的调用。
否则,到这里在前面的判断中没有返回或者yield,并且q不为null,并且已经加入了链表,此时可以尝试阻塞了。如果timed为true,即是超时操作:
那么计算剩余超时时间纳秒nanos;
如果nanos小于等于0,表示超时时间到了也没有获取到任务的结果,那么从链表中移除q结点,返回此时的state,不管是什么状态。
如果nanos大于0,那么调用parkNanos方法阻塞当前线程最长nanos纳秒,直到被唤醒,或中断,或阻塞时间到,随后继续下一次循环。注意parkNanos被中断时不会主动抛出InterruptedException异常,因此还需要在下一次循环中手动判断中断标志位。
到这里,表示非超时操作。
调用park方法一直阻塞当前线程,直到被唤醒,或中断,随后继续下一次循环。注意park被中断时不会主动抛出InterruptedException异常,因此还需要在下一次循环中手动判断中断标志位。
到这里,继续下一次死循环。
/**
* awaitDone用于等待最终结果,直到任务完毕正常返回(没有阻塞),或者阻塞被unpark唤醒,或者阻塞被中断,或者阻塞超时。
*
* @param timed 是否是超时等待
* @param nanos 超时等待时间纳秒,仅在timed为true时有效
* @return 返回时任务的状态
*/
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
//如果是超时等待,那么计算超时时间点纳秒,否则为0,使用deadline保存结果
final long deadline = timed ? System.nanoTime() + nanos : 0L;
//q保存加入到waiters的结点,或者为null表示没有加入,q将会被加入到链表头部
WaitNode q = null;
//queued代表q结点是否已加入链表的标志,默认为false,表示未加入
boolean queued = false;
/*循环等待,直到任务执行完毕,或者超时时间到,或者等待被中断*/
for (; ; ) {
//如果当前线程被中断了,那么清除中断标志位,随后进入if代码块
if (Thread.interrupted()) {
//从waiters移除q结点
removeWaiter(q);
//抛出InterruptedException异常
throw new InterruptedException();
}
//获取此时的任务状态s
int s = state;
/*如果s大于COMPLETING,表示任务已完成*/
if (s > COMPLETING) {
//如果q不等于null
if (q != null)
//那么q的thread置为null,表示不需要在finishCompletion中被唤醒,并且表示q结点是一个无效结点
q.thread = null;
//返回状态s
return s;
//这里能够看出来,q结点没有被即时清理。waiters链表中结点的thread如果为null,那么表示这个结点是无效结点
//可以被随时清理,因此无效结点将会在removeWaiter或者finishCompletion方法中被一起清理了
}
/*
* 否则,表示s小于等于COMPLETING
* 如果s等于COMPLETING,那么表示任务已完成,但是返回值或者最终状态还没有写入
*/
else if (s == COMPLETING) // cannot time out yet
//这里就不需要阻塞了,直接尝试yield让出CPU执行权,希望run线程早点执行完毕
Thread.yield();
/*
* 否则,表示s小于COMPLETING,那就是任务还没有执行或者执行过程中
* 还不知道任务什么时候执行或者执行完,此时可能需要将线程包装为WaitNode结点加入链表等待
*
* 如果q为null,那么新建一个WaitNode,thread属性在构造器中已经计算出来了
* 注意这里并没有直接加入链表,而是继续下一次循环,期待下一次循环就能返回或者yield也行,尽量避免阻塞的调用
*/
else if (q == null)
q = new WaitNode();
/*
* 否则,到这里在前面的判断中没有返回或者yield,并且q不为null
* 如果queued为false,即q结点还没有加入链表
*
* 那么将q的后继设置为此时的waiters结点,随后尝试CAS的将waiters的指向从q的后继指向q
* 简单的说就是CAS的加入链表成为链表头部元素,并且waiters持有该结点
*
* 注意这里加入之后没有直接阻塞,而是继续下一次循环,期待下一次循环就能返回或者yield也行,尽量避免阻塞的调用
*/
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
/*
* 否则,到这里表示在前面的判断中没有返回或者yield,并且q不为null,并且已经加入了链表
* 此时可以尝试阻塞了
*
* 如果timed为true,即是超时操作
*/
else if (timed) {
//那么计算剩余超时时间纳秒nanos
nanos = deadline - System.nanoTime();
//如果nanos小于等于0,表示超时时间到了也没有获取到任务的结果
if (nanos <= 0L) {
//从链表中移除q结点
removeWaiter(q);
//返回此时的state,不管是什么状态
return state;
}
//如果nanos大于0,那么调用parkNanos方法阻塞当前线程最长nanos纳秒,直到被唤醒,或中断,或阻塞时间到,随后继续下一次循环
//注意parkNanos被中断时不会主动抛出InterruptedException异常,因此还需要在下一次循环中手动判断中断标志位
LockSupport.parkNanos(this, nanos);
}
/*
* 到这里,表示非超时操作
*/
else
//调用park方法一直阻塞当前线程,直到被唤醒,或中断,随后继续下一次循环
//注意park被中断时不会主动抛出InterruptedException异常,因此还需要在下一次循环中手动判断中断标志位
LockSupport.park(this);
//到这里,继续下一次死循环
}
}
removeWaiter清理无效结点
仅在awaitDone方法中被调用,如果线程等待时被中断或者等待超时,那么将调用removeWaiter方法,尽量移除waiters链表中的全部无效的结点。
原理还是很简单的:如果参数node不为null,那么首先将node的thread置为null,标识为无效结点,随后从头到尾遍历整个waiters链表,将途中遇到的thread为null的结点移除链表即可。
/**
1. 仅在awaitDone方法中被调用
2. 如果线程等待时被中断或者等待超时,那么将尽量移除waiters链表中的全部无效的结点
3. 4. @param node 希望被移除的结点
*/
private void removeWaiter(WaitNode node) {
//如果node不为null,那么尝试移除
if (node != null) {
//将node.thread置为null,释放引用,表明node结点是一个无效结点
node.thread = null;
/*开启一个死循环,使用retry标记*/
retry:
for (; ; ) { // restart on removeWaiter race
/*
* 内部再开启一个循环,将无效结点全部清理了
* 初始化条件:pred=null,pred表示q的前驱,q=waiters,q表示当前结点,从链表头开始
* 循环条件:q不为null的时候,继续循环;设置变量s,作为q的后继
* 后置条件:q设置为s,即向后推进
*/
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
//s保存q的后继
s = q.next;
//如果q的thread不为null,表示q是一个有效结点
if (q.thread != null)
//那么pred设置为q,继续下一次循环
pred = q;
/*
* 否则,表示q是一个无效结点,可以清理
* 如果pred前驱不为null
*/
else if (pred != null) {
//pred的后继设置为s,移除这个没用的q结点
pred.next = s;
//如果pred的thread此时也变成null了,那就没办法了
if (pred.thread == null) // check for race
//continue retry,结束内层循环,结束本次外层循环,继续下一次外层循环
//相当于从头开始清理了
continue retry;
}
/*
* 如果pred前驱为null,说明q就是链表头结点
* 那么尝试将waiters指向q的后继s
*/
else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
q, s))
//continue retry,结束内层循环,结束本次外层循环,继续下一次外层循环
//相当于从头(q的后继s)开始清理了,
continue retry;
}
//到这一步,一定是q为null的情况,表示清理到了队列尾部,那么break跳出外层循环,该方法结束
break;
}
}
}
report解析状态
get方法的最后会调用report方法,用来解析获取的任务状态,最终会返回执行结果,或者抛出各种异常。
s = NORMAL:表示任务正常结束,返回执行结果;
s = EXCEPTIONAL:表示任务执行时抛出异常,抛出ExecutionException异常;
s >= CANCELLED:表示任务被取消了(实际上可能已经执行了),抛出CancellationException异常;
/**
* 解析任务状态,最终会返回执行结果,或者抛出各种异常
*
* @param s 任务状态值
*/
@SuppressWarnings("unchecked")
private V report(int s) throws ExecutionException {
//获取执行结果x,可能是返回值或者一个异常
Object x = outcome;
//如果s等于NORMAL,即任务正常结束
if (s == NORMAL)
//那么返回执行结果
return (V) x;
//如果s大于等于CANCELLED,那么表示任务被取消了(实际上可能已经执行了)
if (s >= CANCELLED)
//抛出CancellationException异常
throw new CancellationException();
//到这一步一定是EXCEPTIONAL状态,表示任务执行时抛出异常,抛出ExecutionException异常
throw new ExecutionException((Throwable) x);
}
get (timeout, unit)超时等待获取结果
public V get(long timeout, TimeUnit unit)
获取任务执行的结果。如果任务没有执行完毕,那么超时等待直到等待被中断,或者被唤醒,或者等待超时。
如果任务被取消,那么抛出CancellationException;如果当前的线程在等待时被中断,那么抛出InterruptedException;如果任务执行时抛出异常,那么抛出ExecutionException;如果等待超时,那么抛出TimeoutException。
原理和get()都差不多,只不过多了超时状态的处理,即抛出TimeoutException。
/**
* 获取任务执行的结果。如果任务没有执行完毕,那么超时等待直到等待被中断,或者被唤醒,或者等待超时。
*
* @param timeout 超时时间
* @param unit 时间单位
* @return 执行结果
* @throws CancellationException 如果任务被取消,那么抛出CancellationException
* @throws InterruptedException 如果当前的线程在等待时被中断
* @throws ExecutionException 如果任务执行时抛出异常
* @throws TimeoutException 如果等待超时
* @throws NullPointerException 如果时间单位unit为null
*/
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
//如果时间单位为null,那么抛出NullPointerException
if (unit == null)
throw new NullPointerException();
//获取state状态值s
int s = state;
//如果s的值小于等于COMPLETING,说明任务还没开始执行,或者正在执行,或者执行完毕但是还没有设置好返回值
//那么需要等待,调用awaitDone,传递true、unit.toNanos(timeout),表示超时等待,返回任务的状态s
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
//如果返回任务的状态s小于等于COMPLETING,那么表示超时时间到了还没有获取结果,抛出TimeoutException异常
throw new TimeoutException();
//调用report方法解析状态值s,并且做出相应的响应
return report(s);
}
FutureTask与线程池使用的注意
因丢弃任务而阻塞
线程池在使用FutureTask作为任务时(即submit系列方法),如果把拒绝策略设置为DiscardPolicy 或者DiscardOldestPolicy,即丢弃任务,并且在被拒绝的任务的Future对象上调用了get() 方法,那么调用线程将会由于任务永远不会执行而一直被阻塞。案例如下:
/**
* @author lx
*/
public class ThreadPoolFutureTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//自定义线程池,核心线程为0,最大线程为1,使用SynchronousQueue作为任务队列(没有容量),使用DiscardPolicy作为拒绝策略(丢弃任务)
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, 1, 1, TimeUnit.SECONDS,
new SynchronousQueue<>(), new ThreadPoolExecutor.DiscardPolicy());
//使用submit系列方法,传递一个Callable,内部会帮我们传递一个FutureTask对象给线程池
//提交一个callable,返回1,获得一个FutureTask对象s1
Future<Object> s1 = threadPoolExecutor.submit(() -> 1);
//提交一个callable,返回2,获得一个FutureTask对象s2
Future<Object> s2 = threadPoolExecutor.submit(() -> 2);
threadPoolExecutor.shutdown();
System.out.println(s1.get());
System.out.println("s1返回成功");
//由于第二个提交的任务被丢弃了,那么导致s2中的任务一直没能执行,而get方法是无限等待的,所以造成了一直阻塞
System.out.println(s2.get());
System.out.println("s2返回成功");
}
}
未能抛出异常
使用submit系列方法时,执行任务过程中遇到的异常将不会直接抛出,而是被封装到了FutureTask中,只有调用get方法时才会抛出异常。因此,如果不需要会的返回值,那么直接使用execute方法提交任务更合适!
/**
* @author lx
*/
public class ThrTask implements Runnable {
private int a, b;
private ThrTask(int a, int b) {
this.a = a;
this.b = b;
}
@Override
public void run() {
double re = a / b;
System.out.println(re);
}
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 1, TimeUnit.SECONDS, new SynchronousQueue<>());
for (int i = 0; i < 5; i++) {
//将不会抛出异常
//threadPoolExecutor.submit(new ThrTask(5, i));
//将会抛出异常
threadPoolExecutor.execute(new ThrTask(5, i));
}
}
}
核心线程预启动
在默认情况下创建的线程池不会有任何线程,只有当新任务到达时,才开始创建和启动核心线程,但是我们可以使用 prestartCoreThread() 和 prestartAllCoreThreads() 方法动态调整。
如果使用非空队列构建池,则可能需要预先启动线程,才能在保证线程池活性。
prestartCoreThread启动一条
public boolean prestartCoreThread()
启动一个核心线程,使其处于等待工作的空闲状态。如果已启动所有核心线程,此方法将返回 false。
/**
* 启动一个核心线程,使其处于等待工作的空闲状态。如果已启动所有核心线程,此方法将返回 false。
*
* @return 如果一条线程被启动,那么返回true;否则返回false;
*/
public boolean prestartCoreThread() {
//如果线程数量小于corePoolSize,那么调用addWorker(null, true)启动一条核心线程,启动成功则返回true
//如果线程数量大于等于corePoolSize,或者addWorker启动失败,那么返回false
return workerCountOf(ctl.get()) < corePoolSize &&
addWorker(null, true);
}
prestartAllCoreThreads启动全部
public int prestartAllCoreThreads()
启动所有核心线程,使其处于等待工作的空闲状态。如果已启动所有核心线程,此方法将返回,返回通过该方法启动的线程数。
/**
* 启动所有核心线程,使其处于等待工作的空闲状态。如果已启动所有核心线程,此方法将返回,返回通过该方法启动的线程数。
*
* @return 通过该方法启动的线程数
*/
public int prestartAllCoreThreads() {
//n作为启动的线程计数器
int n = 0;
//循环调用addWorker启动核心线程,如果某一次启动失败那么表示核心线程启动完毕或者线程池被关闭等情况
while (addWorker(null, true))
//成功之后n自增1
++n;
return n;
}
关闭线程池
shutdown温和停止
public void shutdown()
关闭线程池。将线程池状态置为SHUTDOWN,正在执行的任务和队列里等待的任务会执行完,停止接收外部提交的新任务,中断所有空闲线程。通过这个方法不能知道所有任务是否都执行完毕!在线程池对象被GC标记的时候,在finalize方法中也会调用该方法!
shutdown执行之后,由于状态变成了SHURDOWN,那么一切的新任务在addWorker方法中将被拒绝,进而触发拒绝策略的执行。队列中的任务以及正在执行的任务将正常执行,由于没有新任务进来,最终所有任务执行完毕!此时线程池中的每一个非等待的线程都将执行processWorkerExit方法将对应的Worker清除,并且workercount线程计数自减,在processWorkerExit中还会调用tryTerminate(),最终至少有一个线程会判断到SHURDOWN状态并且workercount线程数量为0,随后会将线程池状态改为TIDYING—TERMINATED,最终彻底关闭线程池。
假如tryTerminate()方法中没有interruptIdleWorkers(ONLY_ONE)中断空闲线程的逻辑,如果此时有空闲的线程正在等待任务,并且处于阻塞状态,那么该空闲线程将不被通知而无法执行processWorkerExit方法,此时线程池可能无法正常终止。
/**
* 关闭线程池。将线程池状态置为SHUTDOWN,正在执行的任务和队列里等待的任务会执行完,停止接收外部提交的新任务,中断所有空闲线程。
* 通过这个方法不能知道所有任务是否都执行完毕!
*
* @throws SecurityException 安全管理器不允许中断线程池
*/
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
//获取mainLock锁,即在关闭线程池期间,其他需要锁的操作不被允许
//比如访问线程池信息、新增、移除、中断Worker等操作
mainLock.lock();
try {
//安全管理器检测是否右关闭线程池的权限
checkShutdownAccess();
//线程池状态转换为SHUTDOWN
advanceRunState(SHUTDOWN);
//中断所有空闲线程
interruptIdleWorkers();
//ThreadPoolExecutor提供空的实现,主要是其子类ScheduledThreadPoolExecutor调用的钩子方法
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
//解锁
mainLock.unlock();
}
//尝试彻底终止线程池:尝试转换为TIDYING以及TERMINATED状态
tryTerminate();
}
advanceRunState切换状态
仅仅被shutdown()和shutdownNow()方法调用,循环的将运行状态转换到指定状态(SHUTDOWN 或者 STOP),除非状态已经大于等于指定状态。
/**
* 仅仅被shutdown()和shutdownNow()方法调用
* 循环的将运行状态转换到指定状态,除非状态已经大于等于指定状态
*
* @param targetState 指定状态,SHUTDOWN 或者 STOP
*/
private void advanceRunState(int targetState) {
/*开启一个循环*/
for (; ; ) {
//获取此时的ctl值c
int c = ctl.get();
//如果运行状态值大于等于指定状态值,那么break退出循环
//或者 尝试CAS的将ctl的运行状态部分的值转换为指定状态(线程数量部分的值不动)成功,那么break退出循环
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
//如果运行状态值小于指定状态值,或者CAS装换状态失败,那么进行下一次循环重试
}
}
interruptIdleWorkers中断所有空闲线程
尝试中断所有空闲线程,内部实际上就是调用的interruptIdleWorkers方法,参数传递的是false,而在前面的tryTerminate方法中是调用interruptIdleWorkers(true),表示尝试中断最多一条空闲线程
/**
* 尝试中断所有空闲线程,内部实际上就是调用的interruptIdleWorkers方法,参数传递的是false
* 而在前面的tryTerminate方法中是调用interruptIdleWorkers(true),表示尝试中断最多一条空闲线程
*/
private void interruptIdleWorkers() {
//这个方法在tryTerminate部分已经讲过了
interruptIdleWorkers(false);
}
shutdownNow立即停止
public List< Runnable > shutdownNow()
关闭线程池。将线程池状态置为STOP,正在执行的任务会尽力尝试终止,队列里的任务会移除,停止接收外部提交的新任务,中断所有线程。通过这个方法也不能知道所有任务是否都执行完毕!返回等待执行的任务的列表。
此实现所谓的终止正在执行的任务策略是通过Thread.interrupt()中断线程,很明显,无法响应中断的任何任务都无法被即时终止。这个原理和FutureTask的cancel(true)取消正在执行的任务原理是一样的,都无法提供任何保证!
/**
* 关闭线程池。将线程池状态置为STOP,正在执行的任务会尽力尝试终止,队列里的任务会移除,停止接收外部提交的新任务,中断所有线程。
* 通过这个方法也不能知道所有任务是否都执行完毕!返回等待执行的任务的列表。
*
* @throws SecurityException 安全管理器不允许中断线程池
*/
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
//获取mainLock锁,即在关闭线程池期间,其他需要锁的操作不被允许
//比如访问线程池信息、新增、移除、中断Worker等操作
mainLock.lock();
try {
//安全管理器检测是否右关闭线程池的权限
checkShutdownAccess();
//线程池状态转换为STOP
advanceRunState(STOP);
//中断所有线程
interruptWorkers();
//移除任务队列中的任务到新集合tasks中
tasks = drainQueue();
} finally {
//解锁
mainLock.unlock();
}
//尝试彻底终止线程池:尝试转换为TIDYING以及TERMINATED状态
tryTerminate();
//返回tasks
return tasks;
}
interruptWorkers中断所有运行线程
中断所有空闲和工作状态的线程,但是对于没有runWorker的线程(state=-1)除外。这里就能看出来初始化Worker时state为-1的用处,要求线程必须运行起来才能中断。
/**
* 中断所有空闲和工作状态的线程,但是对于没有runWorker的线程(state=-1)除外
*/
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
//获取mainLock锁
mainLock.lock();
try {
//遍历workers集合
for (Worker w : workers)
//如果线程执行了runWorker(state不等于-1),并且没有被中断,那么中断线程。
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
/**
* 如果线程执行了runWorker(state不等于-1),并且没有被中断,那么中断线程。
*/
void interruptIfStarted() {
Thread t;
//如果此Worker的state大于等于0,并且thread不为null,并且thread没有被中断
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
//那么中断thread
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
drainQueue转移全部任务
将任务队列中的任务转移到一个新集合中,仅被shutdownNow()调用。
/**
* 将任务队列中的任务转移到一个新集合中,仅被shutdownNow()调用
*/
private List<Runnable> drainQueue() {
BlockingQueue<Runnable> q = workQueue;
//新建ArrayList集合taskList,用来存放任务
ArrayList<Runnable> taskList = new ArrayList<Runnable>();
//首先使用BlockingQueue集合本身drainTo方法尝试一次性转移所有任务,
//但是可能由于队列的特性无法转移,比如DelayQueue只能转移超时时间到了的数据
q.drainTo(taskList);
//如果q还有元素
if (!q.isEmpty()) {
//那么遍历q,一个个的转移
for (Runnable r : q.toArray(new Runnable[0])) {
if (q.remove(r))
taskList.add(r);
}
}
//返回taskList
return taskList;
}
hook钩子方法
ThreadPoolExecutor为提供了每个任务执行前后提供了钩子方法beforeExecute(Thread,Runnable)和afterExecute(Runnable,Throwable);此外,terminated()方法则会在在Executor变成TIDYING状态后被调用。
这些方法默认都是空实现,我们可以继承ThreadPoolExecutor并重写这些方法来对线程池中线程的运行状态进行了监控,比如监控任务的平均执行时间、最大执行时间和最小执行时间等。或者实现实现其他扩展功能。
钩子方法:
protected void beforeExecute(Thread t, Runnable r)
t - 将运行任务 r 的线程。
r - 将执行的任务。
在runWorker方法中每次执行真正线程任务task.run()方法之前都会执行beforeExecute前置方法。此方法由将执行任务 r 的线程 t 调用。如果前置方法执行过程中抛出异常,那么后面的线程任务将不会执行,并且执行线程会被清理,但是后面的processWorkerExit方法有可能会补充新线程
protected void afterExecute(Runnable r, Throwable t)
r - 已经完成的任务。
t - 导致任务终止的异常;如果执行正常结束,则为 null。
在runWorker方法中每次线程任务task.run()方法执行完成后或者抛出异常之后都会执行afterExecute后置方法,此方法由执行任务的线程调用。如果后置方法执行过程中抛出异常,那么执行线程会被清理,但是后面的processWorkerExit方法有可能会补充新线程。
protected void terminated()
在线程池变成TIDYING状态之后,即线程池被终止时,就会执行terminated()方法。terminated()方法执行完毕之后线程池就会变成TERMINATED状态。
钩子方法的使用案例:
/**
* @author lx
*/
public class ThreadPoolHookTest {
public static void main(String[] args) throws InterruptedException {
//自定义线程池,这里就直接采用匿名内部类的方法重写这三个方法了
ExecutorService pool = new ThreadPoolExecutor(2, 4, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(10),
r -> {
System.out.println("线程" + r.hashCode() + "创建");
//线程命名
return new Thread(r, "threadPool" + r.hashCode());
}, new ThreadPoolExecutor.CallerRunsPolicy()) {
/**
* 前置方法
*/
@Override
protected void beforeExecute(Thread t, Runnable r) {
System.out.println("准备执行:" + ((ThreadTask) r).taskName);
//抛出异常测试
//int i = 1 / 0;
}
/**
* 后置方法
*/
@Override
protected void afterExecute(Runnable r, Throwable t) {
System.out.println("执行完毕:" + ((ThreadTask) r).taskName);
//抛出异常测试
//int i = 1 / 0;
}
/**
* 终结方法
*/
@Override
protected void terminated() {
System.out.println("----线程池退出----");
//抛出异常测试
//int i = 1 / 0;
}
};
for (int i = 0; i < 10; i++) {
pool.execute(new ThreadTask("Task" + i));
}
Thread.sleep(1000);
pool.shutdown();
}
static class ThreadTask implements Runnable {
private String taskName;
ThreadTask(String name) {
taskName = name;
}
@Override
public void run() {
//输出执行线程的名称
System.out.println("TaskName" + taskName + "---ThreadName:" + Thread.currentThread().getName());
}
}
}
线程池信息获取
ThreadPoolExecutor的线程池实现提供了许多可以获取线程池核心信息的方法,方便线程池进行实时监控。这些方法基本都需要获取mainLock锁,因此可能会减缓线程池核心功能的效率。
awaitTermination等待终止
awaitTermination(long timeOut, TimeUnit unit)
指定时间内等待线程池被彻底终止(TERMINATED状态),一般和shutdown连用,用于追踪所有任务是否执行完毕。
如果指定时间范围内线程池被彻底终止(TERMINATED状态),那么返回true;否则返回false。
在进行awaitNanos限时等待时,如果一开始就是中断状态或者如果在signal()或signalAll()方法调用之前就因为中断而被唤醒,那么抛出InterruptedException异常。
/**
* 指定时间内等待线程池被彻底终止(TERMINATED状态),一般和shutdown连用,用于追踪所有任务是否执行完毕。
*
* @param timeout 超时时间
* @param unit 时间单位
* @return 如果指定时间范围内线程池被彻底终止(TERMINATED状态),那么返回true;否则返回false。
* @throws InterruptedException 在进行awaitNanos时,如果一开始就是中断状态
* 或者如果在signal()或signalAll()方法调用之前就因为中断而被唤醒,那么抛出异常
*/
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
//计算超时时间纳秒
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
//获取mainLock锁
mainLock.lock();
try {
/*开启一个死循环,等待状态满足或者超时或者被中断*/
for (; ; ) {
//如果线程池状态是TERMINATED状态,那么直接返回true
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
//否则,可能需要超时等待
//如果剩余超时时间纳秒小于等于0,表示超时时间到了,那么直接返回false
if (nanos <= 0)
return false;
//到这一步,表示还可以继续超时等待,那么当前线程在termination条件变量上超时等待nanos纳秒
nanos = termination.awaitNanos(nanos);
//被唤醒之后继续下一次循环或者抛出异常
}
} finally {
//解锁
mainLock.unlock();
}
}
getTaskCount计划任务数量
public long getTaskCount()
返回已计划执行的任务的总数,返回的值只是一个近似值。返回的值为completedTaskCount + 每一个Worker的completedTasks + 正在执行的任务数量 + 任务队列大小。
/**
* 返回已计划执行的任务的总数。返回的值只是一个近似值。
* 返回的值为:completedTaskCount + 每一个Worker的completedTasks + 任务队列大小
*
* @return completedTaskCount + 每一个Worker的completedTasks + 任务队列大小
*/
public long getTaskCount() {
final ReentrantLock mainLock = this.mainLock;
//获取mainLock锁
mainLock.lock();
try {
//初始化计数器n,初始值为completedTaskCount
long n = completedTaskCount;
//遍历workers集合
for (Worker w : workers) {
//加上每一个Worker内部的执行任务的计数
n += w.completedTasks;
//如果该Worker被锁定了,说明正在执行任务,那么再自增1
if (w.isLocked())
++n;
}
//最后的返回值还要加上任务队列中计划执行的任务数量
return n + workQueue.size();
} finally {
//解锁
mainLock.unlock();
}
}
getActiveCount工作线程数量
public int getActiveCount()
返回正在执行任务的线程的近似数量。统计方式很简单,就是遍历workers队列,如果有Worker被锁定了,那么一定在执行任务。
/**
* @return 返回正在执行任务的线程的近似数量
*/
public int getActiveCount() {
final ReentrantLock mainLock = this.mainLock;
//获取mainLock锁
mainLock.lock();
try {
//初始化计数器
int n = 0;
//遍历workers集合
for (Worker w : workers)
//如果该Worker被锁定了,说明正在执行任务,那么n自增1
if (w.isLocked())
++n;
//返回n
return n;
} finally {
//解锁
mainLock.unlock();
}
}
getPoolSize当前线程数
public int getPoolSize()
返回池中的当前线程数。如果线程池状态大于等于TIDYING,表示线程池被彻底终止了,那么直接返回0,否则返回workers集合的数量即可。
/**
* @return 返回池中的当前线程数
*/
public int getPoolSize() {
final ReentrantLock mainLock = this.mainLock;
//获取mainLock锁
mainLock.lock();
try {
//如果运行状态大于等于TIDYING,即被终止状态,那么返回0
//否则返回workers集合的数量
return runStateAtLeast(ctl.get(), TIDYING) ? 0
: workers.size();
} finally {
//解锁
mainLock.unlock();
}
}
getLargestPoolSize曾经最大线程数
public int getLargestPoolSize()
返回曾经同时位于池中的最大线程数。很简单,就是返回largestPoolSize属性。
/**
* @return 返回曾经同时位于池中的最大线程数
*/
public int getLargestPoolSize() {
final ReentrantLock mainLock = this.mainLock;
//获取mainLock锁
mainLock.lock();
try {
//返回largestPoolSize
return largestPoolSize;
} finally {
//解锁
mainLock.unlock();
}
}
getCompletedTaskCount已完成任务数
public long getCompletedTaskCount()
返回已完成执行的近似任务总数,该值在整个连续调用过程中不会减少。
由于completedTaskCount仅在一个Worker被清理时才会更新,因此completedTaskCount并不准确,还需要统计每一个Worker中的completedTasks。
/**
* @return 返回已完成执行的近似任务总数
*/
public long getCompletedTaskCount() {
final ReentrantLock mainLock = this.mainLock;
//获取mainLock锁
mainLock.lock();
try {
//初始化计数器n,初始值为completedTaskCount,由于completedTaskCount仅在一个Worker被清理时才会更新
//因此completedTaskCount并不准确,还需要统计每一个Worker中的completedTasks
long n = completedTaskCount;
//遍历workers集合
for (Worker w : workers)
//加上每一个Worker内部的执行任务的计数
n += w.completedTasks;
//返回n
return n;
} finally {
//解锁
mainLock.unlock();
}
}
核心参数相关
public int getCorePoolSize()
返回核心线程数。
public int getMaximumPoolSize()
返回允许的最大线程数。
public ThreadFactory getThreadFactory()
返回用于创建新线程的线程工厂。
public RejectedExecutionHandler getRejectedExecutionHandler()
返回拒绝策略。
/**
* @return 返回核心线程数
*/
public int getCorePoolSize() {
return corePoolSize;
}
/**
* @return 返回允许的最大线程数
*/
public int getMaximumPoolSize() {
return maximumPoolSize;
}
/**
* @return 返回线程工厂
*/
public ThreadFactory getThreadFactory() {
return threadFactory;
}
/**
* @return 返回拒绝策略
*/
public RejectedExecutionHandler getRejectedExecutionHandler() {
return handler;
}
设置线程池参数
在线程池启动之后,我们也可以动态的的调整某些线程池参数。
setCorePoolSize设置核心线程数
public void setCorePoolSize(int corePoolSize)
设置核心线程数。
如果线程数大于新核心线程数,那么可能需要减少线程数量。此时尝试中断所有空闲的线程,期待减少线程数量;
如果线程数小于等于新核心线程数,并且新核心线程数大于旧核心线程数,那么可能需要补充核心线程。计算差值和任务队列中的任务数量的最小值k,尝试循环增加k个新工作线程,去执行任务队列中的的任务,每增加一个成功之后就要判断任务队列是否为null,如果为null那么就不需要增加了。
/**
* 设置核心线程数
*
* @param corePoolSize 新核心线程数
* @throws IllegalArgumentException 如果 corePoolSize 小于 0
*/
public void setCorePoolSize(int corePoolSize) {
//corePoolSize大小校验
if (corePoolSize < 0)
throw new IllegalArgumentException();
//计算新值和旧值的差delta
int delta = corePoolSize - this.corePoolSize;
//将新核心线程数赋给corePoolSize属性
this.corePoolSize = corePoolSize;
/*
* 如果线程数大于新核心线程数,那么可能需要减少线程数量
*/
if (workerCountOf(ctl.get()) > corePoolSize)
//那么尝试中断所有空闲的线程,期待减少线程数量
interruptIdleWorkers();
/*
* 否则,如果delta大于0,表示新核心线程数大于旧新核心线程数
* 那么可能需要补充核心线程
*/
else if (delta > 0) {
//计算差值和任务队列中的任务数量的最小值k
int k = Math.min(delta, workQueue.size());
//尝试循环增加k个新工作线程,去执行任务队列中的的任务
while (k-- > 0 && addWorker(null, true)) {
//增加一个成功之后就要判断任务队列是否为null,如果为null那么就不需要增加了
if (workQueue.isEmpty())
break;
}
}
}
setMaximumPoolSize设置最大线程数
public void setMaximumPoolSize(int maximumPoolSize)
设置允许的最大线程数。此操作将重写构造方法中设置的任何值。如果新值小于当前线程数,则多余的现有线程将可能在下一次空闲时终止。
如果新的最大值小于等于 0或者小于核心线程数的大小,那么抛出IllegalArgumentException异常。
/**
* 设置允许的最大线程数
*
* @param maximumPoolSize 新最大线程数
* @throws IllegalArgumentException 如果新的最大值小于等于 0,或者小于核心线程数的大小
*/
public void setMaximumPoolSize(int maximumPoolSize) {
//maximumPoolSize大小校验
if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
throw new IllegalArgumentException();
//将新最大线程数赋给maximumPoolSize属性
this.maximumPoolSize = maximumPoolSize;
//如果目前线程数大于新最大线程数
if (workerCountOf(ctl.get()) > maximumPoolSize)
//那么尝试中断所有空闲的线程,期待减少线程数量
interruptIdleWorkers();
}
setKeepAliveTime设置超时时间
public void setKeepAliveTime(long time, TimeUnit unit)
设置Worker线程在被回收之前可以保持最大空闲的时间。如果新超时时间小于旧超时时间,那么尝试中断所有空闲的线程,期待应用新的超时时间。
如果time超时时间小于0,或者超时时间等于0并且允许核心线程应用超时时间。那么抛出IllegalArgumentException。即KeepAliveTime=0和allowsCoreThreadTimeOut=true不可兼得。
/**
* 设置Worker线程在被回收之前可以保持最大空闲的时间。
*
* @param time 新超时时间,值为零将导致多余线程在执行任务后立即终止
* @param unit 时间单位
* @throws IllegalArgumentException 如果超时时间小于0,或者超时时间等于0并且允许核心线程应用超时时间
*/
public void setKeepAliveTime(long time, TimeUnit unit) {
//time的校验
if (time < 0)
throw new IllegalArgumentException();
if (time == 0 && allowsCoreThreadTimeOut())
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
//计算新超时时间纳秒值
long keepAliveTime = unit.toNanos(time);
//计算新超时时间和旧超时时间的差delta
long delta = keepAliveTime - this.keepAliveTime;
//将新超时时间赋给keepAliveTime属性
this.keepAliveTime = keepAliveTime;
//如果新超时时间小于旧超时时间
if (delta < 0)
//那么尝试中断所有空闲的线程,期待应用新的超时时间
interruptIdleWorkers();
}
setThreadFactory设置线程工厂
public void setThreadFactory(ThreadFactory threadFactory)
设置用于创建新线程的线程工厂。
如果threadFactory 为 null,那么抛出NullPointerException。
/**
* 设置用于创建新线程的线程工厂
*
* @param threadFactory 新线程工厂
* @throws NullPointerException 如果 threadFactory 为 null
*/
public void setThreadFactory(ThreadFactory threadFactory) {
//threadFactory的null校验
if (threadFactory == null)
throw new NullPointerException();
//将新线程工厂赋给threadFactory属性
this.threadFactory = threadFactory;
}
setRejectedExecutionHandler设置拒绝策略
public void setRejectedExecutionHandler(RejectedExecutionHandler handler)
设置新拒绝策略。
如果 handler 为null,那么抛出NullPointerException。
/**
* 设置新拒绝策略
*
* @param handler 新拒绝策略
* @throws NullPointerException 如果 handler 为null
*/
public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
//handler的null校验
if (handler == null)
throw new NullPointerException();
//将新拒绝策略赋给handler属性
this.handler = handler;
}
allowCoreThreadTimeOut设置核心线程超时
public void allowCoreThreadTimeOut(boolean value)
设置核心线程是否需要应用超时等待机制,true表示需要,fasle表示不需要,默认false。如果value为true,那么设置成功之后尝试中断所有空闲的线程,期待应用新的超时机制。
如果value为true并且当前保持活动时间不大于 0,那么抛出IllegalArgumentException。即KeepAliveTime=0和allowsCoreThreadTimeOut=true不可兼得。
/**
* 设置核心线程是否需要应用超时等待机制
*
* @param value true 需要 false 不需要
* @throws IllegalArgumentException 如果 value 为 true,并且当前超时时间不大于0
*/
public void allowCoreThreadTimeOut(boolean value) {
//如果 value 为 true,并且当前超时时间不大于0,那么抛出IllegalArgumentException异常
if (value && keepAliveTime <= 0)
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
//如果value不等于目前的规则
if (value != allowCoreThreadTimeOut) {
//将新负责赋给allowCoreThreadTimeOut属性
allowCoreThreadTimeOut = value;
//如果为true
if (value)
//那么尝试中断所有空闲的线程,期待应用新的超时机制
interruptIdleWorkers();
}
}
ScheduledThreadPoolExecutor 延迟线程池
ScheduledThreadPoolExecutor的概述
public interface ScheduledExecutorService
extends ExecutorService
ScheduledExecutorService是ExecutorService的子接口,对ExecutorService做了扩展,可安排在给定的延迟后运行或定期执行的命令。一般使用其实现类ScheduledThreadPoolExecutor,Executors 类也为ScheduledExecutorService 实现提供了便捷的工厂方法newSingleThreadScheduledExecutor()。
public class ScheduledThreadPoolExecutor
extends ThreadPoolExecutor
implements ScheduledExecutorService
ScheduledExecutorService的核心实现类之一,它可另行安排在给定的延迟后运行命令,或者定期执行命令。其性能由于Timer,Timer对应的是单个后台线程,而ScheduledThreadPoolExecutor则可以启动多个后台线程,同时具有其他额外功能。Timer中一个任务出现异常之后会影响其他任务的执行,但是ScheduledThreadPoolExecutor不会。Timer中一个任务耗时较常会影响其他任务的执行,ScheduledThreadPoolExecutor不会。
继承了ThreadPoolExecutor,因此核心原理都是相同的,但是做了特性化处理,固定使用自己内部实现的DelayedWorkQueue作为无界阻塞延迟队列,类似于DelayQueue,采用小顶堆实现以及通过延迟时间比较大小,每次出队列的都是剩余延迟时间为0的任务,周期任务的原理简单说就是从队列中取任务出来执行一次之后如果发现是周期任务那么继续丢到队列中。由于是无界队列,因此仅使用corePoolSize 线程,maximumPoolSize线程的设置和调整都是没用的。线程任务也是使用自己内部实现的ScheduledFutureTask作为传递进来任务的延迟特性包装。
ScheduledThreadPoolExecutor的重要属性
继承了ThreadPoolExecutor的非私有属性,同时具有自己的一些新属性,这些属性都比较简单,主要是用于控制定时任务和周期任务的执行条件的,详见注释。有趣的是超常的属性名字!
//继承了ThreadPoolExecutor的非私有属性,同时具有自己的一些新属性
/**
* 关闭线程池之后(SHUTDOWN状态)是否继续执行周期任务,true表示是,false表示否,默认false
*/
private volatile boolean continueExistingPeriodicTasksAfterShutdown;
/**
* 关闭线程池之后(SHUTDOWN状态)是否继续执行延迟一次性任务,true表示是,false表示否,默认true
*/
private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
/**
* 执行ScheduledFutureTask.cancel取消任务的时候是否从队列中移除任务,true表示是,false表示否,默认false
*/
private volatile boolean removeOnCancel = false;
/**
* 全局任务序号,当延迟时间相同时,序号小的先出队列,ScheduledFutureTask的sequenceNumber就是从这里获取的
* 使用AtomicLong包装一个volatile long值,保证了原子性和线程安全,保证获取的任务序号不会重复
*/
private static final AtomicLong sequencer = new AtomicLong();
ScheduledFutureTask内部类
ScheduledFutureTask是ScheduledThreadPoolExecutor自己实现的专用延迟任务类型,也作为DelayedWorkQueue任务队列的任务实际类型。
直接继承了FutureTask,具有FutureTask的特性,表示ScheduledFutureTask也是异步执行结果类,同时可以接受Runnable和Callable类型的任务。
直接实现了RunnableScheduledFuture接口,也作为该接口的唯一实现类。该接口具有一个重要的抽象方法isPeriodic,用于判断该任务是否是周期性任务,在DelayedWorkQueue中就是通过该isPeriodic方法判断任务是否是周期性任务的。
RunnableScheduledFuture还继承了ScheduledFuture接口,ScheduledFuture接口表示一种延迟的异步执行结果,而ScheduledFuture接口又继承了Delayed顶级接口,Delayed接口用来描述那些应该在给定延迟时间之后执行的对象,它具有一个重要的抽象方法getDelay,用来返回与此接口关联的对象的剩余延迟时间,在DelayedWorkQueue中线程的阻塞时间就是通过该getDelay方法计算出来的。Delayed则是实现了 接口,它具有一个重要的抽象方法compareTo,用来比较大小,在DelayedWorkQueue中加入任务时就是通过该compareTo方法比较延迟时间长短并排序的。
ScheduledFutureTask 内部还有一个period 属性用来表示任务的类型:period=0, 说明当前任务是一次性的任务,执行完毕后就退出了;period 为负数,说明当前任务为fixed-delay 任务,是固定延迟的定时可重复执行任务,-perid就是scheduleWithFixedDelay方法的delay参数;period 为正数,说明当前任务为fixed-rate 任务, 是固定频率的定时可重复执行任务,period就是scheduleAtFixedRate方法的period 参数。
heapIndex属性记录所在延迟队列(底层数组)位置的索引,用于支持更快的执行cancel取消任务;time属性记录任务延迟时间纳秒;sequenceNumber属性记录任务被添加的序号,用于记录添加的先后顺序,在time相同时,sequenceNumber越小的越先执行。
ScheduledFutureTask内部的属性、构造器,以及getDelay、compareTo、cancel、isPeriodic方法源码如下:
/**
* ScheduledThreadPoolExecutor内部的专用延迟任务
*/
private class ScheduledFutureTask<V>
extends FutureTask<V> implements RunnableScheduledFuture<V> {
/**
* 任务被添加的序号,用于记录添加的先后顺序,在time相同时,sequenceNumber越小的越先执行
*/
private final long sequenceNumber;
/**
* 任务延迟执行时间点纳秒
*/
private long time;
/**
* period属性用来表示任务的类型,有以下三种:
* period=0,说明当前任务是一次性的任务,执行完毕后就退出了。
* period 为负数,说明当前任务为fixed-delay 任务,是固定延迟的定时可重复执行任务。
* period 为正数,说明当前任务为fixed-rate 任务, 是固定频率的定时可重复执行任务。
*/
private final long period;
/**
* 要重新执行的任务,仅被reExecutePeriodic方法调用,初始化为this
*/
RunnableScheduledFuture<V> outerTask = this;
/**
* 所在延迟队列(底层数组)位置的索引,用于支持更快的执行cancel取消任务
*/
int heapIndex;
/**
* 创建一个在指定纳秒时间之后执行的一次性任务,具有指定返回结果
*
* @param ns 任务延迟执行时间点纳秒
* @param r 任务
* @param result 指定返回结果
*/
ScheduledFutureTask(Runnable r, V result, long ns) {
//调用父类FutureTask的构造器
super(r, result);
//任务延迟执行时间点纳秒
this.time = ns;
//period设为0
this.period = 0;
//任务序号
this.sequenceNumber = sequencer.getAndIncrement();
}
/**
* 创建给定的指定纳秒时间之后执行的周期任务,具有指定返回结果
*
* @param r 任务
* @param result 指定返回结果
* @param ns 任务延迟执行时间点纳秒
* @param period 任务类型
*/
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
//调用父类FutureTask的构造器
super(r, result);
//任务延迟执行时间点纳秒
this.time = ns;
this.period = period;
//任务序号
this.sequenceNumber = sequencer.getAndIncrement();
}
/**
* 创建一个在指定纳秒时间之后执行的一次性任务,具有指定返回结果
*
* @param callable 任务
* @param ns 任务延迟执行时间点纳秒
*/
ScheduledFutureTask(Callable<V> callable, long ns) {
//调用父类FutureTask的构造器
super(callable);
//任务延迟执行时间点纳秒
this.time = ns;
//period设为0
this.period = 0;
//任务序号
this.sequenceNumber = sequencer.getAndIncrement();
}
/**
* 获取剩余延迟时间
*
* @param unit 时间单位
* @return 剩余延迟时间
*/
public long getDelay(TimeUnit unit) {
//任务延迟执行时间点纳秒减去当前时间点纳秒,大于0表示还有延迟时间少于等于0则表示可以执行
return unit.convert(time - now(), NANOSECONDS);
}
/**
* 比较大小,即比较任务延迟执行时间点纳秒time以及任务序号sequenceNumber
*
* @param other 队列中的其他任务
* @return 0 相等; -1 当前任务先被执行; 1 当前任务后被执行
*/
public int compareTo(Delayed other) {
//如果就是当前任务,那么返回0
if (other == this) // compare zero if same object
return 0;
//如果不是同一个任务且属于ScheduledFutureTask类型,那么继续比较
if (other instanceof ScheduledFutureTask) {
//转换为ScheduledFutureTask类型
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>) other;
//获取当前任务延迟执行时间点纳秒和指定任务的对应值的差diff
long diff = time - x.time;
//diff小于0,说明当前任务延迟执行时间点纳秒更小,更先被执行,返回-1
if (diff < 0)
return -1;
//diff大于0,说明当前任务延迟执行时间点纳秒更大,更后被执行,返回1
else if (diff > 0)
return 1;
//diff等于0,比较sequenceNumber,如果当前序号更小,那么更先被执行,返回-1
else if (sequenceNumber < x.sequenceNumber)
return -1;
//否则更后被执行,返回1
else
return 1;
}
//其它类型,那么比较getDelay方法的返回值
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
/**
* 尝试取消任务
*
* @param mayInterruptIfRunning 如果应该中断正在执行此任务的线程,则为 true;否则允许正在运行的任务运行完成
* @return 如果无法取消任务,则返回 false,这通常是由于它已经正常完成或者已取消;否则返回 true
* 返回true也不代表任务方法没有执行完毕,有可能任务执行完了,只是不能获取返回值而已。
*/
public boolean cancel(boolean mayInterruptIfRunning) {
//调用父类的cancel方法,返回cancelled,即任务状态改变了
boolean cancelled = super.cancel(mayInterruptIfRunning);
//如果 cancelled为true,表示尝试取消成功
//并且 removeOnCancel为true,removeOnCancel属性表示取消任务的时候是否从队列中移除任务,true表示移除
//并且 当前任务的heapIndex大于等于
if (cancelled && removeOnCancel && heapIndex >= 0)
//那么从任务队列中移除任务
remove(this);
//返回父类方法的调用结果cancelled
return cancelled;
}
/**
* 是否是周期性任务
*
* @return true 是 false 否
*/
public boolean isPeriodic() {
//如果period不为0,那么就是周期任务
return period != 0;
}
//其他方法后面讲
}
DelayedWorkQueue内部类
DelayedWorkQueue是ScheduledThreadPoolExecutor自己实现的专用延迟无界阻塞任务队列,内部同样采用数组作为存放任务的容器,逻辑上实现小顶堆,这完全可以类比DelayQueue 和PriorityBlockingQueue,区别是每一个元素(任务)会将自己在数组中的索引记录在自己内部heapIndex属性中,这消除了在取消某个任务时查找任务在数组中位置的过程(从O(log n)到O(1)),大大加快了删除速度。
所有堆操作都必须记录索引更改,主要在 siftUp 和 siftDown 方法中。删除任务后,任务的堆索引设置为 -1。需要注意的是,计划未来任务最多可以在队列中出现一次(对于其他类型的任务或工作队列这是不需要的),因此需要由heapIndex 进行唯一标识。
DelayedWorkQueue对内部任务ScheduledFutureTask的time属性进行小顶堆排序,即延迟时间,而如果两个任务的time延迟执行时间相同,那么先提交的任务将被先执行(比较sequenceNumber,越小说明越先被提交,该属性不会重复)。
从下面的内部属性可以看到,其原理和DelayQueue基本一致,在此原理不在赘述!
/**
* ScheduledThreadPoolExecutor内部的专用延迟无界阻塞任务队列
*/
static class DelayedWorkQueue extends AbstractQueue<Runnable>
implements BlockingQueue<Runnable> {
/**
* 初始容量
*/
private static final int INITIAL_CAPACITY = 16;
/**
* 底层数组,初始容量为16
*/
private RunnableScheduledFuture<?>[] queue =
new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
/**
* lock用于保证线程安全,生产和消费都需要获取同一个锁,创建对象实例的时候就初始化
*/
private final ReentrantLock lock = new ReentrantLock();
private int size = 0;
/**
* 类似于DelayQueue的Leader-Follower线程模型
* 成为leader的线程将会在available条件变量上等待此时队列头结点的剩余延迟时间
* 其他线程作为Follower在available条件变量上一直等待,直到被唤醒或中断
* leader线程苏醒之后会将leader变量置空,在获取到元素之后最后会唤醒一个在available上等待的Follower线程
* Leader-Follower线程模型可以避免没有必要的自旋或者没必要的唤醒
*/
private Thread leader = null;
/**
* 一个条件变量available,用于消费者线程的等待和唤醒,创建对象实例的时候就初始化
* 生产线程不会等待,因为队列是“无界”的,可以一直入队。
*/
private final Condition available = lock.newCondition();
}
ScheduledThreadPoolExecutor的构造器
public ScheduledThreadPoolExecutor(int corePoolSize)
使用给定corePoolSize创建一个新 ScheduledThreadPoolExecutor。
public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory)
使用给定的corePoolSize和threadFactory创建一个新 ScheduledThreadPoolExecutor。
public ScheduledThreadPoolExecutor(int corePoolSize,RejectedExecutionHandler handler)
使用给定corePoolSize和handler创建一个新 ScheduledThreadPoolExecutor。
public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory, RejectedExecutionHandler handler)
使用给定corePoolSize和threadFactory和handler创建一个新 ScheduledThreadPoolExecutor。
上面的构造器最终都是调了父类的构造器,可以发现,只能指定corePoolSize和threadFactory和handler这三个参数,其他的参数都是ScheduledThreadPoolExecutor帮我们指定的,虽然还是可以在后面使用相关方法修改,但是建议不要轻易去这么做!
默认maximumPoolSize为Integer.MAX_VALU,默认超时时间为0,默认阻塞队列为DelayedWorkQueue
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), handler);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
schedule一次性任务
public < V > ScheduledFuture< V > schedule(Callable< V > callable, long delay, TimeUnit unit)
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
command和callable都表示线程任务,delay表示从现在开始延迟执行的时间,unit表示延迟参数的时间单位。schedule方法创建并执行在给定延迟时间后启用的一次性任务操作,返回一个ScheduledFuture延时异步结果对象,实际类型就是队伍对应的ScheduledFutureTask。
另外,execute和submit系列方法都被重写为内部调用schedule方法,并且delay都为0,即不需要延迟执行的一次性任务。
public void execute(Runnable command) {
schedule(command, 0, NANOSECONDS);
}
public Future<?> submit(Runnable task) {
return schedule(task, 0, NANOSECONDS);
}
public <T> Future<T> submit(Runnable task, T result) {
return schedule(Executors.callable(task, result), 0, NANOSECONDS);
}
public <T> Future<T> submit(Callable<T> task) {
return schedule(task, 0, NANOSECONDS);
}
两个schedule方法,一个传递Runnable一个传递Callable,Runnable会被Executors.callable包装成为Callable,除此之外它们的源码一致。
/**
* @param command Runnable类型的任务
* @param delay 从现在开始延迟执行的时间
* @param unit 时间单位
* @return 与任务关联的ScheduledFutureTask对象
*/
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
//callable和unit的null校验
if (command == null || unit == null)
throw new NullPointerException();
//调用decorateTask对任务进行控制,可由子类重写
//默认返回一个新建的ScheduledFutureTask
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
/**
* 修改或替换用于执行可调用的任务。此方法可用于子类重写用于管理内部任务的具体类。 默认实现仅返回给定的任务。
*
* @param runnable 提交的线程任务
* @param task 延迟异步执行结果
* @return 延迟异步执行结果
* @since 1.6
*/
protected <V> RunnableScheduledFuture<V> decorateTask(
Runnable runnable, RunnableScheduledFuture<V> task) {
return task;
}
/**
* @param callable Runnable类型的任务
* @param delay 从现在开始延迟执行的时间
* @param unit 时间单位
* @return 与任务关联的ScheduledFutureTask对象
*/
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay,
TimeUnit unit) {
//callable和unit的null校验
if (callable == null || unit == null)
throw new NullPointerException();
//调用decorateTask对任务进行控制,可由子类重写
//默认返回一个新建的ScheduledFutureTask
RunnableScheduledFuture<V> t = decorateTask(callable,
new ScheduledFutureTask<V>(callable,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
/**
* 修改或替换用于执行可调用的任务。此方法可用于子类重写用于管理内部任务的具体类。 默认实现仅返回给定的任务。
*
* @param callable 提交的线程任务
* @param task 延迟异步执行结果
* @return 延迟异步执行结果
* @since 1.6
*/
protected <V> RunnableScheduledFuture<V> decorateTask(
Callable<V> callable, RunnableScheduledFuture<V> task) {
return task;
}
默认情况下,传入的Runnable和Callable都会被包装成为一个ScheduledFutureTask,那么在线程池中执行的run方法就是ScheduledFutureTask的run方法。
triggerTime任务触发时间点
在调用ScheduledFutureTask构造器的时候,调用了一个triggerTime方法,该方法从当前时间点开始,根据延迟时间和时间单位返回延迟操作的触发时间点纳秒,就是计算time属性的值!
有意思的是,队头任务(最近将被执行的任务)和将要新提交的任务的执行间隔时间不能超过long类型的最大值,否则在比较延迟时间长短的时候会由于数值溢出而出现问题,因此这里实际设置的延迟时间不一定是参数中的时间!
/**
* 从当前时间点开始
* 根据延迟时间和时间单位返回延迟操作的触发时间点纳秒
*
* @param delay 延迟时间
* @param unit 时间单位
* @return 延迟操作的触发时间点纳秒
*/
private long triggerTime(long delay, TimeUnit unit) {
//将延迟时间转换为纳秒值,调用另一个triggerTime方法,
return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
}
/**
* 从当前时间点开始
* 根据延迟时间纳秒返回延迟操作的触发时间点纳秒
*
* @param delay 延迟时间纳秒
* @return 延迟操作的触发时间点纳秒
*/
long triggerTime(long delay) {
//now()返回当前时间纳秒
//如果delay小于long类型最大值的一半,那么就是 当前时间纳秒 + 延迟时间纳秒
//否则 将队列中所有任务延迟的最大差值约束在Long.MAX_VALUE,以避免在比较中溢出。
return now() +
((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
/**
* 返回当前时间点纳秒
*/
final long now() {
return System.nanoTime();
}
/**
1. 将队列中所有任务延迟的最大差值约束在Long.MAX_VALUE,以避免在比较中溢出。
*/
private long overflowFree(long delay) {
//获取但不移除队列头结点head
Delayed head = (Delayed) super.getQueue().peek();
//如果head不为null,说明有任务
if (head != null) {
//获取head的剩余超时时间纳秒headDelay
long headDelay = head.getDelay(NANOSECONDS);
//如果headDelay小于0表示队头任务已过期但是并没有执行
//并且delay-headDelay小于0,这说明队头和队尾结点(当前新增任务)的延迟时间差值已经超出了long类型的范围
if (headDelay < 0 && (delay - headDelay < 0))
//那么delay重设置为Long.MAX_VALUE + headDelay,这样保证队头和队尾任务间隔时间在long类型范围之类
delay = Long.MAX_VALUE + headDelay;
}
//如果head为null,直接返回原值
return delay;
}
delayedExecute延迟/定期执行核心方法
schedule方法最后都调用了delayedExecute方法,该方法就是用于延迟或定期执行任务的方法,也是ScheduledThreadPoolExecutor的核心方法。
大概步骤为:
- 如果线程池已关闭(非RUNNING状态),直接执行拒绝策略;
- 否则,尝试执行:
- 首先将task任务通过DelayedWorkQueue的add方法加入到阻塞队列。
- 加入队列之后,继续判断。如果线程池已关闭(非RUNNING状态),并且当前状态不能继续运行,那么尝试从队列移除task,如果这个三个条件判断都满足,那么调用cancel取消任务;
- 否则,说明该任务支持执行。那么调用ensurePrestart确保至少有一条线程,能够执行任务。
/**
* 延迟或定期任务的主要执行方法。
* 如果池已关闭,则拒绝该任务。否则,将任务添加到队列中,并启动线程(如有必要)以运行它。
* (我们无法预启动线程以运行任务,因为任务(可能)尚不应运行。 如果在添加任务时关闭池,则取消并删除它(如果状态和运行后关闭参数)。
*
* @param task 被包装的RunnableScheduledFuture任务
*/
private void delayedExecute(RunnableScheduledFuture<?> task) {
/*如果线程池已关闭(非RUNNING状态),直接执行拒绝策略*/
if (isShutdown())
//执行拒绝策略
reject(task);
/*否则,尝试执行*/
else {
/*
* 首先将task任务通过DelayedWorkQueue的add方法加入到阻塞队列,该方法就是通过新元素构建小顶堆的逻辑,以及Leader-Follower线程模型的应用。
* 关于小顶堆以及以及Leader-Follower线程模型,我们以前在PriorityBlockingQueue和DelayQueue中就讲过了,远离都差不多,在此不再赘述。
* 需要和注意的是,在最终构建完成之后,这个task以及其他移动了位置的task还会保存或者更新自己在数组中的索引位置,方便后续查找移除
*/
super.getQueue().add(task);
/*
* 加入队列之后,继续判断
* 如果线程池已关闭(非RUNNING状态),并且当前状态不能继续运行,那么尝试从队列移除task成功
* 三个条件判断都满足,那么调用cancel取消任务
*/
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
//移除task成功之后,尝试调用task本身的cancel方法取消这个任务
task.cancel(false);
/*
* 否则,说明该任务支持执行,那么调用ensurePrestart确保至少有一条线程,能够执行任务
*/
else
ensurePrestart();
}
}
canRunInCurrentRunState是否可执行
canRunInCurrentRunState判断某个任务在关闭线程池(SHUTDOWN状态)之后是否还可以继续运行。周期任务是通过continueExistingPeriodicTasksAfterShutdown属性控制的,而一次性任务是通过executeExistingDelayedTasksAfterShutdown属性控制的。
/**
* 判断某个任务在关闭线程池(SHUTDOWN状态)之后是否还可以继续运行
*
* @param periodic 如果此任务是周期性任务,则为 true;一次性任务则为false
* @return true 应该 false 不应该
*/
boolean canRunInCurrentRunState(boolean periodic) {
//调用isRunningOrShutdown方法
// 如果periodic为true,那么传递continueExistingPeriodicTasksAfterShutdown属性
// 如果periodic为false,那么传递executeExistingDelayedTasksAfterShutdown属性
return isRunningOrShutdown(periodic ?
continueExistingPeriodicTasksAfterShutdown :
executeExistingDelayedTasksAfterShutdown);
}
/**
* 该任务是否可以继续运行
*
* @param shutdownOK 如果在SHUTDOWN状态还可以运行,则为true,否则就是false
* @return true 应该 false 不应该
*/
final boolean isRunningOrShutdown(boolean shutdownOK) {
//获取线程池状态
int rs = runStateOf(ctl.get());
//如果是RUNNING,或者是SHUTDOWN并shutdownOK为true,那么返回true
return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
}
ensurePrestart确保Worker数量
在确定可以执行给定的任务之后,最后会调用ensurePrestart方法,该方法确保线程池中开启一个工作线程,即使corePoolSize=0,用于保证任务能够被执行。
有趣的是,该方法被实现在父类ThreadPoolExecutor中,但是在JDK1.8中只有子类ScheduledThreadPoolExecutor调用。
/**
1. 该方法是父类ThreadPoolExecutor中的方法,和prestartCoreThread方法类似,但是又有区别
2. 尝试启动一个核心线程,使其处于等待工作的空闲状态或者去队列执行任务。
3. 如果已启动所有核心线程,此方法不启动。如果核心线程数被设置为0,那么还是要启动一条线程
*/
void ensurePrestart() {
//当前线程数wc
int wc = workerCountOf(ctl.get());
//如果小于corePoolSize,那么调用父类addWorker方法启动一条个核心线程
if (wc < corePoolSize)
addWorker(null, true);
/*否则,如果wc==0,即核心线程被设置为0,还是需要调用addWorker启动一条线程*/
else if (wc == 0)
addWorker(null, false);
}
ScheduledFutureTask.run执行任务
在ScheduledThreadPoolExecutor的runWorker方法中调用的task.run()实际上就是调用的ScheduledFutureTask的run方法。该方法就是执行具体任务以及实现周期性控制的核心方法。
run方法实际上比较简单,大概步骤为:
使用periodic表示当前任务是否是周期性任务,true 是 false 否;
在执行真正的任务之前,判断当前任务是否可以执行,如果不能执行,那么cancel取消该任务。
否则,表示可以执行。继续判断是否是一次性任务。如果是一次性任务,那么调用ScheduledFutureTask的父类FutureTask的run方法执行任务而在FutureTask的run方法中,有会调用c.call()方法,这里面才是我们自己写的任务逻辑。run方法完毕则任务结束。
否则,表示周期性任务。那么调用父类FutureTask的runAndReset方法执行任务并且执行完毕之后重置任务状态。执行并重置失败之后,退出run方法,该任务将不再执行;执行并重置成功之后,进入if代码块:
调用setNextRunTime设置任务下一次要执行的时间点。
调用reExecutePeriodic将下一次待执行的任务放置到DelayedWorkQueue中,等待下一次执行,这个outerTask默认指向该任务自己。
/**
* ScheduledFutureTask的run方法
*/
public void run() {
//periodic表示当前任务是否是周期性任务,true 是 false 否
boolean periodic = isPeriodic();
/*
* 在执行真正的任务之前,判断当前任务是否可以执行
* 如果不能执行,那么cancel取消该任务
*/
if (!canRunInCurrentRunState(periodic))
cancel(false);
/*
* 否则,表示可以执行。继续判断是否是一次性任务
* 如果是一次性任务,那么调用ScheduledFutureTask的父类FutureTask的run方法执行任务
* 而在FutureTask的run方法中,有会调用c.call()方法,这里面才是我们自己写的任务逻辑
*/
else if (!periodic)
ScheduledFutureTask.super.run();
/*
* 否则,表示周期性任务。
* 那么调用父类FutureTask的runAndReset方法执行任务并且执行完毕之后重置任务
*/
else if (ScheduledFutureTask.super.runAndReset()) {
//执行并重置成功之后,设置任务下一次要执行的时间点
setNextRunTime();
//下一次待执行的任务放置到DelayedWorkQueue中,这个outerTask默认指向该任务自己
reExecutePeriodic(outerTask);
}
}
runAndReset运行并重置任务
如果是一次性任务,那么调用父类FutureTask的run方法即可,执行完毕之后该任务就没了;如果是周期任务,那么调用父类FutureTask的runAndReset方法执行任务并且执行完毕之后重置任务。
runAndReset和我们前面讲的run方法非常相似,区别就是不会设置结果(正确的结果)。在最后会判断如果状态还是NEW,那么说明“重置”成功,实际上是在该方法的代码中根本就没有改变过状态值,这里是为了防止被取消的逻辑。那么现在我们能够明白,这里所谓的“重置”,那就是任务状态一直是NEW而已,即没有改变任务状态的时候,任务就可以重复运行!
在任务抛出异常的时候,会设置异常结果值,并且该周期方法将不再执行!
/**
* 在不设置结果的情况下执行计算,然后将此将来重置为初始状态,如果计算遇到异常或被取消,则无法继续执行。
* 类似于run,多了reset逻辑
*
* @return 如果计算-重置成功,则返回true,否则返回false
*/
protected boolean runAndReset() {
/*
* 如果state不是NEW状态
* 或者 state是NEW状态,但是CAS的将runner从null设置为当前调用线程失败
*
* 以上两个条件满足一个就立即返回,任务不会被执行,即要求任务在此前没有被执行过才能开始执行
*/
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
//直接返回false
return false;
//ran用来保存任务是否执行成功
boolean ran = false;
//s保存此时的state值
int s = state;
try {
//保存要执行的任务c
Callable<V> c = callable;
//如果c不为null,并且state还是为NEW,那么可以执行真正的任务
//这里还需要校验一下,因为在上次校验到此之间可能存在其他线程取消了任务
//如果被取消了,那么不会执行底层真正的任务
if (c != null && s == NEW) {
try {
//调用c.call()方法,这里才是执行真正的任务,这个call方法中的代码就是我们自己编写的代码
//如果call执行成功,那么也不会设置返回值result
c.call(); // don't set result
//ran置为true,表示执行成功
ran = true;
} catch (Throwable ex) {
//如果call方法抛出异常,调用setException方法,用于设置异常的返回值,并唤醒因为get阻塞的线程
setException(ex);
}
}
} finally {
//执行完毕后在finally中,将runner设置为null,到此表示任务执行完毕
//在任务执行过程中runner必须一直非null,用来保证run()方法不会被并发的调用
runner = null;
//重新获取此时的state,因为在任务执行过程中可能被中断,state会被改变
s = state;
/*
* 如果状态大于等于INTERRUPTING,表示任务执行过程中其他线程执行了cancel(true)成功
* 那么本次任务执行的setException以及set方法都执行失败,但是此执行线程不一定被中断了
* 可能执行线程在执行任务call()方法完毕时,cancel的线程更改了state值为INTERRUPTING,这将导致执行线程的setException或者set失败
*
* 随后执行线程将runner置空,那么cancel线程在后续代码中由于runner为null将不会中断执行线程
* 但是也有可能cancel线程先获取到了runner,此时runner还不为null,但是执行线程实际上将任务执行完了,此时还是会中断线程
*/
if (s >= INTERRUPTING)
//处理中断,等待任务状态变成INTERRUPTED状态,即等待执行线程被中断或者不被中断
handlePossibleCancellationInterrupt(s);
}
//如果ran为true,即任务执行成功
//并且任务状态还是为NEW,表示重置成功(实际上是在该方法的代码中根本就没有改变过状态值,这里是为了防止被取消的逻辑),那么就返回true
return ran && s == NEW;
}
setNextRunTime设置下次运行时间点
在周期任务执行并重置完毕之后,会设置周期任务下一次运行的时间点。scheduleAtFixedRate和scheduleWithFixedDelay方法内部会调用到该方法,许多人分不清这两个方法的区别,实际上该方法的源码已经明确指出了它们的区别!
/**
* ScheduledFutureTask中的方法
* scheduleAtFixedRate和scheduleWithFixedDelay方法内部会调用到该方法
* 设置周期任务下一次运行的时间点,这里就是这两个方法的区别
*/
private void setNextRunTime() {
//获取period
long p = period;
/*
* 如果p>0,说明当前任务为fixed-rate 任务,是固定频率的定时可重复执行任务。
* p就是scheduleAtFixedRate (Runnable command, long initialDelay, long period, TimeUnit unit)方法传递的period参数
* p代表 前一个任务开始之后,每隔 period 时间之后重复执行
* 如果一次任务执行时间小于period计划时间,那么任务开始间隔就是initialDelay、initialDelay+period、initialDelay + 2 * period……。
* 如果一次任务执行时间大于等于period计划时间,下一次任务执行将在当前任务执行结束之后立即执行。
* fixed-rate 任务在同一时间不会有多个线程同时执行。
*/
if (p > 0)
//可以看到,下一次任务的执行时间点就是基于 上一次任务开始执行时间向后延迟p
time += p;
/*
* 否则,p<0,说明当前任务为fixed-delay 任务,是固定延迟的定时可重复执行任务。
* -p就是scheduleWithFixedDelay (Runnable, long initialDelay, long delay, TimeUnit timeunit)方法传递的delay参数
* -p(delay)代表 前一个任务执行的结束和下一个执行的开始之间的间隔(fixed-delay 任务)。
* 计划任务在同一时间不会有多个线程同时执行
*
* fixed-delay 任务在同一时间不会有多个线程同时执行。
*/
else
//可以看到,下一次任务的执行时间点就是基于 当前时间(任务执行完毕时间)向后延迟-p
time = triggerTime(-p);
}
reExecutePeriodic重新提交周期任务
在周期任务执行并重置完毕并且设置好下一次运行的时间点之后,最后的步骤就是再次把任务丢到任务队列中去。类似于delayedExecute方法,重新执行周期任务,区别是不会再断度判断isShutdown或者执行拒绝策略。
/**
* ScheduledThreadPoolExecutor 中的方法
* 类似于delayedExecute方法,重新执行周期任务,区别是不会再断度判断isShutdown或者执行拒绝策略
*
* @param task the task
*/
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
//如果当前状态可以执行该周期任务
if (canRunInCurrentRunState(true)) {
//将该任务添加到任务队列
super.getQueue().add(task);
/*
* 如果当前不支持执行该周期任务,并且从队列移除该任务成功
* 两个条件都满足,那么调用cancel取消任务
*/
if (!canRunInCurrentRunState(true) && remove(task))
//移除task成功之后,尝试调用task本身的cancel方法取消这个任务
task.cancel(false);
/*
* 否则,说明该任务支持执行,那么调用ensurePrestart确保至少有一条线程,能够执行任务
*/
else
ensurePrestart();
}
}
测试案例
使用工具类创建内置5个核心线程的ScheduledExecutorService,随后调用schedule() 方法传递一个任务,后边的两个参数定义了这个任务将在5秒钟之后被执行。
/**
* @author lx
*/
public class ScheduledThreadPoolExecutorTest2 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ScheduledExecutorService scheduledExecutorService =
Executors.newScheduledThreadPool(5);
ScheduledFuture scheduledFuture = scheduledExecutorService.schedule(() -> {
System.out.println("Executed!");
return "Called";
}, 5, TimeUnit.SECONDS);
System.out.println(scheduledFuture.get());
scheduledExecutorService.shutdown();
}
}
scheduleWithFixedDelay固定周期任务
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
command:表示要执行的Runnable类型的任务;
initialDelay:表示提交任务后延迟多少时间开始执行command;
delay:表示当任务执行完毕后延长多少时间后再次运行command;
unit:表示initialDelay 和delay 的时间单位。
该任务将会在首个initialDelay时间延迟之后得到执行,然后在前一个任务开始结束后,尝试每隔 delay时间之后重复执行,直到任务运行中抛出了异常,被取消了,或者关闭了线程池(SHUTDOWN状态之后)。
该方法中delay作为前一个任务执行结束和下一个任务执行的开始之间的间隔。即如果1秒后开始执行第一次任务,任务耗时5秒,任务间隔时间3秒,那么第二次任务执行的时间是在第10秒开始。周期任务在同一时间不会有多个线程同时执行。
关于周期性控制的源码,在上面的setNextRunTime部分。
/**
* @param command 表示要执行的Runnable类型的任务
* @param initialDelay 表示提交任务后延迟多少时间开始执行command
* @param delay 表示当任务执行完毕后延长多少时间后再次运行command
* @param unit 表示initialDelay 和delay 的时间单位
* @return 与任务关联的ScheduledFutureTask对象
*/
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
//callable和unit的null校验
if (command == null || unit == null)
throw new NullPointerException();
//如果delay小于等于0,那么抛出IllegalArgumentException异常
if (delay <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
//注意这里是-delay,是负数
unit.toNanos(-delay));
//调用decorateTask对任务进行控制,可由子类重写
//默认返回一个新建的ScheduledFutureTask
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
//指向t,默认就是指向自己
sft.outerTask = t;
//调用同一个方法
delayedExecute(t);
return t;
}
测试案例
/**
* @author lx
*/
public class ScheduleWithFixedDelayTest {
public static void main(String[] args) {
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
System.out.println("测试scheduleWithFixedDelay");
System.out.println("---首次延迟1秒执行后,前一个任务结束3秒后再次执行,任务耗时1秒---" + Calendar.getInstance().get(Calendar.SECOND) + "\r\n");
scheduledThreadPoolExecutor.scheduleWithFixedDelay(() -> {
System.out.println("---开始执行---" + Calendar.getInstance().get(Calendar.SECOND));
//假设任务耗时1秒
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
System.out.println("---执行完毕---" + Calendar.getInstance().get(Calendar.SECOND) + "\r\n");
}, 1, 3, TimeUnit.SECONDS);
}
}
scheduleAtFixedRate固定频率任务
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
command:表示要执行的Runnable类型的任务;
initialDelay :表示提交任务后延迟多少时间开始执行command;
period:表示连续执行之间的周期;
unit :表示initialDelay 和period的时间单位。
该任务将会在首个initialDelay时间延迟之后得到执行,然后在前一个任务开始之后,尝试每隔 period 时间之后重复执行,直到任务运行中抛出了异常,被取消了,或者关闭了线程池(SHUTDOWN状态之后)。
如果任务执行时间小于period计划时间,那么任务开始间隔就是initialDelay、initialDelay+period、initialDelay + 2 * period……。如果任务执行时间大于等于period计划时间,下一次执行将在当前执行结束执行之后才会立即执行。周期任务在同一时间不会有多个线程同时执行。
关于周期性控制的源码,在上面的setNextRunTime部分。
/**
* @param command 表示要执行的Runnable类型的任务
* @param initialDelay 表示提交任务后延迟多少时间开始执行command
* @param period 表示连续执行之间的周期
* @param unit 表示initialDelay 和period的时间单位
* @return 与任务关联的ScheduledFutureTask对象
*/
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
//callable和unit的null校验
if (command == null || unit == null)
throw new NullPointerException();
//如果period小于等于0,那么抛出IllegalArgumentException异常
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
//这里是period,是正数
unit.toNanos(period));
//调用decorateTask对任务进行控制,可由子类重写
//默认返回一个新建的ScheduledFutureTask
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
//指向t,默认就是指向自己
sft.outerTask = t;
//调用同一个方法
delayedExecute(t);
return t;
}
测试案例
/**
* @author lx
*/
public class ScheduleAtFixedRateTest {
static class ScheduleAtFixedRate1 {
public static void main(String[] args) {
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
System.out.println("测试scheduleAtFixedRate,此时任务执行时间小于period计划时间");
System.out.println("---首次延迟1秒执行后,每三秒执行一次,任务耗时1秒---" + Calendar.getInstance().get(Calendar.SECOND) + "\r\n");
scheduledThreadPoolExecutor.scheduleAtFixedRate(() -> {
System.out.println("---开始执行---" + Calendar.getInstance().get(Calendar.SECOND));
//假设任务耗时1秒
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
System.out.println("---执行完毕---" + Calendar.getInstance().get(Calendar.SECOND) + "\r\n");
}, 1, 3, TimeUnit.SECONDS);
}
}
static class ScheduleAtFixedRate2 {
public static void main(String[] args) {
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
System.out.println("测试scheduleAtFixedRate,此时任务执行时间大于period计划时间");
System.out.println("---首次延迟1秒执行后,每三秒执行一次,任务耗时4秒---" + Calendar.getInstance().get(Calendar.SECOND) + "\r\n");
scheduledThreadPoolExecutor.scheduleAtFixedRate(() -> {
System.out.println("---开始执行---" + Calendar.getInstance().get(Calendar.SECOND));
//假设任务耗时4秒
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(4));
System.out.println("---执行完毕---" + Calendar.getInstance().get(Calendar.SECOND) + "\r\n");
}, 1, 3, TimeUnit.SECONDS);
}
}
}
设置线程池参数
除了父类ThreadPoolExecutor的一系列方法之外,还有自己的两个方法:
public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value)
关闭线程池之后(SHUTDOWN状态)是否继续执行周期任务,true表示是,false表示否,默认false。
public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value)
关闭线程池之后(SHUTDOWN状态)是否继续执行延迟一次性任务,true表示是,false表示否。此值默认为 true。
/**
* 关闭线程池之后(SHUTDOWN状态)是否继续执行周期任务,默认false。
*
* @param value true表示是,false表示否
*/
public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
//设置值
continueExistingPeriodicTasksAfterShutdown = value;
//如果value为false 并且线程池非RUNNING状态
if (!value && isShutdown())
onShutdown();
}
/**
* 关闭线程池之后(SHUTDOWN状态)是否继续执行延迟一次性任务,此值默认为 true。
*
* @param value true表示是,false表示否
*/
public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
//设置值
executeExistingDelayedTasksAfterShutdown = value;
//如果value为false 并且线程池非RUNNING状态
if (!value && isShutdown())
//取消并清除由于关闭策略而不应运行的所有队列中的任务。
onShutdown();
}
/**
* @return 线程池是不是RUNNING状态,true 不是RUNNING状态,false 是RUNNING状态
*/
public boolean isShutdown() {
return !isRunning(ctl.get());
}
/**
* 在shutdown方法中线程池变成SHUTDOWN之后被调用,或者改变两个取消策略的方法中被调用
* 取消并清除由于关闭策略而不应运行的所有队列中的符号位情况的任务。
*/
@Override
void onShutdown() {
//获取任务队列
BlockingQueue<Runnable> q = super.getQueue();
//获取关闭线程池之后(SHUTDOWN状态)是否继续执行延迟一次性任务的值keepDelayed
boolean keepDelayed =
getExecuteExistingDelayedTasksAfterShutdownPolicy();
//获取关闭线程池之后(SHUTDOWN状态)是否继续执行周期任务的值keepDelayed
boolean keepPeriodic =
getContinueExistingPeriodicTasksAfterShutdownPolicy();
/*如果两个都是false,那么全部任务队列中的任务都取消并移除*/
if (!keepDelayed && !keepPeriodic) {
/*遍历任务队列快照*/
for (Object e : q.toArray())
//cancel取消任务
if (e instanceof RunnableScheduledFuture<?>)
((RunnableScheduledFuture<?>) e).cancel(false);
//清理整个队列
q.clear();
}
/*否则,那么任务队列中的任务根据实际情况取消并移除*/
else {
// Traverse snapshot to avoid iterator exceptions
/*遍历任务队列快照*/
for (Object e : q.toArray()) {
if (e instanceof RunnableScheduledFuture) {
RunnableScheduledFuture<?> t =
(RunnableScheduledFuture<?>) e;
//t是否是周期任务,如果是那么!keepPeriodic,如果不是那么!keepDelayed
//或者 t是否已经被取消了,那么仍然尝试移除任务队列
if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
t.isCancelled()) { // also remove if already cancelled
//如果移除队列成功
if (q.remove(t))
//cancel取消任务
t.cancel(false);
}
}
}
}
//父类的方法,尝试彻底终止线程池
tryTerminate();
}
Executors 线程池工厂
Executors可以看作一个工具类,里面提供了好多静态方法,这些方法根据用户选择返回不同的内置线程池实例,或者返回线程工厂,或者将runnable转换为callable。
《阿里巴巴java开发手册》中不推荐使用Executors创建线程池,有这样的强制编程规约:线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
Executors 返回的线程池对象的弊端如下:
FixedThreadPool 和 SignalThreadPool : 允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM。
CachedThreadPool 和 ScheduledThreadPool : 允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM。
内置线程池
Executors提供了四种默认线程池实现!
newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads)
创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
corePoolSize与maximumPoolSize相等,即最大线程数就是核心线程数;keepAliveTime = 0 该参数默认对核心线程无效,因此不会超时,在线程池被关闭之前,池中被创建的线程将一直存在。
workQueue为LinkedBlockingQueue,是一个无界阻塞队列,队列容量为Integer.MAX_VALUE。如果任务提交速度持续大余任务处理速度,会造成队列堆积大量任务,很有可能在执行拒绝策略之前就造成内存溢出。
适用于持续不断地提交任务的场景,并且要求任务提交速度不得超过线程处理速度。
newCachedThreadPool
public static ExecutorService newCachedThreadPool()
创建对所有线程都带有超时时间的线程池。对于执行很多短期异步任务的程序而言,这个线程池通常可提高程序性能。
调用execute等方法将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的(均在工作中),则创建一个新线程并添加到池中。将会终止并移除那些已有60秒钟未工作的线程。因此,长时间保持空闲的线程池不会使用任何资源。线程池会根据执行的情况,在程序运行时自动调整线程数量,这里就是可变线程数量的线程池的特点。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
corePoolSize = 0,maximumPoolSize = Integer.MAX_VALUE,即没有核心线程,线程数量最大为Integer. MAX_VALUE;keepAliveTime = 60s,对所有的线程空闲60s后清理。
workQueue 为 SynchronousQueue 阻塞同步队列,该队列没有容量,因此如果有新的任务进来并且目前的线程都在工作中,那么会立即创建新线程执行任务;
适用于快速处理大量耗时较短的任务,如果任务耗时较长,极端情况下CachedThreadPool会因为创建过多线程而耗尽CPU和内存资源。
newScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。内部就是一个ScheduledThreadPoolExecutor实例!
public static ScheduledExecutorService
newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
newSignalThreadExecutor
public static ExecutorService newSingleThreadExecutor()
返回只有固定一个线程的线程池!
这里多了一层FinalizableDelegatedExecutorService包装,这一层有什么用呢?它和newFixedThreadPool“固定容量”的线程池有什么区别呢?是否可以轻易的改变线程数量呢?写个demo来测试一下:
/**
* @author lx
*/
public class FixedThreadPoolTest {
public static void main(String[] args) {
//固定容量的线程池,一个线程
ExecutorService fixedExecutorService = Executors.newFixedThreadPool(1);
//强转
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) fixedExecutorService;
System.out.println(threadPoolExecutor.getCorePoolSize());
//重设核心线程
threadPoolExecutor.setCorePoolSize(8);
System.out.println(threadPoolExecutor.getCorePoolSize());
//可以看到,上面的所谓"固定容量"的线程池,虽然只有一个容量,但是可以轻易修改容量只需要强制转型;
//下面的单个线程的线程池,由于外层包装了FinalizableDelegatedExecutorService
//它是Executors的内部类,不能强转为ThreadPoolExecutor,因此不能改变大小1,做到真正的Single.
ExecutorService singleExecutorService = Executors.newSingleThreadExecutor();
//下面报运行时异常 java.lang.ClassCastException
ThreadPoolExecutor threadPoolExecutor2 = (ThreadPoolExecutor) singleExecutorService;
}
}
为什么不能强转?原理很简单,FinalizableDelegatedExecutorService并没有和ThreadPoolExecutor产生继承关系。
/**
* FinalizableDelegatedExecutorService作为DelegatedExecutorService的子类
*/
static class FinalizableDelegatedExecutorService
extends DelegatedExecutorService {
FinalizableDelegatedExecutorService(ExecutorService executor) {
super(executor);
}
protected void finalize() {
super.shutdown();
}
//......
}
/**
* DelegatedExecutorService作为AbstractExecutorService的子类
* 可以看到这两个类和ThreadPoolExecutor没有关系,内部也并不是ThreadPoolExecutor实现的
* 而setCorePoolSize的方法只有ThreadPoolExecutor类存在,因此如果强转会抛出ClassCastException
*/
static class DelegatedExecutorService extends AbstractExecutorService {
private final ExecutorService e;
DelegatedExecutorService(ExecutorService executor) {
e = executor;
}
//......
}
默认线程工厂
public static ThreadFactory defaultThreadFactory()
Executors提供了一个默认的线程工厂实现!线程池中线程的默认名字的由来以及线程所属线程组等属性都是通过线程工厂设置的!
public static ThreadFactory defaultThreadFactory() {
return new DefaultThreadFactory();
}
/**
* DefaultThreadFactory 是默认的线程工厂,使用Executors.defaultThreadFactory()获取
*/
static class DefaultThreadFactory implements ThreadFactory {
//静态的原子变量, 用来统计线程工厂的个数。是static 的原子变量,用来记录当前线程池的编号,它是应用级别的,所有线程池共用一个,比如创建第一个线程池时线程池编号为l ,创建第二个线程池时线程池的编号为2,所以pool-1-thread-1 里面的pool -1 中的l 就是这个值。
private static final AtomicInteger poolNumber = new AtomicInteger(1);
//线程组
private final ThreadGroup group;
//threadNumber 用来记录每个线程工厂创建了多少线程, 这两个值也作为线程池和线程的名称的一部分。线程池级别的,每个线程池使用该变量来记录该线程池中线程的编号, 所以pool- 1-thread- l 里面的thread-I 中的l 就是这个值。
private final AtomicInteger threadNumber = new AtomicInteger(1);
//线程名字的前缀,线程池中线程名称的前缀,默认固定为pool o
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
/**
* newThread 方法是对线程的一个修饰
* @param r
* @return
*/
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
//线程名字由来
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
Runnable转换为Callable
public static < T > Callable< T > callable(Runnable task, T result)
返回 Callable 对象,调用它时可运行给定的任务并返回给定的结果。这在把需要 Callable 的方法应用到其他无结果的操作时很有用。
public static Callable< Object > callable(Runnable task)
返回 Callable 对象,调用它时可运行给定的任务并返回 null。
原理很简单,就是适配器模式的应用,返回的是一个RunnableAdapter适配器类的实例。
/**
* 返回 Callable 对象,调用它时可运行给定的任务并返回 null。
*
* @param task 被适配的Runnable任务
* @return 一个Callable任务
* @throws NullPointerException 如果task为inull
*/
public static Callable<Object> callable(Runnable task) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<Object>(task, null);
}
/**
* 返回 Callable 对象,调用它时可运行给定的任务并返回给定的结果。
* 这在把需要 Callable 的方法应用到其他无结果的操作时很有用。
*
* @param task 被适配的Runnable任务
* @param result 指定返回值
* @return 一个Callable任务
* @throws NullPointerException 如果task为inull
*/
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
/**
* 该类是Executors的内部类,实现了callable接口
* 作为一个适配器类,用于将runnable任务和result结果适配成Callable对象
*/
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
/**
* 重写了call方法
* 适配原理很简单,调用task.run(),并且返回result
*
* @return result
*/
public T call() {
task.run();
return result;
}
}
总结
本次我们学习了Executor执行框架的大概架构,以及ThreadPoolExecutor、FutureTask、ScheduledThreadPoolExecutor 、Executors等核心类的原理。阅读线程池的实现源码可以让我们不仅知道某个方法是什么,还能知道为什么,以及怎么实现的,在这过程中我们还能接触到比如适配器模式,策略模式、Leader-Follower线程模型等一些偏理论的东西的具体应用。
如果你读完本文,你应该可以知道一些关于线程池的不常见的知识,比如:
线程池的最大线程数量一定会小于等于maximumPoolSize吗?为什么?
核心线程可以应用keepAliveTime设置的超时时间吗?有例外吗?
线程池有用到锁吗?有几种锁?有什么用?
如果一个工作线程在执行任务过程中抛出了异常,那么这个线程会怎样呢?
延迟/周期任务的原理是什么?scheduleWithFixedDelay和scheduleWithFixedDelay 方法的区别是什么?
正常线程池中设置的延迟任务一定会在到达你设置的延迟时间之时运行吗?
FutureTask的原理?
Executor执行框架类容很丰富,功能很多,本次仅仅讲解了一部分,还有一些包括ThreadPoolExecutor、ScheduledThreadPoolExecutor的某些方法也没有讲解,使用时建议查看相关api文档做更全面的了解!另外JDK1.7的时候线程池新增了ForkJoinPool分治框架,这是对线程池的增强,后面的文章我们会讲解ForkJoinPool的源码!