本文对 Java 中的线程池的使用进行学习。是对以下文章的摘录:
- JDK 源码
- 阿里编程规范插件提示
Executor
Executor 接口
Java 类库中任务执行的抽象接口是 Executor
。这个接口使得任务提交和任务如何被执行(包括线程使用细节、调度等)得到了解耦。该接口如下所示:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15/*
* @since 1.5
* @author Doug Lea
*/
public interface Executor {
/**
* 在将来某个时刻执行任务( command )。这个任务可以通过这些方式执行:在一个新的线程中执行、
* 在一个缓存的线程中执行、在调用者线程中执行。具体取决于 Executor 的实现方式。
*
* @param command 需要运行的任务
* @throws RejectedExecutionException 如果执行器不接受任务,则抛出改异常。
* @throws NullPointerException 如果任务为 null
*/
void execute(Runnable command);
}
使用 Executor 执行任务
Executor
经常被用来代替显示地创建线程。
自己手工创建线程执行(不建议):1
new Thread(new RunnableTask()).start()
使用 Executor
执行:1
2
3
4Executor executor = anExecutor;
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());
...
Executor 的执行方案
Executor
接口并不强制异步执行任务。一个简单的例子是,执行器可以在调用线程中立刻运行提交任务。1
2
3
4
5class DirectExecutor implements Executor {
public void execute(Runnable r) {
r.run();
}
}}
更加典型的方案是,在其它线程而非调用者线程中执行任务。下面的 executor 为每个任务创建了一个执行线程。1
2
3
4
5class ThreadPerTaskExecutor implements Executor {
public void execute(Runnable r) {
new Thread(r).start();
}
}}
很多 Executor
的实现对任务何时以何种方式执行都会添加一些限制。下面的执行器将提交的任务按顺序在另一个执行器中执行,成为一个组合执行器。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33class SerialExecutor implements Executor {
final Queue<Runnable> tasks = new ArrayDeque<Runnable>();
final Executor executor;
Runnable active;
SerialExecutor(Executor executor) {
this.executor = executor;
}
public synchronized void execute(final Runnable r) {
tasks.offer(new Runnable() {
public void run() {
try {
r.run();
} finally {
// 每个任务执行完后,执行下一个任务
scheduleNext();
}
}
});
// 没有任务的时候,启动此任务
if (active == null) {
scheduleNext();
}
}
protected synchronized void scheduleNext() {
// 当下一个任务存在时,执行任务
if ((active = tasks.poll()) != null) {
executor.execute(active);
}
}
}}
ExecutorService
ExecutorService 接口
Executor
有一个扩展接口 ExecutorService
。ExecutorService
能够提供方法来管理终止,也能够创建 Future
来跟踪一个或多个任务的异步执行过程。该接口如下图所示:
ExecutorService 关闭
ExecutorService
可以被关闭,关闭后它会拒绝新的任务。有两种关闭 ExecutorService
的方法:
shutdown
:该方法将会保证关闭之前提交的任务会在关闭前被执行。shutdownNow
:将会阻止任务的启动,并且尝试停止当前执行的任务。
一旦 ExecutorService
被关闭(termination),执行器将会没有正在运行的任务、没有正在等待执行的任务、没有新的任务被提交。一个没有使用的 ExecutorService
应该被关闭,从而使得资源可以被回收,
任务执行
submit
方法扩展于 Executor
中的基本方法 execute(Runnable)
,该方法创建和返回了一个 Future
对象,通过这个对象可以取消任务的执行或者等待任务的结束。
invokeAny
和 invokeAll
方法可以用来执行非常通用有效的批量执行。执行一批任务,等待它们中的一个或者全部执行完成。ExecutorCompletionService
可以用来实现这些方法的变体。
使用例子
下面是一个网络服务的例子。该例子通过线程池中的线程来服务到达的请求,使用到了Executors
中的 newFixedThreadPool
方法。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28class NetworkService implements Runnable {
private final ServerSocket serverSocket;
private final ExecutorService pool;
public NetworkService(int port, int poolSize)
throws IOException {
serverSocket = new ServerSocket(port);
pool = Executors.newFixedThreadPool(poolSize);
}
public void run() { // run the service
try {
for (;;) {
pool.execute(new Handler(serverSocket.accept()));
}
} catch (IOException ex) {
pool.shutdown();
}
}
}
class Handler implements Runnable {
private final Socket socket;
Handler(Socket socket) { this.socket = socket; }
public void run() {
// read and service request on socket
}
}}
下面的代码展示了通过两个阶段关闭 ExecutorService
:
- 首先,调用
shutdown
来拒绝之后到达的任务。 - 然后,如果有必要的话,可调用
shutdownNow
来取消滞留的任务。
1 | void shutdownAndAwaitTermination(ExecutorService pool) { |
ThreadPoolExecutor
ThreadPoolExecutor
类提供了一个可扩展的线程池实现。
基本介绍
线程池用来解决两个不同方面的问题:
- 由于减少了每个任务的调用开销,通常可以在执行大量异步任务的时候提高性能。
- 提供了资源控制和管理的。
每个 ThreadPoolExecutor
具有一些基本的统计,例如:任务执行完成的数量。
为了适应广泛的应用场景,该类提高了很多可以调整的参数以及可以扩展的钩子。此外,更简单的一种方式是使用 Executors
中的一些工厂方法,包括:
newCachedThreadPool
:没有边界,自带线程复用。newFixedThreadPool
:固定线程池大小。newSingleThreadExecutor
: 单线程。
这些覆盖了大部分通用的场景。
构造函数
当需要手工配置这个类或者对其参数进行调整时,就需要了解其构造函数。ThreadPoolExecutor
有很多构造函数,下面是最常见的形式。1
2
3
4
5
6
7public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) { ... }
corePoolSize and maximumPoolSize
ThreadPoolExecutor
将自动通过基本大小(corePoolSize)、最大大小(maximumPoolSize)来调整线程池的大小。
当有新的任务通过 execute(Runnable)
方法提交时:
- 如果当前运行的线程数目小于基本大小(corePoolSize),即使有其它空闲线程,也将会创建一个新的线程来处理这个请求。
- 如果当前运行的线程数目大于基本大小(corePoolSize),小于最大大小(maximumPoolSize),并且缓存的队列(queue)满的时候,将会创建新的线程。
设置示例:
- 通过设置
corePoolSize
和maximumPoolSize
相同,你可以创建一个固定大小(fixed-size)的线程池。 - 通过将
maximumPoolSize
设置成一个很大的值,比如:Integer.MAX_VALUE
,可以使线程池容纳任意数量的并发任务。
通常,这些参数在构造函数中被设置,但是它们也可以通过这些方法动态改变:
setCorePoolSize
setMaximumPoolSize
keepAliveTime
如果线程池中当前执行的线程数目大于 corePoolSize
,那么对于多出的线程,当它们空闲的时间超过 keepAliveTime
时,它们将被终止。
这样的机制,使得当线程池不活跃的时候,可以减少资源的消耗。当线程池变得活跃的时候,新的线程会被创建。这个参数也通过 setKeepAliveTime(long TimeUnit)
方法动态改变。
通过设置这个参数为 Long.MAX_VALUE
TimeUnit.NANOSECONDS
,可以禁止空闲的线程被终止。
默认情况下,这个 keep-alive
策略只会在当前线程数目超过 corePoolSize
的时候才会起作用,也可以通过 allowCoreThreadTimeOut(boolean)
来控制,此时 keepAliveTime
的值不能为0.
Queuing
BlockingQueue
可以用来转移和持有提交的任务。它的使用时和线程池的大小相关的:
- 如果小于
corePoolSize
数目的线程在运行,那么Executor
倾向于创建新的线程来执行任务,而不是将任务缓存到队列中。 - 如果大于等于
corePoolSize
数目的线程在运行,那么Executor
倾向于将任务缓存到队列中,而不是创建新的线程。 - 如果请求达到限制无法被缓存到队列中,那么一个新的线程将会被创建,当创建的线程数将要超过
maximumPoolSize
时,新的任务将会被拒绝。
以下是一些通用的队列策略:
- 直接切换:工作队列一个好的默认选择可以是
SynchronousQueue
,它可以将任务交给线程执行,并且不需要缓存任务。当没有线程可用的时候,尝试缓存任务到队列中将会立即失败,因此一个新的线程将会被创建。这个策略可以避免由于任务间存在内部依赖造成的死机。直接切换通常需要一个没有限制的maximumPoolSizes
,从而避免拒绝新的任务,但当处理不够及时,线程数目也会持续增加。 - 无界队列:使用无界队列(例如:
LinkedBlockingQueue
没有预先定义队列容量),如果所有的corePoolSize
线程都很忙碌,那么新的任务将会被保存在队列中进行等待。因此,线程数永远不会超过corePoolSize
,而maximumPoolSize
的值在这里也不会起作用。这样的策略比较适合于每个任务都是独立执行的场景。例如,在 Web 页面服务中,队列可以用来平滑瞬时的访问高峰。 - 有界队列:一个有界限的队列(例如:
ArrayBlockingQueue
)可以防止maximumPoolSizes
被设置为无限大时造成的资源耗尽。队列的大小和线程池大小可以互相调和:使用大的队列和小的线程池可以降低 CPU 使用率、操作系统资源占有、上下文切换的开销,但是会导致较低的吞吐量。如果任务经常被阻塞(I/O 受限),系统可能会调度更多的线程,超过你开始的限制。当使用小队列的时候,需要使用较大的线程池大小,这可以使得 CPU 更加忙碌,但是可能会由于频繁的上下文切换,导致吞吐量下降。
ThreadFactory
ThreadFactory
是用来创建新的线程。下面是该接口的声明。1
2
3
4
5
6
7
8
9
10
11
12
13/**
* @since 1.5
* @author Doug Lea
*/
public interface ThreadFactory {
/**
* 构建线程 Thread。实现中可以设置优先权、名字、守护状态、线程组等。
*
* @param r 一个可以被线程实例运行的任务
* @return 创建的线程或者 null (创建被拒绝)
*/
Thread newThread(Runnable r);
}
该接口一个简单的实现为:1
2
3
4
5class SimpleThreadFactory implements ThreadFactory {
public Thread newThread(Runnable r) {
return new Thread(r);
}
}}
Executors
中的 defaultThreadFactory
方法提供了一个更加简单实用的实现,可以设置线程环境为已知值。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
Rejected tasks
在以下两种情况下,新的任务将会被拒绝:
Executor
被关闭。Executor
使用有界的线程池大小和工作队列容量后,达到饱和。
任何一种情况下,都会调用 RejectedExecutionHandler
中的 rejectedExecution
方法。
预先定义的一些拒绝策略包括:
- 默认是
ThreadPoolExecutor.AbortPolicy
,它在拒绝时会抛出一个运行时异常RejectedExecutionException
。 ThreadPoolExecutor.CallerRunsPolicy
策略是让调用者自己来运行这个任务。这实现了一个简单的反馈控制机制,来降低新任务的提交速率。ThreadPoolExecutor.DiscardPolicy
方法直接丢弃任务。ThreadPoolExecutor.DiscardOldestPolicy
如果 executor 没有被关闭,那么丢弃队列头上的任务,任务执行会再次尝试。
扩展例子
很多基于该类的扩展都是覆盖一个或多个受保护的函数。下面例子就展示了一个子类,该之类添加了简单的暂停和恢复特性:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38class PausableThreadPoolExecutor extends ThreadPoolExecutor {
private boolean isPaused;
private ReentrantLock pauseLock = new ReentrantLock();
private Condition unpaused = pauseLock.newCondition();
public PausableThreadPoolExecutor(...) { super(...); }
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
pauseLock.lock();
try {
while (isPaused) unpaused.await();
} catch (InterruptedException ie) {
t.interrupt();
} finally {
pauseLock.unlock();
}
}
public void pause() {
pauseLock.lock();
try {
isPaused = true;
} finally {
pauseLock.unlock();
}
}
public void resume() {
pauseLock.lock();
try {
isPaused = false;
unpaused.signalAll();
} finally {
pauseLock.unlock();
}
}
}}
Executors
Executors
类提供了方便的工厂方法来创建这些执行器。
newFixedThreadPool
该方法创建的线程池可以重复使用固定数目的线程,使用无限制的队列。1
2
3
4
5
6public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
newSingleThreadExecutor
单线程执行,使用无限制的队列。1
2
3
4
5
6
7public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
newCachedThreadPool
使用无限制的线程池,空线程超过60秒被回收,线程执行采用直接交付策略。1
2
3
4
5
6public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
newScheduledThreadPool
采用 ScheduledThreadPoolExecutor
。1
2
3public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
newWorkStealingPool
采用 ForkJoinPool
, 开始于 JDK 1.8。1
2
3
4
5
6public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
阿里编程规范
使用说明
线程池不允许实用 Executor
去创建,而是通过 ThreadPoolExecutor
的方式,这样的处理方式可以更加明确线程次的运行规则,规避资源耗尽的风险。
Executor
各个方法的弊端说明:
newFixedThreadPool
和newSingleThreadPool
:主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至 OOM。newCachedThreadPool
和newScheduledThreadPool
:主要问题是线程的最大数是Integer.MAX_VALUE
,可能会创建非常多的线程数,甚至 OOM。
使用示例
例子1:1
2
3// org.apache.commons.lang3.concurrent.BasicThreadFactory
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,
new BasicThreadFactory.Builder().namingPattern("example-schedule-pool-%d").daemon(true).build());
例子2:1
2
3
4
5
6
7
8
9
10
11
12// com.google.common.util.concurrent.ThreadFactoryBuilder
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("demo-pool-%d").build();
// Common Thread Pool
ExecutorService pool = new ThreadPoolExecutor(5,200,
0L,TimeUnit.MILLISECONDS,
new LinkedBlockingDeque<Runnable>(1024),namedThreadFactory,
new ThreadPoolExecutor.AbortPolicy());
pool.execute(()-> System.out.println(Thread.currentThread().getName()));
pool.shutdown(); // gracefully shutdown
例子3:1
2
3
4
5
6
7
8
9
10
11<bean id="userThreadPool"
class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="10" />
<property name="maxPoolSize" value="100" />
<property name="queueCapacity" value="2000" />
<property name="thradFactory" value= thradFactory />
<property name="rejectedExecutionHandler">
<ref local="rejectedExecutionHandler">
</property>
</bean>