线程池

1 实现原理

当向线程池提交一个任务之后,线程池的处理流程如下:

  1. 判断是否达到核心线程数,若未达到,则直接创建新的线程处理当前传入的任务,否则进入下个流程

  2. 线程池中的工作队列是否已满,若未满,则将任务丢入工作队列中先存着等待处理,否则进入下个流程

  3. 是否达到最大线程数,若未达到,则创建新的线程处理当前传入的任务,否则交给线程池中的饱和策略进行处理。

处理流程

2 java中的线程池

jdk中提供了线程池的具体实现,实现类是:java.util.concurrent.ThreadPoolExecutor,主要构造方法:

1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)

corePoolSize:核心线程大小,当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使有其他空闲线程可以处理任务也会创新线程,等到工作的线程数大于核心线程数时就不会在创建了。如果调用了线程池的prestartAllCoreThreads方法,线程池会提前把核心线程都创造好,并启动。

maximumPoolSize:线程池允许创建的最大线程数。如果队列满了,并且以创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。如果使用了无界队列,那么所有的任务会加入队列,这个参数就没有什么效果了。

keepAliveTime:线程池的工作线程空闲后,保持存活的时间。如果没有任务处理了,有些线程会空闲,空闲的时间超过了这个值,会被回收掉。如果任务很多,并且每个任务的执行时间比较短,避免线程重复创建和回收,可以调大这个时间,提高线程的利用率。

unit:keepAliveTime的时间单位,可以选择的单位有天、小时、分钟、毫秒、微妙、千分之一毫秒和纳秒。类型是一个枚举java.util.concurrent.TimeUnit,这个枚举也经常使用。

workQueue:工作队列,用于缓存待处理任务的阻塞队列,常见的有4种。

threadFactory:线程池中创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字。

handler:饱和策略,当线程池无法处理新来的任务了,那么需要提供一种策略处理提交的新任务,默认有4种策略。

3 饱和策略

当线程池中队列已满,并且线程池已达到最大线程数,线程池会将任务传递给饱和策略进行处理。这些策略都实现了RejectedExecutionHandler接口。接口中有个方法:

1
2
// 参数说明:r:需要执行的任务;executor:当前线程池对象
void rejectedException(Runnable r, ThreadPoolExcutor executor)

JDK中提供了4种常见的饱和策略:

​ (1)AbortPolicy:直接抛出异常

​ (2)CallerRunsPolicy:在当前调用者的线程中运行任务,即随丢来的任务,由他自己去处理

​ (3)DiscardOldestPolicy:丢弃队列中最老的一个任务,即丢弃队列头部的一个任务,然后执行当前传入的任务

​ (4)DiscardPolicy:不处理,直接丢弃掉,方法内部为空

4 关闭方法

原理:遍历线程池中的所有线程,然后逐个调用线程的interrupt方法来中断线程。

线程池提供了2个关闭方法:shutdown和shutdownNow,当调用者两个方法之后,线程池会遍历内部的工作线程,然后调用每个工作线程的interrrupt方法给线程发送中断信号,内部如果无法响应中断信号的可能永远无法终止,所以如果内部有无线循环的,最好在循环内部检测一下线程的中断信号,合理的退出。调用者两个方法中任意一个,线程池的isShutdown方法就会返回true,当所有的任务线程都关闭之后,才表示线程池关闭成功,这时调用isTerminaed方法会返回true。

调用shutdown方法之后,线程池将不再接口新任务,内部会将所有已提交的任务处理完毕,处理完毕之后,工作线程自动退出。

而调用shutdownNow方法后,线程池会将还未处理的(在队里等待处理的任务)任务移除,将正在处理中的处理完毕之后,工作线程自动退出。

至于调用哪个方法来关闭线程,应该由提交到线程池的任务特性决定,多数情况下调用shutdown方法来关闭线程池,如果任务不一定要执行完,则可以调用shutdownNow方法。

5 合理配置线程池

要想合理的配置线程池,需要先分析任务的特性,可以冲一下几个角度分析:

​ (1)任务的性质:CPU密集型任务、IO密集型任务和混合型任务

​ (2)任务的优先级:高、中、低

​ (3)任务的执行时间:长、中、短

​ (4)任务的依赖性:是否依赖其他的系统资源,如数据库连接。

性质不同任务可以用不同规模的线程池分开处理。CPU密集型任务应该尽可能小的线程,如配置cpu数量+1个线程的线程池。由于IO密集型任务并不是一直在执行任务,不能让cpu闲着,则应配置尽可能多的线程,如:cup数量*2。混合型的任务,如果可以拆分,将其拆分成一个CPU密集型任务和一个IO密集型任务,只要这2个任务执行的时间相差不是太大,那么分解后执行的吞吐量将高于串行执行的吞吐量。可以通过Runtime.getRuntime().availableProcessors()方法获取cpu数量。优先级不同任务可以对线程池采用优先级队列来处理,让优先级高的先执行。使用队列的时候建议使用有界队列,有界队列增加了系统的稳定性,如果采用无界队列,任务太多的时候可能导致系统OOM,直接让系统宕机。

6 JDK中的实现的四种线程池

Java通过Executors提供四种线程池,分别为:

  1. newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
1
2
3
4
5
6
7
8
9
10
11
/** 
* 核心线程数 0
* 最大线程数 Integer.MAX_VALUE
* 一个线程如果在60s还没有被使用的话会被移除线程池
* 阻塞队列使用SynchronousQueue
* 使用中断的拒绝策略
*/
public static ExecutorService newCachedThreadPool(){
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L,
TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}

特点:

(1)按需创建新的线程,如果没有可用线程则创建新的线程,之前用过的线程可能会再次被使用;

(2)因为空闲线程会被移除线程池,因此,如果线程池长时间不被使用也不会消耗系统资源;

  1. newFixedThreadPool创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
1
2
3
4
5
6
7
8
/** 
* 核心线程数=最大线程数=参数nThread
* 阻塞队列使用LinkedBlockingQueue,一个共享的无界队列
*/
public static ExecutorService newFixedThreadPool(int nThreads){
return new ThreadPoolExecutor(nThreads, nThreads, 0L,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}

特点:

(1)在任何情况下最多只有nThread个线程工作,多余的Task将会被存放到队列中等待;

(2)如果线程在执行任务中被终止,终止之前会创建其他的线程代替原来的;

(3)线程将会一直存在在线程池中,直到调用shutDown()方法

  1. newScheduledThreadPool创建一个定长线程池,支持定时及周期性任务执行。
1
2
3
4
5
6
7
8
9
/** 
* 核心线程数:通过参数指定corePoolSize
* 最大线程数 Integer.MAX_VALUE
* 超过corePoolSize的线程在执行完任务后即终止
* 阻塞队列使用DelayedWorkQueue
*/
public ScheduledThreadPoolExecutor(int corePoolSize){
super(corePoolSize, Integer.MAX_VALUE, 0L, NANOSECONDS, new DelayWorkQueue());
}

特点:

(1)核心线程数将会一直存在线程池中,除非设置了allowCoreThreadTimeOut

(2)可以设置线程的执行时间

  1. newSingleThreadExecutor创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
1
2
3
4
5
6
7
8
9
10
/** 
* 线程池中最多同时只有一个线程活跃
* 同一时刻只有一个任务执行
* 多余的任务放在LinkedBlockingQueue中
*/
public static ExecutorService newSingleThreadPool(){
return new FinalizeableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1, 0L,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}

7 源码分析

7.1 内部状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;

// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

其中AtomicInteger变量ctl的功能非常强大:利用低29位表示线程池中线程数,通过高3位表示线程池的运行状态:

  1. RUNNING:-1 << COUNT_BITS,即高3位为111,该状态的线程池会接收新任务,并处理阻塞队列中的任务;

  2. SHUTDOWN: 0 << COUNT_BITS,即高3位为000,该状态的线程池不会接收新任务,但会处理阻塞队列中的任务;

  3. STOP : 1 << COUNT_BITS,即高3位为001,该状态的线程不会接收新任务,也不会处理阻塞队列中的任务,而且会中断正在运行的任务;

  4. TIDYING : 2 << COUNT_BITS,即高3位为010, 所有的任务都已经终止;

  5. TERMINATED: 3 << COUNT_BITS,即高3位为011, terminated()方法已经执行完成

7.2 任务执行

execute –> addWorker –>runworker (getTask)

线程池的工作线程通过Woker类实现,在ReentrantLock锁的保证下,把Woker实例插入到HashSet后,并启动Woker中的线程。

从Woker类的构造方法实现可以发现:线程工厂在创建线程thread时,将Woker实例本身this作为参数传入,当执行start方法启动线程thread时,本质是执行了Worker的runWorker方法。firstTask执行完成之后,通过getTask方法从阻塞队列中获取等待的任务,如果队列中没有任务,getTask方法会被阻塞并挂起,不会占用cpu资源;

7.2.1 execute()

ThreadPoolExecutor.execute(task)实现了Executor.execute(task)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
int c = ctl.get();
if(workerCountOf(c) < corePoolSize){
//workerCountOf获取线程池的当前线程数,小于corePoolSize,执行addWorker创建新线程执行command任务
if(addWorker(command, true))
return;
c = ctl.get();
}
//double check: c, recheck
//线程池处于RUNNING状态,把提交的任务成功放入阻塞队列中
if(isRunning() && workQueue.offer(command)){
int recheck = ctl.get();
//recheck and if necessary 回滚到入队操作前,即倘若线程池shutdown状态,就remove(command)
//如果线程池没有RUNNING,成功从阻塞队列中删除任务,执行reject方法处理任务
if(!isRunning(recheck) && remove(command))
reject(command);
//线程池处于running状态,但是没有线程,则创建线程
else if(workerCountOf(recheck) == 0)
addWorker(null, false);
}
//线程池中创建新的线程失败,则reject任务
else if (!addWorker(command, false))
reject(command);

在多线程环境下,线程池的状态时刻在变化,而ctl.get()是非原子操作,很有可能刚获取了线程池状态后线程池状态就改变了。判断是否将command加入workque是线程池之前的状态。倘若没有double check,万一线程池处于非running状态(在多线程环境下很有可能发生),那么command永远不会执行。

7.2.2 addWorker()

做了两件事:(1)循环CAS操作来将线程数加1;(2)新建一个线程并启用。

从方法execute的实现可以看出:addWorker主要负责创建新的线程并执行任务,线程池创建新线程执行任务时,需要获取全局锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
private boolean addWorker(Runnable firstTask, boolean core) {
retry;
for(;;){
int c = ctl.get();
int rs = runstateOf(c);

//是否能够添加工作线程
//判断线程池的状态,如果线程池的状态值大于或等SHUTDOWN,则不处理提交的任务,直接返回
if(rs >= SHUTDOWN &&
!(rs == SHUTDOWN &&
firstTask == null &&
!workQueue.isEmpty()))
return false;

/** 做自旋,更新创建线程数量
* 通过参数core判断当前需要创建的线程是否为核心线程,如果core为true,且当前线程数小于corePoolSize,
* 则跳出循环,开始创建新的线程。
*/
for(;;){
int wc = workereCountOf(c);
if(wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if(compareAndIncrementWorkerCount(c))
break retry;
if(runstateOf(c) != rs)
continue retry;
}
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if(t != null){
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
//添加线程到workers中(线程池中)。
if(rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)){
if(t.isAlive())
throw new IllegalThreadStataeException();
workers.add(w);
int s = workers.size();
if(s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unLock();
}
启动新建的线程。
if(workerAdded){
t.start();
workerStarted = true;
}
}
} finally {
if (!workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

其中workers是一个hashSet。所以,线程池底层的存储结构其实就是一个HashSet。

7.2.3 runWorker

runWorker方法是线程池的核心:

  1. 线程启动之后,通过unlock方法释放锁,设置AQS的state为0,表示运行可中断;

  2. Worker执行firstTask或从workQueue中获取任务:

2.1 进行加锁操作,保证thread不被其他线程中断(除非线程池被中断)

2.2 检查线程池状态,倘若线程池处于中断状态,当前线程将中断。

2.3 执行beforeExecute

2.4 执行任务的run方法

2.5 执行afterExecute方法

2.6 解锁操作

通过getTask方法从阻塞队列中获取等待的任务,如果队列中没有任务,getTask方法会被阻塞并挂起,不会占用cpu资源;

7.2.4 getTask

allowCoreThreadTimeOut为false,线程即使空闲也不会被销毁;倘若为ture,在keepAliveTime内仍空闲则会被销毁。

如果线程允许空闲等待而不被销毁timed == false,workQueue.take任务:如果阻塞队列为空,当前线程会被挂起等待;当队列中有任务加入时,线程被唤醒,take方法返回任务,并执行;

如果线程不允许无休止空闲timed == true, workQueue.poll任务:如果在keepAliveTime时间内,阻塞队列还是没有任务,则返回null;

7.3 任务提交

在实际业务场景中,Future和Callable基本是成对出现的,Callable负责产生结果,Future负责获取结果。

​ 1. Callable接口类似于Runnable,只是Runnable没有返回值;

​ 2. Callable任务除了返回正常结果之外,如果发生异常,该异常也会被返回,即Future可以拿到异步执行任务各种结果;

​ 3. Future.get方法会导致主线程阻塞,直到Callable任务执行完成;

7.3.1 submit

1
2
3
4
5
6
7
8
// submit方法在AbstractExecutorService中的实现
public Future<?> submit(Runnable task){
if(task == null) throw new NullPointException();
//通过submit方法提交的Callable任务会被封装成了一个FutureTask对象
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}

通过submit方法提交的Callable任务会被封装成了一个FutureTask对象。通过Executor.execute方法提交FutureTask到线程池中等待被执行,最终执行的是FutureTask的run方法;

7.3.2 FutureTask对象

内部状态

1
2
3
4
5
6
7
8
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;

get方法:内部通过awaitDone方法对主线程进行阻塞:

  1. 如果主线程被中断,则抛出中断异常;

  2. 判断FutureTask当前的state,如果大于COMPLETING,说明任务已经执行完成,则直接返回;

  3. 如果当前state等于COMPLETING,说明任务已经执行完,这时主线程只需通过yield方法让出cpu资源,等待state变成NORMAL;

  4. 通过WaitNode类封装当前线程,并通过UNSAFE添加到waiters链表;

  5. 最终通过LockSupport的park或parkNanos挂起线程;

run方法:

FutureTask.run方法是在线程池中被执行的,而非主线程

  1. 通过执行Callable任务的call方法;

  2. 如果call执行成功,则通过set方法保存结果;

  3. 如果call执行有异常,则通过setException保存异常;


线程池
http://www.zivjie.cn/2023/03/12/java基础/多线程/线程池/
作者
Francis
发布于
2023年3月12日
许可协议