ThreadPoolExecutor源码分析1-运行原理

引言

经常看到 JDK 原生线程池实现原理的分享,但还是觉得自己亲自去阅读哈源码来得真切。当然,结合前人的梳理总结去读是必须滴,这里先对他们表示感谢。对我来说,带着问题去读会驱动自己深入思考,理解得也更深入一些。这次我想弄明白以下这些问题:

  1. 线程池如何控制池中的线程数量?何时创建新线程?何时终止已有线程?
  2. 如何在多个线程上高效地调度任务?对于来不及处理的任务如何管理?
  3. 如何通过线程池技术来控制系统的最大并发数和最大处理任务量,从而很好的实现流控,保证系统不崩溃?

ThreadPoolExecutor 概观

粗略地看,Executor 有三部分结合在一起构成:

  • 线程池。用来复用工作线程,避免频繁地创建和销毁线程,一个工作线程的生命周期内可以运行多个任务;工作线程数的动态调整
  • 待运行任务的存储。不同的存储实现又会反过来影响其他两部分
  • 任务调度算法

配置参数

ThreadPoolExecutor 类中字段比较多,但只有一部分是用户可以通过构造方法的参数来控制的。

有下面这些字段用户可以控制:

  1. corePoolSize:就是存活着(keep alive)的工作者的最小数量。(这里先暂时把工作者看作工作线程。)不仅要保持这些工作者是活着的,还不允许它们超时,除非 allowCoreThreadTimeOut 被设置为 true,这种情况下最小数量就是 0。

  2. maximumPoolSize:最大池大小,也就是工作者的最大数量。注意,实际的最大数量在内部还被 CAPACITY 常量限制。

  3. keepAliveTime:空闲线程等待工作的超时时间。当线程数多于 corePoolSize 时或若 allowCoreThreadTimeOut 为真,就会使用这个超时时间。否则,他们会永远等待新的工作。

  4. allowCoreThreadTimeOut:若为 false(默认值),核心线程(core threads)会一直存活,即使它们空闲着。若为 true,核心线程就使用 keepAliveTime 作为超时时间等待新工作。核心线程(core threads)指线程数不大于 corePoolSize 时仍存活着的线程。

  5. workQueue 用来保存任务并将其交给工作线程的阻塞队列。阻塞队列的具体类型可以自己选择,有这么几种:

    • ArrayBlockingQueue:数组支持的(array-backed)有界(bounded)阻塞队列,FIFO。
    • LinkedBlockingDeque:链表支持的可选有界(optionally-bounded)阻塞队列,FIFO。
    • SynchronousQueue:不存储元素的无界阻塞队列。每一个插入操作必须等待一个对应的由另一个线程执行的移除操作,反之亦然。也就是说,每个插入操作必须等到另一个线程调用移除操作,否则插入操作会一直阻塞。它的吞吐量要高于 LinkedBlockingQueue。
    • PriorityBlockingQueue:基于平衡二叉堆(balanced binary heap)的无界优先级队列。
  6. threadFactory:所有线程都使用这个工厂创建。

  7. RejectedExecutionHandler handler:线程池和任务队列都已满时对新来的任务所采取的策略。

    • AbortPolicy:对于被拒绝的任务,直接抛出 RejectedExecutionException
    • CallerRunsPolicy:在调用 execute 方法的线程中直接运行被拒绝的任务,除非当前 executor 被关闭(shut down),在这种情况下,任务会被丢弃。
    • DiscardPolicy:默默地丢弃掉被拒绝的任务
    • DiscardOldestPolicy:丢弃最老的未处理请求,然后重新调用 execute 方法,除非当前 executor 被关闭(shut down),在这种情况下,任务会被丢弃。
    • 当然你也可以自定义策略。

基本工作原理

在了解了上面的配置参数后,我就慢慢潜入到了实现代码中,几番阅读思考后,发现自己陷入了实现细节中,失去了对整体设计思路的把握和基本工作原理的理解。赶紧调整了姿势,先去阅读了 ThreadPoolExecutor 的 Java Docs,下面是官方文档的概要:

线程池解决了两个问题:对于大量异步任务的执行,它通常提供了更好的性能,因为减少了每个任务调用的开销(译注:任务调用的开销与任务运行的开销不同,前者是指安排任务开始运行所需的开销,而后者是指任务运行期间的开销,这是由任务本身的逻辑和具体实现决定的);并且它提供了一种方法来限制和管理运行一批任务时所消耗的资源。此外,它也维护一些基础的统计信息,如已完成的任务数。

core and maximum pool size

ThreadPoolExecutor 会根据 corePoolSize 和 maximumPoolSize 设置的界限自动调整线程池的大小。当在 execute(Runnable) 方法中提交了新任务,且正在运行的线程数量少于 corePoolSize 的时候,就创建新线程来处理刚提交的请求(新任务直接交给线程运行,不会进入队列),即使其他工作线程是空闲的。如果有多于 corePoolSize 且少于 maximumPoolSize 个线程在运行,仅当队列已满时才会创建新线程。通过将 corePoolSize 和 maximumPoolSize 设置为同样的值,就可以创建固定大小(fixed-size)的线程池。通过将 maximumPoolSize 设置为一个本质上无界限的值如 Integer.MAX_VALUE,来允许线程池容纳任意数量的并发任务。

按需创建

默认情况下,连 core threads 也仅在新任务到达时才被创建和启动,但这可以使用 prestartCoreThread() 或 prestartAllCoreThreads() 方法来动态地覆盖。如果你用非空队列来构造线程池,你很可能想要预先启动线程。

keep-alive time

如果当前线程池中的线程数多于 corePoolSize,那么当那些过量的线程(excess threads)空闲的时间超过 keepAliveTime 时它们就会被终止。当线程池未被活跃地使用时,这提供了一个降低资源消耗的手段。如果后来线程池变得更加活跃了,新线程将会被创建。将此参数设置为 Long.MAX_VALUE TimeUnit.NANOSECONDS 实际上禁止空闲线程在 Executor 被关闭(shut down)之前被终止。默认情况下,keep-alive 策略仅当线程数多于 corePoolSize 时才应用。但是 allowCoreThreadTimeOut(boolean) 方法可以将这个超时策略也应用于核心线程(core threads),只要 keepAliveTime 值非零。

任务排队

任何 BlockingQueue 都可用来传递和保存提交的任务。这个队列的使用和线程池大小的设定(pool sizing)相互作用:

  1. 若正在运行的线程数少于 corePoolSize,Executor 总是优先添加新线程,而非让任务排队
  2. 若 corePoolSize 或更多个线程正在运行,Executor 总是优先让请求进入队列排队,而非添加新线程
  3. 如果请求无法入队(如队列已满),就创建新线程。除非这将导致工作线程总数超过 maximumPoolSize,在这种情况下,任务就会被拒绝。

有三个通用的排队策略:

  1. 直接手递手传递任务。此时,工作队列的一个不错的默认选择就是 SynchronousQueue,它不保存任务,而是直接将任务交给线程运行。
  2. 使用无界队列(Unbounded queue)传递任务。如果使用无界队列(例如有预定义容量的 LinkedBlockingQueue),那么当所有 corePoolSize 个线程都在忙时新任务就在队列中等待。如此,任何时候都不会有多于 corePoolSize 个线程被创建。也就是说,maximumPoolSize 的值不起作用了。
  3. 使用有界队列(bounded queue)。将有界队列(例如 ArrayBlockingQueue)和有限的(finite) maximumPoolSizes 值一起使用可防止资源耗尽,但可能会更加难于调整和控制(tune and control),因为队列大小和线程池的最大大小之间要互相协调/平衡。

被拒绝的任务

当 Executor 被关闭(shut down),或者对最大线程数和工作队列容量都指定了有限的(finite)界限且线程池和工作队列都已饱和的时候,在 execute(Runnable) 方法中提交的新任务将会被拒绝。在每种情况下,execute 方法都会调用 RejectedExecutionHandler.rejectedExecution(Runnable, ThreadPoolExecutor) 方法来处理被拒绝的任务。

参考资源

几种线程池的实现算法分析
ThreadPoolExecutor Java Docs

0%