线程池的使用

本文对 Java 中的线程池的使用进行学习。是对以下文章的摘录:

  • JDK 源码
  • 阿里编程规范插件提示

Executor

Executor 接口

Java 类库中任务执行的抽象接口是 Executor。这个接口使得任务提交和任务如何被执行(包括线程使用细节、调度等)得到了解耦。该接口如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/*
* @since 1.5
* @author Doug Lea
*/
public interface Executor {
/**
* 在将来某个时刻执行任务( command )。这个任务可以通过这些方式执行:在一个新的线程中执行、
* 在一个缓存的线程中执行、在调用者线程中执行。具体取决于 Executor 的实现方式。
*
* @param command 需要运行的任务
* @throws RejectedExecutionException 如果执行器不接受任务,则抛出改异常。
* @throws NullPointerException 如果任务为 null
*/
void execute(Runnable command);
}

使用 Executor 执行任务

Executor 经常被用来代替显示地创建线程。
自己手工创建线程执行(不建议):

1
new Thread(new RunnableTask()).start()

使用 Executor 执行:

1
2
3
4
Executor executor =  anExecutor;
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());
...

Executor 的执行方案

Executor 接口并不强制异步执行任务。一个简单的例子是,执行器可以在调用线程中立刻运行提交任务。

1
2
3
4
5
class DirectExecutor implements Executor {
public void execute(Runnable r) {
r.run();
}
}}

更加典型的方案是,在其它线程而非调用者线程中执行任务。下面的 executor 为每个任务创建了一个执行线程。

1
2
3
4
5
class ThreadPerTaskExecutor implements Executor {
public void execute(Runnable r) {
new Thread(r).start();
}
}}

很多 Executor 的实现对任务何时以何种方式执行都会添加一些限制。下面的执行器将提交的任务按顺序在另一个执行器中执行,成为一个组合执行器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class SerialExecutor implements Executor {
final Queue<Runnable> tasks = new ArrayDeque<Runnable>();
final Executor executor;
Runnable active;

SerialExecutor(Executor executor) {
this.executor = executor;
}

public synchronized void execute(final Runnable r) {
tasks.offer(new Runnable() {
public void run() {
try {
r.run();
} finally {
// 每个任务执行完后,执行下一个任务
scheduleNext();
}
}
});
// 没有任务的时候,启动此任务
if (active == null) {
scheduleNext();
}
}

protected synchronized void scheduleNext() {
// 当下一个任务存在时,执行任务
if ((active = tasks.poll()) != null) {
executor.execute(active);
}
}
}}

ExecutorService

ExecutorService 接口

Executor 有一个扩展接口 ExecutorServiceExecutorService 能够提供方法来管理终止,也能够创建 Future 来跟踪一个或多个任务的异步执行过程。该接口如下图所示:

ExecutorService 关闭

ExecutorService 可以被关闭,关闭后它会拒绝新的任务。有两种关闭 ExecutorService 的方法:

  • shutdown:该方法将会保证关闭之前提交的任务会在关闭前被执行。
  • shutdownNow:将会阻止任务的启动,并且尝试停止当前执行的任务。

一旦 ExecutorService 被关闭(termination),执行器将会没有正在运行的任务、没有正在等待执行的任务、没有新的任务被提交。一个没有使用的 ExecutorService 应该被关闭,从而使得资源可以被回收,

任务执行

submit 方法扩展于 Executor 中的基本方法 execute(Runnable),该方法创建和返回了一个 Future 对象,通过这个对象可以取消任务的执行或者等待任务的结束。

invokeAnyinvokeAll 方法可以用来执行非常通用有效的批量执行。执行一批任务,等待它们中的一个或者全部执行完成。ExecutorCompletionService 可以用来实现这些方法的变体。

使用例子

下面是一个网络服务的例子。该例子通过线程池中的线程来服务到达的请求,使用到了Executors中的 newFixedThreadPool 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class NetworkService implements Runnable {
private final ServerSocket serverSocket;
private final ExecutorService pool;

public NetworkService(int port, int poolSize)
throws IOException {
serverSocket = new ServerSocket(port);
pool = Executors.newFixedThreadPool(poolSize);
}

public void run() { // run the service
try {
for (;;) {
pool.execute(new Handler(serverSocket.accept()));
}
} catch (IOException ex) {
pool.shutdown();
}
}
}

class Handler implements Runnable {
private final Socket socket;
Handler(Socket socket) { this.socket = socket; }
public void run() {
// read and service request on socket
}
}}

下面的代码展示了通过两个阶段关闭 ExecutorService:

  • 首先,调用 shutdown 来拒绝之后到达的任务。
  • 然后,如果有必要的话,可调用 shutdownNow 来取消滞留的任务。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void shutdownAndAwaitTermination(ExecutorService pool) {
pool.shutdown(); // Disable new tasks from being submitted
try {
// Wait a while for existing tasks to terminate
if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
pool.shutdownNow(); // Cancel currently executing tasks
// Wait a while for tasks to respond to being cancelled
if (!pool.awaitTermination(60, TimeUnit.SECONDS))
System.err.println("Pool did not terminate");
}
} catch (InterruptedException ie) {
// (Re-)Cancel if current thread also interrupted
pool.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
}}

ThreadPoolExecutor

ThreadPoolExecutor 类提供了一个可扩展的线程池实现。

基本介绍

线程池用来解决两个不同方面的问题:

  1. 由于减少了每个任务的调用开销,通常可以在执行大量异步任务的时候提高性能。
  2. 提供了资源控制和管理的。

每个 ThreadPoolExecutor 具有一些基本的统计,例如:任务执行完成的数量。

为了适应广泛的应用场景,该类提高了很多可以调整的参数以及可以扩展的钩子。此外,更简单的一种方式是使用 Executors 中的一些工厂方法,包括:

  • newCachedThreadPool:没有边界,自带线程复用。
  • newFixedThreadPool:固定线程池大小。
  • newSingleThreadExecutor: 单线程。

这些覆盖了大部分通用的场景。

构造函数

当需要手工配置这个类或者对其参数进行调整时,就需要了解其构造函数。ThreadPoolExecutor 有很多构造函数,下面是最常见的形式。

1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) { ... }

corePoolSize and maximumPoolSize

ThreadPoolExecutor 将自动通过基本大小(corePoolSize)、最大大小(maximumPoolSize)来调整线程池的大小。

当有新的任务通过 execute(Runnable) 方法提交时:

  • 如果当前运行的线程数目小于基本大小(corePoolSize),即使有其它空闲线程,也将会创建一个新的线程来处理这个请求。
  • 如果当前运行的线程数目大于基本大小(corePoolSize),小于最大大小(maximumPoolSize),并且缓存的队列(queue)满的时候,将会创建新的线程。

设置示例:

  • 通过设置 corePoolSizemaximumPoolSize 相同,你可以创建一个固定大小(fixed-size)的线程池。
  • 通过将 maximumPoolSize 设置成一个很大的值,比如:Integer.MAX_VALUE,可以使线程池容纳任意数量的并发任务。

通常,这些参数在构造函数中被设置,但是它们也可以通过这些方法动态改变:

  • setCorePoolSize
  • setMaximumPoolSize

keepAliveTime

如果线程池中当前执行的线程数目大于 corePoolSize,那么对于多出的线程,当它们空闲的时间超过 keepAliveTime 时,它们将被终止。

这样的机制,使得当线程池不活跃的时候,可以减少资源的消耗。当线程池变得活跃的时候,新的线程会被创建。这个参数也通过 setKeepAliveTime(long TimeUnit) 方法动态改变。

通过设置这个参数为 Long.MAX_VALUE TimeUnit.NANOSECONDS,可以禁止空闲的线程被终止。

默认情况下,这个 keep-alive 策略只会在当前线程数目超过 corePoolSize 的时候才会起作用,也可以通过 allowCoreThreadTimeOut(boolean) 来控制,此时 keepAliveTime 的值不能为0.

Queuing

BlockingQueue 可以用来转移和持有提交的任务。它的使用时和线程池的大小相关的:

  • 如果小于 corePoolSize 数目的线程在运行,那么 Executor 倾向于创建新的线程来执行任务,而不是将任务缓存到队列中。
  • 如果大于等于 corePoolSize 数目的线程在运行,那么 Executor 倾向于将任务缓存到队列中,而不是创建新的线程。
  • 如果请求达到限制无法被缓存到队列中,那么一个新的线程将会被创建,当创建的线程数将要超过 maximumPoolSize 时,新的任务将会被拒绝。

以下是一些通用的队列策略:

  • 直接切换:工作队列一个好的默认选择可以是 SynchronousQueue,它可以将任务交给线程执行,并且不需要缓存任务。当没有线程可用的时候,尝试缓存任务到队列中将会立即失败,因此一个新的线程将会被创建。这个策略可以避免由于任务间存在内部依赖造成的死机。直接切换通常需要一个没有限制的 maximumPoolSizes,从而避免拒绝新的任务,但当处理不够及时,线程数目也会持续增加。
  • 无界队列:使用无界队列(例如:LinkedBlockingQueue 没有预先定义队列容量),如果所有的 corePoolSize 线程都很忙碌,那么新的任务将会被保存在队列中进行等待。因此,线程数永远不会超过 corePoolSize,而 maximumPoolSize 的值在这里也不会起作用。这样的策略比较适合于每个任务都是独立执行的场景。例如,在 Web 页面服务中,队列可以用来平滑瞬时的访问高峰。
  • 有界队列:一个有界限的队列(例如:ArrayBlockingQueue)可以防止 maximumPoolSizes 被设置为无限大时造成的资源耗尽。队列的大小和线程池大小可以互相调和:使用大的队列和小的线程池可以降低 CPU 使用率、操作系统资源占有、上下文切换的开销,但是会导致较低的吞吐量。如果任务经常被阻塞(I/O 受限),系统可能会调度更多的线程,超过你开始的限制。当使用小队列的时候,需要使用较大的线程池大小,这可以使得 CPU 更加忙碌,但是可能会由于频繁的上下文切换,导致吞吐量下降。

ThreadFactory

ThreadFactory 是用来创建新的线程。下面是该接口的声明。

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* @since 1.5
* @author Doug Lea
*/
public interface ThreadFactory {
/**
* 构建线程 Thread。实现中可以设置优先权、名字、守护状态、线程组等。
*
* @param r 一个可以被线程实例运行的任务
* @return 创建的线程或者 null (创建被拒绝)
*/
Thread newThread(Runnable r);
}

该接口一个简单的实现为:

1
2
3
4
5
class SimpleThreadFactory implements ThreadFactory {
public Thread newThread(Runnable r) {
return new Thread(r);
}
}}

Executors 中的 defaultThreadFactory 方法提供了一个更加简单实用的实现,可以设置线程环境为已知值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;

DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}

public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}

Rejected tasks

在以下两种情况下,新的任务将会被拒绝:

  • Executor 被关闭。
  • Executor 使用有界的线程池大小和工作队列容量后,达到饱和。

任何一种情况下,都会调用 RejectedExecutionHandler 中的 rejectedExecution 方法。

预先定义的一些拒绝策略包括:

  • 默认是 ThreadPoolExecutor.AbortPolicy,它在拒绝时会抛出一个运行时异常 RejectedExecutionException
  • ThreadPoolExecutor.CallerRunsPolicy 策略是让调用者自己来运行这个任务。这实现了一个简单的反馈控制机制,来降低新任务的提交速率。
  • ThreadPoolExecutor.DiscardPolicy 方法直接丢弃任务。
  • ThreadPoolExecutor.DiscardOldestPolicy 如果 executor 没有被关闭,那么丢弃队列头上的任务,任务执行会再次尝试。

扩展例子

很多基于该类的扩展都是覆盖一个或多个受保护的函数。下面例子就展示了一个子类,该之类添加了简单的暂停和恢复特性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
class PausableThreadPoolExecutor extends ThreadPoolExecutor {
private boolean isPaused;
private ReentrantLock pauseLock = new ReentrantLock();
private Condition unpaused = pauseLock.newCondition();

public PausableThreadPoolExecutor(...) { super(...); }

protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
pauseLock.lock();
try {
while (isPaused) unpaused.await();
} catch (InterruptedException ie) {
t.interrupt();
} finally {
pauseLock.unlock();
}
}

public void pause() {
pauseLock.lock();
try {
isPaused = true;
} finally {
pauseLock.unlock();
}
}

public void resume() {
pauseLock.lock();
try {
isPaused = false;
unpaused.signalAll();
} finally {
pauseLock.unlock();
}
}
}}

Executors

Executors 类提供了方便的工厂方法来创建这些执行器。

newFixedThreadPool

该方法创建的线程池可以重复使用固定数目的线程,使用无限制的队列。

1
2
3
4
5
6
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}

newSingleThreadExecutor

单线程执行,使用无限制的队列。

1
2
3
4
5
6
7
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}

newCachedThreadPool

使用无限制的线程池,空线程超过60秒被回收,线程执行采用直接交付策略。

1
2
3
4
5
6
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}

newScheduledThreadPool

采用 ScheduledThreadPoolExecutor

1
2
3
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}

newWorkStealingPool

采用 ForkJoinPool , 开始于 JDK 1.8。

1
2
3
4
5
6
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}

阿里编程规范

使用说明

线程池不允许实用 Executor 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式可以更加明确线程次的运行规则,规避资源耗尽的风险。

Executor 各个方法的弊端说明:

  • newFixedThreadPoolnewSingleThreadPool:主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至 OOM。
  • newCachedThreadPoolnewScheduledThreadPool:主要问题是线程的最大数是 Integer.MAX_VALUE,可能会创建非常多的线程数,甚至 OOM。

使用示例

例子1:

1
2
3
// org.apache.commons.lang3.concurrent.BasicThreadFactory
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,
new BasicThreadFactory.Builder().namingPattern("example-schedule-pool-%d").daemon(true).build());

例子2:

1
2
3
4
5
6
7
8
9
10
11
12
// com.google.common.util.concurrent.ThreadFactoryBuilder
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("demo-pool-%d").build();

// Common Thread Pool
ExecutorService pool = new ThreadPoolExecutor(5,200,
0L,TimeUnit.MILLISECONDS,
new LinkedBlockingDeque<Runnable>(1024),namedThreadFactory,
new ThreadPoolExecutor.AbortPolicy());

pool.execute(()-> System.out.println(Thread.currentThread().getName()));
pool.shutdown(); // gracefully shutdown

例子3:

1
2
3
4
5
6
7
8
9
10
11
<bean id="userThreadPool"
class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="10" />
<property name="maxPoolSize" value="100" />
<property name="queueCapacity" value="2000" />

<property name="thradFactory" value= thradFactory />
<property name="rejectedExecutionHandler">
<ref local="rejectedExecutionHandler">
</property>
</bean>

-------------本文结束感谢您的阅读-------------