Java线程池源码分析

后端 潘老师 5个月前 (11-20) 107 ℃ (0) 扫码查看

本文主要讲解关于Java线程池源码分析相关内容,让我们来一起学习下吧!

excute()方法源码分析

execute方法内部调用 addWorker() 来新增任务,并且同时调用 Thread().start() 来启动新线程中的任务

ctl 变量是一个 int 类型的变量,其高 3 位表示线程池的状态(RunState),低 29 位表示工作线程数量。

线程池的状态有以下几种:

  • RUNNING:线程池处于运行状态,可以接收新的任务。
  • SHUTDOWN:线程池不再接收新的任务,但会继续执行已提交的任务。
  • STOP:线程池不再接收新的任务,也不会继续执行已提交的任务,并中断所有正在执行的任务。
  • TIDYING:线程池正在被终止,所有工作线程都已终止。
  • TERMINATED:线程池已终止。

注意:

  1. WorkQueue 表示同步阻塞队列
  2. workers 表示池里的工作线程,工作线程又可以分为核心线程和非核心线程
    public void execute(Runnable command) {
        // ...
            int c = this.ctl.get();
            // 判断工作线程数,核心线程数
            // 这里是添加工作线程
            if (workerCountOf(c) < this.corePoolSize) {
                // 创建新的 Worker
                if (this.addWorker(command, true)) {
                    return;
                }
                c = this.ctl.get();
            }
            // 新的任务加到同步队列尾部
            // 线程池处于运行状态,同步阻塞队列中则添加新的任务
            if (isRunning(c) && this.workQueue.offer(command)) {
                int recheck = this.ctl.get();
                // 再次检查是否处于运行状态,不处于了就从队列中移除刚添加的
                if (!isRunning(recheck) && this.remove(command)) {
                    // 这里应该是走拒绝策略了
                    this.reject(command);
                } else if (workerCountOf(recheck) == 0) {
                    // 如果工作线程为 0 了,这里添加一个非核心线程,去执行等待队列中的任务
                    this.addWorker((Runnable)null, false);
                }
            } 
            // 如果之前添加到队尾失败了,就添加非核心线程,也失败了就走拒绝策略
            else if (!this.addWorker(command, false)) {
                this.reject(command);
            }

    }

addWorker()方法源码分析

里面看一个之前没用过的 retry 语句,它可以用来跳出任意层数的循环

说下流程,里面先会对当前运行的线程数进行判断,用到了 CAS 加循环判断,当满足条件,就通过 retry 跳出循环,走下面的任务创建逻辑

private boolean addWorker(Runnable firstTask, boolean core) {
    // 什么写法?
    // 双层 for 循环, break 跳出当前 for , retry 直接跳出两层 for 循环

    retry:
    for (int c = ctl.get();;) {
        // Check if queue empty only if necessary.
        // 判断线程池的状态,如果是 SHUTDOWN,STOP,xx 就不添加了
        if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP)
                || firstTask != null
                || workQueue.isEmpty()))
            return false;

        for (;;) {
            // 如果当前线程运行的数量大于核心线程数(新任务是核心线程),或者最大线程数,就返回 false
            if (workerCountOf(c)
                >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                return false;
            // 满足条件,就跳出这两层 for 循环
            // workerCount +1
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            // 
            if (runStateAtLeast(c, SHUTDOWN))
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 创建一个 Worker
        w = new Worker(firstTask);
        // 拿到 Worker 对应的新线程
        final Thread t = w.thread;
        if (t != null) {
            // worker 的入队加锁
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int c = ctl.get();
                // 判断线程池的状态
                if (isRunning(c) ||
                    // 小于 STOP 状态
                    (runStateLessThan(c, STOP) && firstTask == null)) {
                    if (t.getState() != Thread.State.NEW)
                        throw new IllegalThreadStateException();
                    //添加到
                    workers.add(w);
                    workerAdded = true;
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                }
            } finally {
                mainLock.unlock();
            }
            // 启动线程去执行
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

getTask()方法源码分析

先回忆一下BlockingQueue 的 poll 和 take 方法都是用于从队列中获取元素的。它们的区别在于:

poll() 方法不会阻塞,如果队列为空~~,~~则返回 null。
take() 方法会阻塞,直到队列中有元素可用。

因此,poll() 方法适用于不需要阻塞的场景,例如,从队列中获取元素用于判断是否有新任务。take() 方法适用于需要阻塞的场景,例如,从队列中获取元素用于执行任务。

private Runnable getTask() {
    
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        // 如果线程池状态已经 shutdown.stop , 或者等待队列中已经空了 
        // Check if queue empty only if necessary.
        if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
            // 减少工作线程的数量,直接回到 runWorker() 中执行
            decrementWorkerCount();
            return null;
        }
            
        // 走到这里说明线程池是运行状态的
        int wc = workerCountOf(c);
        
        // Are workers subject to culling?
        // 默认 allowCoreThreadTimeOut  是 false 
        // 看 wc 数量是否大于核心线程数,大于则是非核心线程
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        
        // 如果已经大于最大线程数,或者超时
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // 根据 timed 来取,ture 取非阻塞式的,false 阻塞式的
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

Worker源码分析

线程池内部类,继承自 AQS,我们知道 ReentrantLock 也是继承于 AQS,大概也是为了同步

Worker(Runnable firstTask) {
    // Worker 内部对应着一个线程
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    // 初始化时创建一个 Thread ,
    this.thread = getThreadFactory().newThread(this);
}

 public void run() {        
     runWorker(this);
 }

总结

往Java线程池添加新任务,在会先检查核心线程和工作线程,分别在工作线程和同步队列中添加。

思考

Java线程池中核心线程和非核心线程有哪些区别,为什么这么设计?

从默认实现看,当线程数小于 corePoolSize 时,为核心线程,大于时创建的为非核心线程。

如果 allowCoreThreadTimeOut == true 时,核心线程和非核心线程都一样的处理。

这样设计,可能是处于考虑既需要保存一定数量的线程存活,准备随时处理任务,又不能持有过多的线程数,让线程池的容量有一定的伸缩性。

以上就是关于Java线程池源码分析相关的全部内容,希望对你有帮助。欢迎持续关注潘子夜个人博客,学习愉快哦!


版权声明:本站文章,如无说明,均为本站原创,转载请注明文章来源。如有侵权,请联系博主删除。
本文链接:https://www.panziye.com/back/11409.html
喜欢 (0)
请潘老师喝杯Coffee吧!】
分享 (0)
用户头像
发表我的评论
取消评论
表情 贴图 签到 代码

Hi,您需要填写昵称和邮箱!

  • 昵称【必填】
  • 邮箱【必填】
  • 网址【可选】