type
status
date
slug
summary
tags
category
icon
password
ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
corePoolSize/maxiumPoolSize
- 运行的线程数 < corePoolSize
创建新线程,不管其他线程是否处于Idle状态
- corePoolSize < 运行的线程数 < maxiumPoolSize
只有当队列满的时候,才会创建新线程
- corePoolSize = maxiumPoolSize
固定大小的线程池
- maxiumPoolSize = Integer.MAX_VALUE
容纳任意数量的并发任务
keepAliveTime/unit
如果线程池当前线程数超过corePoolSize,多余的线程如果在IDLE状态下超过keepAliveTime就会被终止.
使用Long.MAX_VALUE TimeUnit#NANOSECONDS作为keepAliveTime,可以阻止空闲线程在关闭前被终止.
默认情况下,keepAliveTime策略应用在线程数超过corePoolSize的情况.但是使用allowCoreThreadTimeOut(boolean)方法,只要keepAliveTime大于0,也可以应用到核心线程.
workQueue
任何
BlockingQueue
的实现都可以用来传递和持有提交的任务.- 运行线程数 < corePoolSize时
Executor倾向于创建新线程而不是将任务入队
- 运行线程数 >= corePoolSize时
Executor倾向于将任务入队,而不是创建新线程
- 任务请求不能入队(比如队列已满)
如果运行线程数不超过maxiumPoolSize,则创建新线程,否则,被拒绝
常见的三种队列对应三种入队策略:
- 直接转移,队列不缓存任务
这里可以使用
SynchronousQueue
,这个队列是没有内部缓存的,也就是说来一个任务马上就会转移出去,可能是复用线程,创建线程,或者直接被拒绝.通常为了防止任务频繁被拒绝,可以将maxiumPoolSize设置为无限大,即Integer.MAX_VALUE.Executors#newCachedThreadPool()
就属于这种策略.- 无界队列
比如使用
LinkedBlockingQueue
,当corePoolSize个线程都在忙时,新任务只能在队列里等待.因此,线程数不会超过corePoolSize,**设置的maxiumPoolSize没有任何作用.**适合于那种任务之间没有任何依赖的场景.不过当任务处理时间比较久时,会导致队列增长过快.- 有界队列
比如使用
ArrayBlockingQueue
,当设置了有限的maxiumPoolSize时可以防止系统资源被耗尽,但是相对更难控制.往往都需要在队列大小与最大线程数之间进行折衷.如果使用了大的队列,少的线程数可以节约CPU,系统资源,和线程上下文切换的开销,但是会导致任务吞吐量变低,如果任务频繁的被阻塞,系统有时候也会多调度一些进程(会超过你设置的限制);
如果使用了小队列,多线程数,会使CPU更加忙碌,但是可能导致过载,反而降低吞吐量.
我们可以使用
getQueue()
方法获取当前队列,不过只建议在debug和监控的时候使用这个方法.如果想取消排队任务,可以使用
remove(Runnable r)
或者purge()
方法.handler
即
RejectedExecutionHandler
,当Executor关闭时,或者使用有界队列,并且线程数达到了maxiumPoolSize,新任务会被拒绝.目前有4种预定义的拒绝策略:
- 直接抛异常
public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } }
- 如果线程池没有关闭,直接调用Runnable的run方法继续执行(在调用者线程)
public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy() { } /** * Executes task r in the caller's thread, unless the executor * has been shut down, in which case the task is discarded. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } }
- 直接丢弃任务,不执行
public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } }
- 丢掉队列中最久的任务,添加新任务
public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }
在实际应用过程中,我们一般都会自定义拒绝策略,做一些特殊处理.
Hook方法
ThreadPoolExecutor
中有两个protected的方法:- beforeExecute(Thread t, Runnable r)
- afterExecute(Thread t, Runnable r)
这两个方法分别在任务执行前后被调用.
- 作者:姜康
- 链接:https://jiangkang.tech/article/32a2716a-5e33-4ab9-84d4-601a2fce44db
- 声明:本文采用 CC BY-NC-SA 4.0 许可协议,转载请注明出处。
相关文章