ThreadPoolExecutor源码分析3-深度历险

重要抽象

Worker

首先,Worker 可被看作工作线程,因为它包含了工作线程中最重要的 run-loop。该类主要为运行任务的线程维护中断控制状态(interrupt control state),以及簿记工作(如工作线程完成的任务数)。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65

private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{

// ... 省略部分代码

/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;

Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}

/** 直接委托给 ThreadPoolExecutor.runWorker(Worker) 方法 */
public void run() {
runWorker(this);
}

// Lock methods
// 值 -1 表示阻止被中断,直到 runWorker 方法被调用
// 值 0 代表已解锁状态(unlocked state)
// 值 1 代表已锁定状态(locked state)

protected boolean isHeldExclusively() {
return getState() != 0;
}

protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}

public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }

void interruptIfStarted() {
Thread t;
// 状态为 -1 就不能中断当前工作线程
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}

Worker 还讨巧地通过继承 AbstractQueuedSynchronizer 来简化获取和释放包围运行任务的代码块的锁。换句话说,一个 Worker 实例就是一个锁,要锁住的目标就是执行任务的代码块。这样就能防止 意欲唤醒正在等待任务的线程的中断 反而中断了正在运行的任务。注意,这个锁是不可重入的。此外,为了阻止中断直到线程真正开始运行任务,创建 Worker 对象时,将锁状态初始化为 -1,开始执行任务时再调用 Worker.unlock() 方法清除锁状态,请看 ThreadPoolExecutor.runWorker(Worker) 方法。因为最重要的 run-loop 在 runWorker 方法中,所以执行到 runWorker 方法时才算是真正开始运行任务。

FutureTask

FutureTask 表示可撤消的(cancellable)异步任务。它对 Callable 或 Runnable 进行包装,如果是 Runnable,在内部先将其转换为 Callable。类图如下:

Future 就是异步计算的结果。使用 Future 可以撤消(cancel)任务的执行,查看任务是否在正常完成前被撤消,查看任务是否完成(正常完成、发生异常或被撤消都算已完成),获取结果(如有必要会阻塞)。

Callable 跟 Runnable 类似,它们都表示可能由另一个线程执行的任务。不同之处在于 Runnable 不会返回任务执行的结果,也不能抛出受查异常。

核心逻辑

概览

下面是一个省略掉了很多细节的时序图,希望有个大概的总体印象:

任务的提交和 worker 的创建

尝试用时序图来描绘核心执行流程时发现要么过于简单而无法展现重要逻辑,要么过于复杂导致很耗时间且图看得人眼花。索性使用文字结合代码来逐步分析。

首先是 submit 方法,做了什么事儿很清晰:

1
2
3
4
5
6
7
8

// 用户提供的 result 可看作 用来保存任务运行结果的容器对象
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}

execute 方法包含执行任务的主要步骤。有三步:

  1. 如果 worker 数小于 corePoolSize,就添加 worker,并将提交的任务作为其 firstTask。

  2. 如果 worker 数大于等于 corePoolSize,且池状态为 running,就将提交的任务放入任务队列。成功入队后仍然需要再次检查池状态,因为在这短暂的间隙内可能线程池已被关闭,或者状态虽为 running 但 worker 数已变成 0,这两种情况都需要相应的处理。

  3. 如果任务队列已满,且 worker 数小于 maxPoolSize,就添加 worker。否则就拒绝任务。

remove 方法从内部的任务队列中移除给定的任务。每次从任务队列移除任务后都要调用 tryTerminate 方法尝试终止线程池。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();

// Step 1
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}

// Step 2
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 再次检查状态,若已关闭,就回滚入队操作,并拒绝任务
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
// 以防虽然状态为running,但池中没有worker了,没人处理刚入队的任务了
addWorker(null, false);
}
// Step 3
else if (!addWorker(command, false))
reject(command);
}

public boolean remove(Runnable task) {
boolean removed = workQueue.remove(task);
tryTerminate(); // In case SHUTDOWN and now empty
return removed;
}

接着来看 addWorker 方法。参数 core 指示将 corePoolSize 还是 maxPoolSize 作为上限来判断是否应当添加 worker。这里重点梳理下该方法的返回值。返回 true 表示 worker 添加成功(包括线程的创建和启动);返回 false 表示要么不应当添加 worker,要么线程创建失败。如果线程创建失败,会执行回滚操作。哪些情况下该方法返回 false:

  • 线程池已停止,即 (rs > SHUTDOWN)
  • 线程池已关闭,但 firstTask 不为 null,即 (rs == SHUTDOWN && firstTask != null)。因为此时线程池不再接收新任务了
  • 线程池已关闭,firstTask 为 null,任务队列也为空,即 (rs == SHUTDOWN && firstTask == null && workQueue.isEmpty())。此时已没有任何任务需要执行了
  • worker 数已达上限
  • 线程创建失败。ThreadFactory 返回 null 或者发生异常(通常都是 Thread.start() 抛出 OutOfMemoryError)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86

private boolean addWorker(Runnable firstTask, boolean core) {
retry:
// 外层循环
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// 如果线程池已停止
// 或者 已关闭且提交的任务即 firstTask 不为 null
// 或者 已关闭且firstTask 为 null 且任务队列为空
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

// 内层循环
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
// 再次检查是否应当添加核心/非核心线程,因为自 execute 方法中的
// 检查之后池的其他用户也可能提交了任务进而添加了新 worker
wc >= (core ? corePoolSize : maximumPoolSize))
return false;

// 这里使用基于 CAS 的乐观并发策略实现非阻塞同步,CAS 是先检测是否有冲突,若无再操作。
// 若有冲突,调用方需要进行补偿,所以需要把这句代码放入循环中。
// 如果将 workerCount 加 1 成功,就中止外层循环。
if (compareAndIncrementWorkerCount(c))
break retry;

// 再次检查状态,如果跟上次检查结果不同,就直接进入下一次外循环
c = ctl.get();
if (runStateOf(c) != rs)
continue retry;
// 这里省略掉的 else 分支对应的情形是
// CAS 操作失败(因为 workerCount 改变了)时就重试内循环
}
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());

// 条件 rs < SHUTDOWN 意思是状态是否为 RUNNING
// 条件 (rs == SHUTDOWN && firstTask == null) 有点不太好理解,实际上这里省略了
// '&& workQueue.isNotEmpty()'。此方法一开始就测试了包含省略部分的完整条件,
// 走到这里,如果(rs == SHUTDOWN && firstTask == null)为true,那么 workQueue 肯定不为空
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// 如果线程创建失败或启动失败,或发生了异常
if (! workerStarted)
// 执行必需的回滚逻辑
addWorkerFailed(w);
}
return workerStarted;
}

worker 如何执行任务

ThreadPoolExecutor.runWorker 方法才是实际的线程体方法。通过 run-loop,不断地从任务队列获取任务,执行它们。在每次循环中,妥善地处理下面几件事儿:

  1. 若给定的 worker 带有初始任务,就将它作为第一次循环要执行的任务。否则,就从队列获取任务。如果 getTask 方法返回 null,就表示当前工作线程必须退出(exit),然后 run-loop 就会中止,当前线程就退出了。在退出前,会为临终的 worker 执行清理和簿记工作,就是 processWorkerExit 方法。如果是因为 runWorker 方法内发生了异常导致退出,那么标示 worker 异常结束的布尔变量 completedAbruptly 就为真,processWorkerExit 内就会使用新 worker 替代当前 worker。
  2. 在运行任何任务之前,先获取锁以防止正在执行任务时发生其他池中断(pool interrupts),然后我们确保除非池正在停止,否则此线程的中断状态不会被设置。
  3. 在执行任务之前先调用钩子方法 beforeExecute,它可能会抛出异常,在这种情况下会导致当前工作线程还未运行任务就要死亡(中止循环且 completedAbruptly 为 true)了。
  4. 假设 beforeExecute 方法正常完成,就运行任务,收集任何抛出的异常,并将其发送给钩子方法 afterExecute 进行处理。这里会捕获 RuntimeException, Error 和任意 Throwable。因为不能在 Runnable.run() 中重新抛出 Throwable,所以在去往线程的 UncaughtExceptionHandler 的路上将其包装为 Error。对于任何抛出的异常,这里都会保守地/谨慎地让当前线程死亡。
  5. task.run() 完成后,就调用 afterExecute 方法,它也可能抛出异常,这也将导致线程死亡。根据 JLS Sec 14.20(Java语言规范14.20章节),这个异常将会生效,即使 task.run 也抛出了异常。

这个异常机制的实际作用就是给 afterExecute 和线程的 UncaughtExceptionHandler 提供关于用户代码碰到的任何问题的尽可能准确的信息。

有个疑问,这里只要因异常导致 worker 退出,就会添加新 worker 来替代正在退出的 worker。如果任务执行过程中需要连接数据库,而数据库又宕机了,岂不是会频繁创建、销毁线程?进而导致应用服务越来越慢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75

final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
// 标示变量,当前线程是否异常结束
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();

// 如果线程池正在停止,确保线程被中断;如果不是,确保线程不会被中断。
// 对于第二种情况,这需要重新检查状态以处理在清除中断状态的时候与 shutdownNow 之间的竞争。
// 注:shutdownNow 方法会将状态推进至 STOP。
// ------ comment end ------
// 这段注释很容易理解,但代码有点晦涩,"重新检查"指重新检查状态是否大于等于STOP;
// "第二种情况"指线程池并非正在停止,也就是状态小于STOP。
// 对于第二种情况的处理很隐晦,第一次测试[runStateAtLeast STOP]若为真,就命中第一种情况;
// 若为假,就命中第二种情况,进而继续执行 "||" 运算符右边的条件表达式。
if (( runStateAtLeast(ctl.get(), STOP)
// Thread.interrupted() 检查当前线程是否被中断,并清除中断状态(interrupted status)
|| (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))
) && !wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
// 将 task 置为 null 很重要,
// 否则 firstTask 不为空时永远也不会从队列获取任务
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 不管当前线程是正常还是异常退出,都要进行清理和簿记工作
processWorkerExit(w, completedAbruptly);
}
}

// 该方法做了下面这几件事儿:
// 1. 将池状态变迁为 STOP
// 2. 通过 Thread.interrupt 中断所有正在活跃地执行着的任务
// 3. 停止处理等待执行的任务,将它们从队列中排空(从队列删除)至一个列表,返回此列表。
// 注意,这个方法不会等待正在执行的任务正常结束,它通过 Thread.interrupt 撤销任务的执行,
// 因此任何未能响应中断的任务可能永远不会终止。
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}

getTask() 方法很有料,它返回 null 就表示当前线程必须要退出。线程空闲时间的记录和空闲超时处理也在其中。
该方法执行阻塞(blocking)或定时(timed)等待任务,具体取决于当前的配置设置。如果因为以下任何一种情形当前 worker 必须退出就返回 null:

  1. worker 数多于 maxPoolSize(由于调用 setMaximumPoolSize 方法重新设置了)
  2. 线程池已停止
  3. 线程池已关闭,且任务队列为空
  4. 当前 worker 等待任务超时,且超时的 worker 需要终止(根据配置或线程总数),也就是说,条件 [allowCoreThreadTimeOut || workerCount > corePoolSize] 为真。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48

private Runnable getTask() {
// 等待任务是否超时
boolean timedOut = false; // Did the last poll() time out?

for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// 如果线程池已关闭
// 或者池状态为 SHUTDOWN 且任务队列为空
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

int wc = workerCountOf(c);

// 是否需要剔除超时的 worker(根据配置或线程总数)
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

if ( (wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty()) ) {

if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

try {
// 阻塞或定时获取任务
Runnable r = timed ?
// 定时等待
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
// 阻塞等待
workQueue.take();
if (r != null)
return r;

// 此时虽然 r 为 null,但可不能直接返回 null,
// 因为还需要根据用户的配置或线程总数来决定
timedOut = true;
} catch (InterruptedException retry) {
// 走到这儿,肯定是定时获取任务超时。
timedOut = false;
}
}
}

processWorkerExit 方法为正在死亡的 worker 执行清理和簿记工作。对于正常退出的 worker,该方法假定 workerCount 在其他地方已被调整。
在下面几种情况下,会添加新 worker 来替代正在退出的 worker:

  1. 因用户任务异常而退出
  2. worker 数小于 corePoolSize
  3. 任务队列非空,但没有 worker 了
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // 如果是异常退出,那 workerCount 还未调整
decrementWorkerCount();

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 累计已完成的任务数
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}

tryTerminate(); // 尝试终止线程池

int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) { // 如果是正常退出
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;

// 如果 worker 数大于计算得出的合理最小值,
// 就无需添加新worker来替代正在死亡的worker
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}

参考资源

ThreadPoolExecutor Java Docs
JDK ThreadPoolExecutor 源码

0%