1. ThreadPoolExcecutor
初始化
ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory, //线程工厂,默认为DefaultThreadFactory
RejectedExecutionHandler handler //拒绝执行处理器-默认为AbortPolicy
)
1.1 基本参数
corePoolSize: 核心线程数,线程池中线程回收的下限,当线程池中的线程数量在核心线程数以内,则即使
线程处于空闲也不会回收。
maximumPoolSize: 线程池的最大线程数。线程池最多只能创建这么多线程,ThreadPoolExecutor会根据corePoolSize和
maximumPoolSize自动线程池的参数。该参数也提供了相应的修改器方法去修改
keepAliveTime/unit: 配置空闲线程保活时间,如果线程池的线程数大于核心线程数且线程的空闲时间超过这个时间以后,则回收该线程。如果allowCoreThreadTimeOut
为true,那么核心线程也会被超时回收。
/**
* If false (default), core threads stay alive even when idle.
* If true, core threads use keepAliveTime to time out waiting
* for work.
*/
private volatile boolean allowCoreThreadTimeOut;
1.2 workQueue
- 任务队列,新提交的任务还没执行时会在该任务队列中排队等待。
任务队列类型
ArrayBlockingQueue
- 特点:有界阻塞队列,使用数组实现。
- 适用场景:适用于对任务数量有严格限制的场景,能有效控制内存使用。
- 优点:防止内存溢出,适合生产者和消费者速率大致相当的情况。可以控制队列的大小。
LinkedBlockingQueue
- 特点:可选择有界或无界的阻塞队列,使用链表实现。
- 适用场景:适合任务数量不确定的场合,尤其是任务产生速率远大于消费速率时。
- 优点:无界时可以动态扩展,适合处理大量任务。
SynchronousQueue
-直接交接- 特点:不存储任务的队列,每个插入操作必须等待另一个线程的对应移除操作。
- 适用场景:适合需要快速交换任务的场景,例如短小的计算任务。
- 优点:可以充分利用 CPU,适合高并发场景。
PriorityBlockingQueue
- 特点:无界的阻塞队列,支持优先级排序。
- 适用场景:需要根据任务优先级处理任务的情况。
- 优点:可以灵活处理不同优先级的任务。
选择因素
- 任务特性
- 任务的生产和消费速率:如果生产速率远高于消费速率,可能需要使用无界队列(如
LinkedBlockingQueue
)。 - 任务的处理时间:短小任务适合
SynchronousQueue
,长时间运行的任务可能适合有界队列以控制队列大小。
- 任务的生产和消费速率:如果生产速率远高于消费速率,可能需要使用无界队列(如
- 内存管理
- 选择有界队列(如
ArrayBlockingQueue
或LinkedBlockingQueue
)可以防止内存溢出,特别是在高负载情况下。
- 选择有界队列(如
- 线程池配置
- 如果线程池的核心线程数较小,使用无界队列可以防止任务被拒绝,反之则可以使用有界队列以控制并发任务数量。
- 优先级需求
- 如果需要根据任务优先级处理,可以选择
PriorityBlockingQueue
。
- 如果需要根据任务优先级处理,可以选择
1.3 threadFactory
- 线程池创建工作者的线程时,使用的工厂类。
public interface ThreadFactory {
/**
* Constructs a new {@code Thread}. Implementations may also initialize
* priority, name, daemon status, {@code ThreadGroup}, etc.
*
* @param r a runnable to be executed by new thread instance
* @return constructed thread, or {@code null} if the request to
* create a thread is rejected
*/
Thread newThread(Runnable r);
}
1.4 handler
- 当线程提交到线程池,但因为运行的线程已到达最大线程数且任务队列已满或者其他原因无法执行时的处理策略。
public interface RejectedExecutionHandler {
/**
* Method that may be invoked by a {@link ThreadPoolExecutor} when
* {@link ThreadPoolExecutor#execute execute} cannot accept a
* task. This may occur when no more threads or queue slots are
* available because their bounds would be exceeded, or upon
* shutdown of the Executor.
*
* <p>In the absence of other alternatives, the method may throw
* an unchecked {@link RejectedExecutionException}, which will be
* propagated to the caller of {@code execute}.
*
* @param r the runnable task requested to be executed
* @param executor the executor attempting to execute this task
* @throws RejectedExecutionException if there is no remedy
*/
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
2. 工作者线程的创建逻辑
线程池创建之后,创建线程有2种方式,一种是线程池创建了之后,预启动线程,另一种是当线程池接收到任务之后,才去创建线程执行任务。
-
预启动线程
场景:创建线程池时,任务队列已经有任务。
//该方法的逻辑为,如果当前线程池中运行的线程数小于核心线程数,则创建一个新的工作者线程加入池中 public boolean prestartCoreThread() { return workerCountOf(ctl.get()) < corePoolSize && addWorker(null, true); } //该方法的逻辑同上,不同的地方是如果池中运行的线程为0,则创建一个新的工作者线程,无论核心线程数 //配置的是多少。保证了最少有一个线程预启动了。 void ensurePrestart() { int wc = workerCountOf(ctl.get()); if (wc < corePoolSize) addWorker(null, true); else if (wc == 0) addWorker(null, false); } //预启动所有的所有的核心线程 public int prestartAllCoreThreads() { int n = 0; while (addWorker(null, true)) ++n; return n; }
-
任务提交时,创建线程
场景: 使用submit或者execute方法提交任务时
- 任务提交时,如果当前运行的线程数量少于配置的核心线程数,则创建新的工作者线程,并启动运行该任务,无论当前池中的已有的线程是否有空闲的。
3. 任务执行的逻辑
任务的提交可以使用submit或execute。提交了之后运行的主要逻辑如下:
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
如果还可以再创建线程,就创建新的线程执行任务,负责就加入到任务队列排队,工作者线程轮询去获取任务执行。如果既不能创建工作者线程执行,队列也满了,就拒绝任务(使用饱和策略来处理任务)。
4. 线程池的参数配置
可修改与不可修改
- 线程池的参数不仅可以通过构造器传参的方式进行设置,还可以使用ThreadPoolExecutor的修改器方法进行修改。
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(1,
1,
1,
TimeUnit.MINUTES,
new ArrayBlockingQueue<>(100));
poolExecutor.setThreadFactory((runnable)->{
Thread t = new Thread(runnable);
t.setPriority(5);
t.setName("tmp-thread");
return t;
});
poolExecutor.setMaximumPoolSize(100);
- Exectuors提供默认线程池,返回类型都是
ExecutorService
newSingleThreadExecutor
方法返回的是通过FinalizableDelegatedExecutorService封装的ExecutorService,使用了代理模式,阻止外部方法对配置做修改。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
- 而其他的默认线程池则可以进行配置信息修改
ExecutorService executorService = Executors.newFixedThreadPool(10);
if (executorService instanceof ThreadPoolExecutor) {
ThreadPoolExecutor executor = (ThreadPoolExecutor) executorService;
executor.setMaximumPoolSize(100);
executor.setCorePoolSize(10);
System.out.println(String.format("线程池池的核心线程数为%d,最大线程数为%d", executor.getCorePoolSize(), executor.getMaximumPoolSize()));
}
//执行后输出信息为
// > Task :ThreadPoolExecutorTest.main()
// 线程池池的核心线程数为10,最大线程数为100
- 也可以通过调用unconfigurableExecutorService方法,装饰自己创建的线程池,防止参数被修改。
public static ExecutorService unconfigurableExecutorService(ExecutorService executor) {
if (executor == null)
throw new NullPointerException();
return new DelegatedExecutorService(executor);
}
5. 线程池的钩子函数
//线程执行之前
//Method invoked prior to executing the given Runnable in the given thread. This method is invoked by thread t that will execute task r, and may be used to re-initialize ThreadLocals, or to perform logging.
//This implementation does nothing, but may be customized in subclasses. Note: To properly nest multiple overridings, subclasses should generally invoke super.beforeExecute at the end of this method.
protected void beforeExecute(Thread t, Runnable r) { }
//线程之后之后
//Method invoked upon completion of execution of the given Runnable. This method is invoked by the thread that executed the task. If non-null, the Throwable is the uncaught RuntimeException or Error that caused execution to terminate abruptly.
//This implementation does nothing, but may be customized in subclasses. Note: To properly nest multiple overridings, subclasses should generally invoke super.afterExecute at the beginning of this method.
//Note: When actions are enclosed in tasks (such as FutureTask) either explicitly or via methods such as submit, these task objects catch and maintain computational exceptions, and so they do not cause abrupt termination, and the internal exceptions are not passed to this method. If you would like to trap both kinds of failures in this method, you can further probe for such cases, as in this sample subclass that prints either the direct cause or the underlying exception if a task has been aborted:
protected void afterExecute(Runnable r, Throwable t) { }
//线程池关闭后的钩子,可以在线程池关闭之后处理其他资源或者打日志
protected void terminated() { }