一、前言
java.util.concurrent.ThreadPoolExecutor类是线程池中最核心的类之一,需熟练掌握Java中的线程池,必须理解其原理(针对不同四种应用场景,通过配置不同构建参数参考更多文章说明,可以通过Semaphore信号量控制并发上限,防止OOM)
public class ThreadPoolExecutor extends AbstractExecutorService {@b@ .....@b@ public ThreadPoolExecutor(int corePoolSize,@b@ int maximumPoolSize,@b@ long keepAliveTime,@b@ TimeUnit unit,@b@ BlockingQueue<Runnable> workQueue) {@b@ this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,@b@ Executors.defaultThreadFactory(), defaultHandler);@b@ }@b@ @b@ public ThreadPoolExecutor(int corePoolSize,@b@ int maximumPoolSize,@b@ long keepAliveTime,@b@ TimeUnit unit,@b@ BlockingQueue<Runnable> workQueue,@b@ ThreadFactory threadFactory) {@b@ this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,@b@ threadFactory, defaultHandler);@b@ }@b@ @b@ public ThreadPoolExecutor(int corePoolSize,@b@ int maximumPoolSize,@b@ long keepAliveTime,@b@ TimeUnit unit,@b@ BlockingQueue<Runnable> workQueue,@b@ RejectedExecutionHandler handler) {@b@ this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,@b@ Executors.defaultThreadFactory(), handler);@b@ }@b@ @b@ public ThreadPoolExecutor(int corePoolSize,@b@ int maximumPoolSize,@b@ long keepAliveTime,@b@ TimeUnit unit,@b@ BlockingQueue<Runnable> workQueue,@b@ ThreadFactory threadFactory,@b@ RejectedExecutionHandler handler) {@b@ //代码省略@b@ }@b@ ...@b@}
构造函数参数说明
corePoolSize: 核心线程数为 5。@b@maximumPoolSize :最大线程数 10@b@keepAliveTime : 等待时间为 1L。@b@unit: 等待时间的单位为 TimeUnit.SECONDS。@b@workQueue:任务队列为 ArrayBlockingQueue,并且容量为 100;@b@handler:饱和策略为 CallerRunsPolicy。
二、代码示例
1、配置new ThreadPoolExecutor.CallerRunsPolicy()当最大线程池配置小于10(并发任务总量)时,直接使用主调用线程
package thread;@b@@b@import java.util.concurrent.ArrayBlockingQueue;@b@import java.util.concurrent.ThreadPoolExecutor;@b@import java.util.concurrent.TimeUnit;@b@@b@public class ThreadPoolExecutorDemo { @b@ @b@ private static volatile long taskTimes=0;@b@ @b@ public static void main(String[] args) { @b@ @b@ ThreadPoolExecutor executor = new ThreadPoolExecutor(@b@ 1, //corePoolSize: 核心线程数@b@ 2, //maximumPoolSize : 最大线程数 @b@ 60, //keepAliveTime : 等待时间为 1L@b@ TimeUnit.SECONDS, //unit: 等待时间的单位为 TimeUnit.SECONDS@b@ // TimeUnit.DAYS; //天@b@ // TimeUnit.HOURS; //小时@b@ // TimeUnit.MINUTES; //分钟@b@ // TimeUnit.SECONDS; //秒@b@ // TimeUnit.MILLISECONDS; //毫秒@b@ // TimeUnit.MICROSECONDS; //微妙@b@ // TimeUnit.NANOSECONDS; //纳秒@b@ new ArrayBlockingQueue<Runnable>(3) //workQueue:任务队列为 ArrayBlockingQueue,并且容量为 100; @b@ // ArrayBlockingQueue // 数组实现的阻塞队列,数组不支持自动扩容。所以当阻塞队列已满@b@ // // 线程池会根据handler参数中指定的拒绝任务的策略决定如何处理后面加入的任务@b@ // @b@ // LinkedBlockingQueue // 链表实现的阻塞队列,默认容量Integer.MAX_VALUE(不限容),@b@ // // 当然也可以通过构造方法限制容量@b@ // @b@ // SynchronousQueue // 零容量的同步阻塞队列,添加任务直到有线程接受该任务才返回@b@ // // 用于实现生产者与消费者的同步,所以被叫做同步队列@b@ // @b@ // PriorityBlockingQueue // 二叉堆实现的优先级阻塞队列@b@ // @b@ // DelayQueue // 延时阻塞队列,该队列中的元素需要实现Delayed接口@b@ // // 底层使用PriorityQueue的二叉堆对Delayed元素排序@b@ // // ScheduledThreadPoolExecutor底层就用了DelayQueue的变体"DelayWorkQueue"@b@ // // 队列中所有的任务都会封装成ScheduledFutureTask对象(该类已实现Delayed接口)@b@ @b@ @b@ ,new ThreadPoolExecutor.CallerRunsPolicy() @b@ // ThreadPoolExecutor.AbortPolicy: 默认,丢弃任务并抛出RejectedExecutionException异常。@b@ // ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。@b@ // ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)@b@ // ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务@b@ );@b@ @b@ @b@@b@@b@ for (int i = 0; i < 10; i++) {@b@ //创建WorkerThread对象(WorkerThread类实现了Runnable 接口) @b@ //执行Runnable@b@ executor.execute(new Runnable() { @b@ @Override@b@ public void run() { @b@ System.out.println(Thread.currentThread().getName()+" executeTask["+(++taskTimes)+"]");@b@ }@b@ });@b@ }@b@ //终止线程池@b@ executor.shutdown();@b@ while (!executor.isTerminated()) {@b@ }@b@ System.out.println("Finished all threads....");@b@ @b@@b@ }@b@@b@}
控制台运行结果
pool-1-thread-1 executeTask[1]@b@main executeTask[1]@b@pool-1-thread-2 executeTask[1]@b@pool-1-thread-1 executeTask[2]@b@main executeTask[3]@b@pool-1-thread-1 executeTask[5]@b@pool-1-thread-2 executeTask[4]@b@pool-1-thread-1 executeTask[6]@b@pool-1-thread-2 executeTask[7]@b@pool-1-thread-1 executeTask[8]@b@Finished all threads....
线程池线程执行过程说明
1、如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。 @b@2、如果此时线程池中的数量等于corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。 @b@3.线程执行完1中的任务后会从队列中取任务。(注意LinkedBlockingQueue是无界队列,所以可以一直添加新任务到线程池。)@b@@b@如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。 @b@如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。 @b@@b@处理任务的优先级为: @b@ 核心线程corePoolSize》任务队列workQueue》最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。 @b@@b@当线程池中的线程数量大于corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。