IOTXING

记录技术学习之路

0%

java线程池源码解读

java中提供的线程池基本都是继承自ExecutorService,可以通过Executors来创建线程池。
java中提供了以下几种线程池,

  • newCachedThreadPool
  • newFixedThreadPool
  • newSingleThreadExecutor
  • newScheduledThreadPool

newFixedThreadPool

固定容量的线程池,除非某个线程被显式关闭,否则线程会一直存活。在执行任务的时候,如果线程池的线程都已经在工作状态了,新来的任务就会被加入等待队列,直到线程池中有线程执行完,或者出现异常被终止等情况,新的任务就会占据这个线程开始执行。

newCachedThreadPool

该线程池不能传入线程池大小,默认设定代销为Integer的MAX_VALUE。每个执行完的线程,会保留60s,如果60s内有新任务进来,则重复利用该线程去跑,如果超过60s,没被利用,则线程就会被销毁。如果长时间没有新任务进来,该线程池不消耗任何资源

newSingleThreadExecutor

只有一个线程的线程池,在线程挂了之后,会起一个新的线程来替代

newScheduledThreadPool

可以定期执行线程,或者是延迟一定时间后,执行

ThreadPoolExecutor

这个类里面核心的内容,是一个变量ctl
ctl一个atomicInteger类型的数据,结合了workerCount和runState
runState表示的是线程池的状态
为了能够存储这两项数据,workerCount被限制了最大值只能是2^29-1,而不是2^32-1。这样就能空出来2^2个位置,来存放runState了
这里定义了最大的容量
此时CAPACITY = 0001111111111111111111;

1
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

runState有五种,占两个位置

1
2
3
4
5
6
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;

这里将五个状态为,分别都左移29位,刚好利用两个高位进行存储。

这里有三个函数,用来辅助计算ctl中的数据的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private static int runStateOf(int c)     { return c & ~CAPACITY; }
//CAPACITY取反 是111000000000000000;
//c&-CAPACITY,刚好取出前三位,后面都是0,这三位刚好存储的是状态



private static int workerCountOf(int c) { return c & CAPACITY; }

//c&CAPACITY,把最高的三位置为0,剩下的就都是worker的数量了



private static int ctlOf(int rs, int wc) { return rs | wc; }

//这里通过或操作,将作为高3为的runState和低29位的workerCount合并到一起

这里我们执行任务的时候,需要用到execute函数
execute函数会遇到三种情况

  • 活动线程小于poolsize,直接新建活动线程加入到线程池,并执行任务
  • 没有空闲空间了,
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException(); //如果任务为空,抛出异常
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { //取出当前执行的线程数量,如果小于线程池大小,加入新的worker
if (addWorker(command, true))
return; //加入成功,直接返回
c = ctl.get(); //加入失败,取出最新的数据,因为在这期间worker的数量可能已经发生变化了
}
if (isRunning(c) && workQueue.offer(command)) {
//如果当前线程池处于RUNNING状态,并且任务队列可以成功加入任务
int recheck = ctl.get(); //二次确认
if (! isRunning(recheck) && remove(command))
//如果当前线程池不是RUNNING状态,则从队列中移除任务
reject(command); //拒绝任务

else if (workerCountOf(recheck) == 0) //该情况考虑的是线程池是SHUTDOWN状态,但是队列还有任务没有完成,需要加一个null任务进入线程池,避免任务进入
addWorker(null, false);
}
//如果添加任务失败,拒绝任务
else if (!addWorker(command, false))
reject(command);
}

AddWorker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c); //获取当前线程池的状态

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false; //如果状态不是RUNNING,或者状态是SHUTDOWN,队列第一个任务是空,并且队列不为空,直接返回false

for (;;) {
int wc = workerCountOf(c); //获取到workerCount的大小
if (wc >= CAPACITY || //如果超过了CAPACITY或者超过设定的线程池大小
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c)) //尝试对c增加1,里面使用了自旋锁,加1因为加在了低位,
break retry; //跳出循环
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs) //如果当前状态与上面取的不一样,说明发生了变化,重新开始循环
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null; //初始化worker的信息
try {
w = new Worker(firstTask);
final Thread t = w.thread; //新建一个worker
if (t != null) {
final ReentrantLock mainLock = this.mainLock; //获取锁,同时只能有一个线程对ctl进行修改
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get()); //再次校验线程池的状态

if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) { //如果线程池处于RUNNING状态,或者处于SHUTDOWN但是传入的任务是null
if (t.isAlive()) // 如果新加入的线程已经开始工作,并且还没有结束,抛出异常
throw new IllegalThreadStateException();
workers.add(w); //将当前的worker加入到worker列表
int s = workers.size(); //更改最大的线程池大小
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock(); //释放锁
}
if (workerAdded) {
t.start();
workerStarted = true; //修改标志位
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted; //返回线程工作状态
}

Worker worker是扩展AbstractQueuedSynchronizer,这是一个队列同步器,主要用到其中的锁,在runWorker执行task的时候,必须要保证该task没有被执行,并且执行过程中不会被其它的线程池执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;

/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;

/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}

/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}

// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.

protected boolean isHeldExclusively() {
return getState() != 0;
}

protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}

public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }

void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}

runWorker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//如果传入的任务不为空,或者任务取出来的任务不为空,继续下面逻辑
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt(); //几种中断的场景
try {
beforeExecute(wt, task);
Throwable thrown = null; //执行任务
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}

getTask 从任务队列中取任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?

for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// 校验线程池的状态,以及队列是否为空
//如果当前状态不是RUNNING,rs>=STOP或者队列已经是空的
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount(); //开始减少wc的数量
return null;
}

int wc = workerCountOf(c);
//获取wc
// Are workers subject to culling?
//如果设置了核心线程池大小,并且wc已经超过了,或者设置了核心线程超时
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

//如果大小超过设定的大小,或者超时了,或者是wc大于1或者队列为空,返回空
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c)) //这里
return null;
continue;
}

try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take(); //如果设置了超时时间,就使用poll,如果没有设置,使用take,take是会阻塞线程
if (r != null)
return r; //获取到任务,直接返回
timedOut = true; //超时未获取到任务
} catch (InterruptedException retry) {
timedOut = false;
}
}
}