java并发Executor框架理解

之前我一直觉得Java用Thread类和Runnable接口实现并发太简陋了,最近被问到几次并发的问题之后才发现,原来大牛们早就早好了轮子了,不过自己菜不知道而已。

用法
Executor:一个接口,其定义了一个接收Runnable对象的方法executor,其方法签名为executor(Runnable command),

ExecutorService:是一个比Executor使用更广泛的子类接口,其提供了生命周期管理的方法,以及可跟踪一个或多个异步任务执行状况返回Future的方法
ThreadPoolExecutor:线程池,可以通过调用Executors以下静态工厂方法来创建线程池并返回一个ExecutorService对象:
Executors:提供了一系列静态工厂方法用于创建各种线程池

先介绍一下ThreadPoolExecutor,这个方法,大体代码如下,下面介绍的几个工厂方法大体上都是从它演化过来的:

1
2
3
4
5
6
7
ThreadPoolExecutor(int corePoolSize,	// 核心池数量
int maximumPoolSize, // 池中最大数量
long keepAliveTime, // 保持连接时间
TimeUnit unit, // 时间单位
BlockingQueue<Runnable> workQueue, // 等待对列
ThreadFactory threadFactory,
RejectedExecutionHandler handler)

主要理解这个线程池的用法
如果线程池中的线程数量少于 corePoolSize,即使线程池中有空闲线程,也会创建一个新的线程来执行新添加的任务;

如果线程池中的线程数量大于等于 corePoolSize,但缓冲队列 workQueue 未满,则将新添加的任务放到 workQueue 中,按照 FIFO 的原则依次等待执行(线程池中有线程空闲出来后依次将缓冲队列中的任务交付给空闲的线程执行);

如果线程池中的线程数量大于等于 corePoolSize,且缓冲队列 workQueue 已满,但线程池中的线程数量小于 maximumPoolSize,则会创建新的线程来处理被添加的任务;

如果线程池中的线程数量等于了 maximumPoolSize,有 4 种才处理方式(该构造方法调用了含有 5 个参数的构造方法,并将最后一个构造方法为 RejectedExecutionHandler 类型,它在处理线程溢出时有 4 种方式,这里不再细说,要了解的,自己可以阅读下源码)。
总结起来,也即是说,当有新的任务要处理时,先看线程池中的线程数量是否大于 corePoolSize,再看缓冲队列 workQueue 是否满,最后看线程池中的线程数量是否大于 maximumPoolSize。

另外,当线程池中的线程数量大于 corePoolSize 时,如果里面有线程的空闲时间超过了 keepAliveTime,就将其移除线程池,这样,可以动态地调整线程池中线程的数量。

总共有四种,举例并配合源码说一下:

1.newCachedThreadPool()

1
2
3
4
5
6
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}

2.newFixedThreadPool(int)

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

3.newScheduledThreadPool(int)

1
2
3
4
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}

4.SingleThreadExecutor()

1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

推荐文章