线程池


关键字:

Executor顶层接口 -> ExecutorService -> AbstractExecutorService 抽象类

RunnableFuture顶层任务类 -> 任务包装类:FutureTask

代表了一个一次性的计算过程,内部 有一个volatile的字段state,来表示任务的状态

ThreadPoolExecutor:最常用的线程池,有七个参数:最小核心数、最大核心数、保活时间、保活时间单位,线程工厂,任务队列、丢弃策略

Worker:ThreadPoolExecutor 中的线程包装类,继承了 Runnable 接口 ,用来封装线程池中的工作线程

线程状态

线程的状态转换图

线程池状态

线程池的状态转换图

线程池原理

线程添加、运行原理

ForkJoin 线程池

组件的内容简单讲解

ExecutorService:顶层抽象类

只有一个 execute 的方法,在具体的实现类中实现

ExecutorService:接口类

定义了线程提交(submit)、批量提交(invokeAll)、关闭(shutdown)的方法定义

AbstractExecutorService:线程池的抽象类

模板方法,定义了线程池ExecutorService的基本操作流程,将具体的 execute 方法的实现,放到具体的子类中实现

使用newTaskFor方法,对Runnable、Callable进行了包装,包装为一个RunnableFuture

�的实现类:FutureTask对象

FutureTask

内部有一个Callable的对象,由RunnableAdapter包装,用于封装返回值

代表了一个一次性的计算过程,来表示任务的状态

  • volatile int state:状态字段,表示当前任务的状态
  • volatile WaitNode waiters:用来存储那些正在等待任务结果的线程的,链表结构
  • volatile outcome:来存储任务执行的结果或抛出的异常。当任务完成时,结果会存储在这个字段中,供后续调用get()方法时返回。
  • Callable callable:这是任务的实际计算逻辑,FutureTask在构造时接收一个Callable对象,并在后台线程中执行它的call()方法。
  • volatile Thread runner:这个字段用来记录哪个线程正在执行任务。这是为了确保任务只被执行一次,并且在取消任务时可能需要中断这个线程。
  • stateOffset、runnerOffset、waitersOffset:几个使用Unsafe机制可以直接进行原子操作的偏移量,直接使用硬件级别的原子操作来控制并发,这样可以在某些场景下提供更好的性能。但是,这种方式写出的代码可读性较差,且容易出错,因此在日常开发中并不推荐使用。

AbstractExecutorService 的实现类:四个

Executors:工具类 ,对下面几个实现的封装

ForkJoinPool:合并拆分的线程池

ScheduledThreadPoolExecutor:定时调度的线程池

ThreadPoolExecutor:最常用的线程池

添加了动态修改最大核心数、最小核心数、保活时间、线程工厂、拒绝策略等参数的能力

实现了 shutdown()、shutdowmNow()、execute/submit 等抽象类中的方法

提供了扩展方法:beforeExecute、afterExecute、terminated 等方法,可以在任务运行前后,做一些扩展

核心方法:runWorker(Worker)

Worker:ThreadPoolExecutor 的内部类,实现了 Runnable

继承了 AQS,用于对线程的状态进行加锁,用来判断可以中断

构造函数为 firstTask,在构造函数里,会调用线程工厂,去 new 一个线程 Thread,runnable 对象为自己

重点:ThreadPoolExecutor

构造参数:7个

核心线程数、最大线程数

睡眠时间、睡眠时间单位

线程工厂、任务队列、拒绝策略

线程池的构造方法:

1、直接创建ThreadPoolExecutor

2、通过Excutor的工具类Executors来创建(阿里巴巴开发规范不允许使用这种方式)

原理

组成部分:

ctl:一个 AtomicInteger 变量,高位代表线程池的状态,低位代表当前活跃的线程数

�mainLock:用来控制线程状态的锁

HashSet workers :

Worker:封装了的执行线程

submit操作:

newTaskFor

使用FutureTask包装任务

execute:交由具体的子类实现

每个 Worker 自己的锁

线程池的全局锁

mainLock,是个ReentrantLock(可重入锁),是个

中断

可能发生中断的情况

1 、调用了 shutdownNow()

设置线程池的状态为 stop,停止添加新的任务,对现有的所有线程,设置其中断位,由各个线程自己去检查处理

2、任务取消

对于提交的 Future 类型的任务,可以通过其 cancel 方法取消

3、线程被中断,在应用中手动调用 interrupt(),导致线程被中断

线程被中断后,应该及时检查终端标志位,及时清理资源,并退出

还应该注册关闭钩子:Runtime.getRuntime().addShutdownHook(Thread hook),在JVM退出时优雅地关闭线程池

在里面调用 shuwdownNow()、shutdown(),及时关闭线程,并根据需要,等待线程执行完毕

重点流程

execute:添加线程的主要方法

判断当前线程池的大小,若小于 coreSize,则尝试通过 addWorker 添加线程

否则,若向阻塞队列中添加任务失败

则也通过 addWorker 创建线程

若创建向队列添加成功

则再次检线程数,若线程数为 0,则 调用 addWorker,则补充 worker(firstTask 为 null)

添加 worker 失败,则调用拒绝策略进行处理

addWorker(提交任务时候用)

 private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

流程概述:

1、判断线程数是否大于 coreSize、MxSize

2、生成 Worker,插入 workers,启动线程

ThreadPoolExecutor中添加worker的流程

获取ctl状态线程池是否正在停止且不满足添加条件?yesno返回false检查工作线程数量限制工作线程计数超过CAPACITY或池大小?yesno返回false尝试原子增加工作线程计数是否成功?yesno重新读取ctl线程池状态是否改变?yesno继续外层循环继续内层循环创建新的Worker对象尝试获取主锁再次检查线程池状态满足条件不满足条件检查线程是否已启动线程已启动?yesno抛出IllegalThreadStateException将Worker添加到workers集合更新最大线程池大小标记worker已添加释放主锁并退出释放主锁启动工作线程工作线程是否成功启动?yesno返回true处理启动失败

runWorker(Worker中调用,运行线程)

run 方法中,是调用 addWorker(this),开启循环 loop:

while 循环捞取任务

加锁锁,设置不允许被其他线程中断

当捞取的任务为 null 时,表示当前线程可以被销毁了,则执行processWorkerExit 方法,退出线程

当捞取的任务不为 bull

若线程池已经被中断,则也中断当前的线程

执行前置处理函数beforeExecute

执行目标任务的 run 方法

执行后置处理函数afterExecute

释放锁,设置可以被其他线程中断

最佳实践

线程数设置

N+1(CPU密集型)

1 是指为了避免CPU缺页中断,导致的CPU空闲,所以多了一个线程,去执行

2N(IO密集型)

和ThreadLocal一起使用的坑

由于线程池会共享线程,因此,从ThreadLocal中获取到数据,可能是上一个使用线程的场景留下的数据,有坑,需要及时remove掉

父子线程无法继承ThreadLocal中的值,可以使用InheritableThreadLocal,但也只能在首次设置的时候,才能给子线程设值

参考文档:

1、硬核干货:4W字从源码上分析JUC线程池ThreadPoolExecutor的实现原理https://www.throwx.cn/2020/08/23/java-concurrency-thread-pool-executor/

2、虚拟线程

https://javaguide.cn/java/concurrent/virtual-thread.html


文章作者: 王利康
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 王利康 !
  目录