关键字:
Executor顶层接口 -> ExecutorService -> AbstractExecutorService 抽象类
RunnableFuture顶层任务类 -> 任务包装类:FutureTask
代表了一个一次性的计算过程,内部 有一个volatile的字段state,来表示任务的状态
ThreadPoolExecutor:最常用的线程池,有七个参数:最小核心数、最大核心数、保活时间、保活时间单位,线程工厂,任务队列、丢弃策略
Worker:ThreadPoolExecutor 中的线程包装类,继承了 Runnable 接口 ,用来封装线程池中的工作线程
线程状态
线程的状态转换图
线程池状态
线程池的状态转换图
线程池原理
线程添加、运行原理
ForkJoin 线程池
组件的内容简单讲解
ExecutorService:顶层抽象类
只有一个 execute 的方法,在具体的实现类中实现
ExecutorService:接口类
定义了线程提交(submit)、批量提交(invokeAll)、关闭(shutdown)的方法定义
AbstractExecutorService:线程池的抽象类
模板方法,定义了线程池ExecutorService的基本操作流程,将具体的 execute 方法的实现,放到具体的子类中实现
使用newTaskFor方法,对Runnable、Callable进行了包装,包装为一个RunnableFuture
�的实现类:FutureTask对象
FutureTask
内部有一个Callable的对象,由RunnableAdapter包装,用于封装返回值
代表了一个一次性的计算过程,来表示任务的状态
- volatile int state:状态字段,表示当前任务的状态
- volatile WaitNode waiters:用来存储那些正在等待任务结果的线程的,链表结构
- volatile outcome:来存储任务执行的结果或抛出的异常。当任务完成时,结果会存储在这个字段中,供后续调用get()方法时返回。
- Callable
callable:这是任务的实际计算逻辑,FutureTask在构造时接收一个Callable对象,并在后台线程中执行它的call()方法。 - volatile Thread runner:这个字段用来记录哪个线程正在执行任务。这是为了确保任务只被执行一次,并且在取消任务时可能需要中断这个线程。
- stateOffset、runnerOffset、waitersOffset:几个使用Unsafe机制可以直接进行原子操作的偏移量,直接使用硬件级别的原子操作来控制并发,这样可以在某些场景下提供更好的性能。但是,这种方式写出的代码可读性较差,且容易出错,因此在日常开发中并不推荐使用。
AbstractExecutorService 的实现类:四个
Executors:工具类 ,对下面几个实现的封装
ForkJoinPool:合并拆分的线程池
ScheduledThreadPoolExecutor:定时调度的线程池
ThreadPoolExecutor:最常用的线程池
添加了动态修改最大核心数、最小核心数、保活时间、线程工厂、拒绝策略等参数的能力
实现了 shutdown()、shutdowmNow()、execute/submit 等抽象类中的方法
提供了扩展方法:beforeExecute、afterExecute、terminated 等方法,可以在任务运行前后,做一些扩展
核心方法:runWorker(Worker)
�
Worker:ThreadPoolExecutor 的内部类,实现了 Runnable
继承了 AQS,用于对线程的状态进行加锁,用来判断可以中断
构造函数为 firstTask,在构造函数里,会调用线程工厂,去 new 一个线程 Thread,runnable 对象为自己
重点:ThreadPoolExecutor
构造参数:7个
核心线程数、最大线程数
睡眠时间、睡眠时间单位
线程工厂、任务队列、拒绝策略
线程池的构造方法:
1、直接创建ThreadPoolExecutor
2、通过Excutor的工具类Executors来创建(阿里巴巴开发规范不允许使用这种方式)
原理
组成部分:
ctl:一个 AtomicInteger 变量,高位代表线程池的状态,低位代表当前活跃的线程数
�mainLock:用来控制线程状态的锁
HashSet
Worker:封装了的执行线程
submit操作:
newTaskFor
使用FutureTask包装任务
execute:交由具体的子类实现
锁
每个 Worker 自己的锁
线程池的全局锁
mainLock,是个ReentrantLock(可重入锁),是个
中断
可能发生中断的情况
1 、调用了 shutdownNow()
设置线程池的状态为 stop,停止添加新的任务,对现有的所有线程,设置其中断位,由各个线程自己去检查处理
2、任务取消
对于提交的 Future 类型的任务,可以通过其 cancel 方法取消
3、线程被中断,在应用中手动调用 interrupt(),导致线程被中断
线程被中断后,应该及时检查终端标志位,及时清理资源,并退出
还应该注册关闭钩子:Runtime.getRuntime().addShutdownHook(Thread hook),在JVM退出时优雅地关闭线程池
在里面调用 shuwdownNow()、shutdown(),及时关闭线程,并根据需要,等待线程执行完毕
重点流程
execute:添加线程的主要方法
判断当前线程池的大小,若小于 coreSize,则尝试通过 addWorker 添加线程
否则,若向阻塞队列中添加任务失败
则也通过 addWorker 创建线程
若创建向队列添加成功
则再次检线程数,若线程数为 0,则 调用 addWorker,则补充 worker(firstTask 为 null)
添加 worker 失败,则调用拒绝策略进行处理
addWorker(提交任务时候用)
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
流程概述:
1、判断线程数是否大于 coreSize、MxSize
2、生成 Worker,插入 workers,启动线程
ThreadPoolExecutor中添加worker的流程
runWorker(Worker中调用,运行线程)
run 方法中,是调用 addWorker(this),开启循环 loop:
while 循环捞取任务
加锁锁,设置不允许被其他线程中断
当捞取的任务为 null 时,表示当前线程可以被销毁了,则执行processWorkerExit 方法,退出线程
当捞取的任务不为 bull
若线程池已经被中断,则也中断当前的线程
执行前置处理函数beforeExecute
执行目标任务的 run 方法
执行后置处理函数afterExecute
释放锁,设置可以被其他线程中断
最佳实践
线程数设置
N+1(CPU密集型)
1 是指为了避免CPU缺页中断,导致的CPU空闲,所以多了一个线程,去执行
2N(IO密集型)
和ThreadLocal一起使用的坑
由于线程池会共享线程,因此,从ThreadLocal中获取到数据,可能是上一个使用线程的场景留下的数据,有坑,需要及时remove掉
父子线程无法继承ThreadLocal中的值,可以使用InheritableThreadLocal,但也只能在首次设置的时候,才能给子线程设值
参考文档:
1、硬核干货:4W字从源码上分析JUC线程池ThreadPoolExecutor的实现原理https://www.throwx.cn/2020/08/23/java-concurrency-thread-pool-executor/
2、虚拟线程