侧边栏壁纸
博主头像
憨憨大头个人博客博主等级

心存希冀,目有繁星

  • 累计撰写 110 篇文章
  • 累计创建 13 个标签
  • 累计收到 0 条评论

目 录CONTENT

文章目录

线程池ThreadPoolExecutor(详解)

Administrator
2024-08-03 / 0 评论 / 0 点赞 / 3 阅读 / 75408 字

线程池的标准创建方式

大部分企业的开发规范都会禁止使用快捷线程池(具体原因稍后介绍),要求通过标准构造器ThreadPoolExecutor去构造工作线程池。Executors工厂类中创建线程池的快捷工厂方法实际上是调用ThreadPoolExecutor(定时任务使用ScheduledThreadPoolExecutor)线程池的构造方法完成的。ThreadPoolExecutor构造方法有多个重载版本,其中一个比较重要的构造器如下:

     // 使用标准构造器构造一个普通的线程池
     public ThreadPoolExecutor(
       int corePoolSize,            			// 核心线程数,即使线程空闲(Idle),默认也不会回收
       int maximumPoolSize,                 	// 线程数的上限
       long keepAliveTime, TimeUnit unit,   	// 线程最大空闲(Idle)时长 
       BlockingQueue<Runnable> workQueue, 		// 任务的排队队列
       ThreadFactory threadFactory,             // 新线程的产生方式
       RejectedExecutionHandler handler)    	// 拒绝策略

接下来对这些参数进行具体介绍。

ThreadPoolExecutor线程池工具类

常用开发的工具类代码

import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 线程池工具类
 */
@AllArgsConstructor(access = AccessLevel.PRIVATE)
@Slf4j
public class ThreadPoolUtil {
    /**
     * 线程池核心线程数(当前用到的多线程地方较少 核心数先设置为2)
     */
    private static final int CORE_POOL_SIZE = 2;
 
    /**
     * 线程池最大线程数
     */
    private static final int MAX_POOL_SIZE = 10;
 
    /**
     * 任务队列  此处使用ArrayBlockingQueue有界队列,防止队列无限膨胀导致内存溢出
     */
    private static final BlockingQueue<Runnable> WORK_QUEUE = new ArrayBlockingQueue<>(20);
 
    /**
     * 超出核心线程的额外线程空状态生存时间  此处是秒
     */
    private static final int KEEP_ALIVE_TIME = 60;

    /**
     * 线程工厂
     */
    private static final ThreadFactory THREAD_FACTORY = new BasicThreadFactory.Builder()
                .namingPattern("线程名称").build();
 
    /**
     * 拒绝策略 CallerRunsPolicy:不在新线程中执行任务,而是由调用者所在的线程来执行
     */
    private static final RejectedExecutionHandler REJECTED_HANDLER = new ThreadPoolExecutor.CallerRunsPolicy();
 
    /**
     * 线程池
     */
    private static final ThreadPoolExecutor threadPool;
 
    static {
        threadPool = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE,
                KEEP_ALIVE_TIME,
                TimeUnit.SECONDS,
                WORK_QUEUE,
                THREAD_FACTORY,
                REJECTED_HANDLER);
    }
 
    public static void execute(Runnable runnable) {
        getCommonThreadPoolInfo();
        threadPool.execute(runnable);
    }
 
    public static <T> void execute(FutureTask<T> futureTask) {
        getCommonThreadPoolInfo();
        threadPool.execute(futureTask);
    }
 
    public static <T> void cancel(FutureTask<T> futureTask) {
        getCommonThreadPoolInfo();
        futureTask.cancel(true);
    }

    /**
     * 线程池监控
      */
    private static void getCommonThreadPoolInfo() {
        log.info("CommonThreadPoolInfo========>当前线程总数:{},正在执行任务线程数:{},已执行完成任务数:{}",
                threadPool.getPoolSize(), threadPool.getActiveCount(), threadPool.getCompletedTaskCount());
    }

    /**
     * 自定义线程工厂
     */
    static class MyThreadFactory implements ThreadFactory {
        private final AtomicInteger threadId = new AtomicInteger();

        /**
         * 设置线程名称
         */
        @Override
        public Thread newThread(@NonNull Runnable r) {
            return new Thread(r, "ThreadPoolUtil:" + threadId.getAndIncrement());
        }
    }
}

1.核心和最大线程数量

参数corePoolSize用于设置核心(Core)线程池数量,参数maximumPoolSize用于设置最大线程数量。
线程池执行器将会根据corePoolSize和maximumPoolSize自动维护线程池中的工作线程,大致规则为:

(1)当在线程池接收到新任务,并且当前工作线程数少于corePoolSize时,即使其他工作线程处于空闲状态,也会创建一个新线程来处理该请求,直到线程数达到corePoolSize。
(2)如果当前工作线程数多于corePoolSize数量,但小于maximumPoolSize数量,那么仅当任务排队队列已满时才会创建新线程。通过设置corePoolSize和maximumPoolSize相同,可以创建一个固定大小的线程池。
(3)当maximumPoolSize被设置为无界值(如Integer.MAX_VALUE)时,线程池可以接收任意数量的并发任务。
(4)corePoolSize和maximumPoolSize不仅能在线程池构造时设置,也可以调用setCorePoolSize()和setMaximumPoolSize()两个方法进行动态更改。

2.BlockingQueue

BlockingQueue(阻塞队列)的实例用于暂时接收到的异步任务,如果线程池的核心线程都在忙,那么所接收到的目标任务缓存在阻塞队列中。

3.keepAliveTime

线程构造器的keepAliveTime(空闲线程存活时间)参数用于设置池内核心线程最大空闲时长,如果超过这个时间,默认情况下核心线程会被回收。如果池在使用过程中提交任务的频率变高,也可以调用方法setKeepAliveTime(long,TimeUnit)进行线程存活时间的动态调整,可以将时长延长。如果需要防止Idle线程被终止,可以将Idle时间设置为无限大,具体如下:

  setKeepAliveTime(Long.MAX_VALUE,TimeUnit.NANOSECONDS);

默认情况下,Idle超时策略仅适用于存在超过corePoolSize线程的情况。但若调用了allowCoreThreadTimeOut(boolean)方法,并且传入了参数true,则keepAliveTime参数所设置的Idle超时策略也将被应用于非核心线程。

向线程池提交任务的两种方式

方式一:调用execute()方法,例如:
     //Executor 接口中的方法
     void execute(Runnable command);
方式二:调用submit()方法,例如:
     //ExecutorService 接口中的方法
     <T> Future<T> submit(Callable<T> task); 
     <T> Future<T> submit(Runnable task, T result);
     Future<?> submit(Runnable task);

submit()和execute()两类方法的区别:

(1)二者所接收的参数和有无返回结果不一样

Execute()方法只能接收Runnable类型的参数,而submit()方法可以接收Callable、Runnable两种类型的参数。Callable类型的任务是可以返回执行结果的,而Runnable类型的任务不可以返回执行结果。
submit()方法也用于启动任务的执行,但是启动之后会返回Future对象,代表一个异步执行实例,可以通过该异步执行实例去获取结果。

(2)submit()方便Exception处理

execute()方法在启动任务执行后,任务执行过程中可能发生的异常调用者并不关心。而通过submit()方法返回的Future对象(异步执行实例),可以进行异步执行过程中的异常捕获。

通过submit()返回的Future对象获取结果

submit()方法自身并不会传递结果,而是返回一个Future异步执行实例,处理过程的结果被包装到Future实例中,调用者可以通过Future.get()方法获取异步执行的结果。演示代码如下:

     // 省略import
     public class CreateThreadPoolDemo
     {
         // 省略其他
         //测试用例:获取异步调用的结果
         @Test
         public void testSubmit2()
         {
         	 // 演示暂用Executors线程工厂
             ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
             Future<Integer> future = pool.submit(new Callable<Integer>()
             {
                 @Override
                 public Integer call() throws Exception
                 {
                     //返回200~300的随机数
                     return RandomUtil.randInRange(200, 300);
                 }
             });
     
             try
             {
                 Integer result = future.get();
                 Print.tco("异步执行的结果是:" + result);
             } catch (InterruptedException e)
             {
                 Print.tco("异步调用被中断");
                 e.printStackTrace();
             } catch (ExecutionException e)
             {
                 Print.tco("异步调用过程中,发生了异常");
                 e.printStackTrace();
             }
             sleepSeconds(10);
             //关闭线程池
             pool.shutdown();
         }
     }

执行的结果如下:

 [main]:异步执行的结果是:220
通过submit()返回的Future对象捕获异常

submit()方法自身并不会传递异常,处理过程中的异常都被包装到Future实例中,调用者在调用Future.get()方法获取执行结果时,可以捕获异步执行过程中抛出的受检异常和运行时异常,并进行对应的业务处理。演示代码如下:

     // 省略import
     public class CreateThreadPoolDemo
     {
         //异步任务的执行目标类
         static class TargetTask implements Runnable
         {
         //为了节约篇幅,省略重复内容
         }
     
         //异步的执行目标类:执行过程中将发生异常
         static class TargetTaskWithError extends TargetTask
         {
             public void run()
             {
                 super.run();
                 throw new RuntimeException("Error from " + taskName);
             }
         }
         //测试用例:提交和执行
         @Test
         public void testSubmit()
         {
             ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
             pool.execute(new TargetTaskWithError());
             /**
              * submit(Runnable x) 返回一个future
              */
             Future future = pool.submit(new TargetTaskWithError());
     
             try
             {
                 //如果异常抛出,就会在调用Future.get()时传递给调用者
                 if (future.get() == null)
                 {
                     //如果Future的返回值为null,那么任务完成
                     Print.tco("任务完成");
                 }
     
             } catch (Exception e)
             {
                 Print.tco(e.getCause().getMessage());
             }
     
             sleepSeconds(10);
             //关闭线程池
             pool.shutdown();
         }
         // 省略其他
     }

执行结果如下:

     [pool-1-thread-2]:任务:task-2 doing
     [pool-1-thread-1]:任务:task-1 doing
     [pool-1-thread-2]:task-2 运行结束.
     [pool-1-thread-1]:task-1 运行结束.
     [main]:Error from task-2

在ThreadPoolExecutor类的实现中,内部核心的任务提交方法是execute()方法,虽然用户程序通过submit()也可以提交任务,但是实际上submit()方法中最终调用的还是execute()方法。

线程池的任务调度流程

线程池的任务调度流程(包含接收新任务和执行下一个任务)大致如下:

(1)如果当前工作线程数量小于核心线程数量,执行器总是优先创建一个任务线程,而不是从线程队列中获取一个空闲线程。
(2)如果线程池中总的任务数量大于核心线程池数量,新接收的任务将被加入阻塞队列中,一直到阻塞队列已满。在核心线程池数量已经用完、阻塞队列没有满的场景下,线程池不会为新任务创建一个新线程。
(3)当完成一个任务的执行时,执行器总是优先从阻塞队列中获取下一个任务,并开始执行,一直到阻塞队列为空,其中所有的缓存任务被取光。
(4)在核心线程池数量已经用完、阻塞队列也已经满了的场景下,如果线程池接收到新的任务,将会为新任务创建一个线程(非核心线程),并且立即开始执行新任务。
(5)在核心线程都用完、阻塞队列已满的情况下,一直会创建新线程去执行新任务,直到池内的线程总数超出maximumPoolSize。如果线程池的线程总数超过maximumPoolSize,线程池就会拒绝接收任务,当新任务过来时,会为新任务执行拒绝策略。

在创建线程池时,如果线程池的参数(如核心线程数量、最大线程数量、BlockingQueue等)配置得不合理,就会出现任务不能被正常调度的问题。

下面是一个错误的线程池配置示例:

     // 省略import
     public class CreateThreadPoolDemo
     {
        @org.junit.Test
         public void testThreadPoolExecutor()
         {
             ThreadPoolExecutor executor = new ThreadPoolExecutor(
                     1, //corePoolSize
                     100, //maximumPoolSize
                     100, //keepAliveTime 空闲保活时长
                     TimeUnit.SECONDS, //空闲保活时长的单位
                     new LinkedBlockingDeque<>(100));//workQueue
             //提交5个任务
             for (int i = 0; i < 5; i++)
             {
                 final int taskIndex = i;
                 executor.execute(() ->
                 {
                     Print.tco("taskIndex = " + taskIndex);
                     try
                     {  //极端测试:无限制睡眠
                         Thread.sleep(Long.MAX_VALUE);
                     } catch (InterruptedException e)
                     {
                         e.printStackTrace();
                     }
                 });
             }
             while (true)
             {  
                 //每隔1秒,输出线程池的工作任务数量、总计的任务数量
                Print.tco("- activeCount:" + executor.getActiveCount()+
                                        " - taskCount:" + executor.getTaskCount());
                sleepSeconds(1);
             }
         }
     
         // 省略其他
     }

运行程序,结果如下:

     [main]:- activeCount:1 - taskCount:5
     [pool-1-thread-1]:taskIndex = 0
     [main]:- activeCount:1 - taskCount:5
     [main]:- activeCount:1 - taskCount:5
     [main]:- activeCount:1 - taskCount:5
     [main]:- activeCount:1 - taskCount:5
     [main]:- activeCount:1 - taskCount:5
     [main]:- activeCount:1 - taskCount:5
     ...

以上示例创建了最大线程数量maximumPoolSize为100的线程池,仅仅向其中提交了5个任务。理论上,这5个任务都会被执行到,奇怪的是示例中只有1个任务在执行,其他的4个任务都在等待。其他任务被加入到了阻塞队列中,需要等pool-1-thread-1线程执行完第一个任务后,才能依次从阻塞队列取出执行。但是,实例中的第一个任务是一个永远也没有办法完成的任务,所以其他的4个任务只能永远在阻塞队列中等待着。由于参数配置得不合理,因此出现了以上的奇怪现象。

为什么会出现上面的奇怪现象呢?因为例子中的corePoolSize为1,阻塞队列的大小为100,按照线程创建的规则,需要等阻塞队列已满,才会去创建新的线程。例子中加入了5个任务,阻塞队列大小为4(<100),所以线程池的调度器不会去创建新的线程,后面的4个任务只能等待。

以上示例的目的是传递两个知识点:

(1)核心和最大线程数量、BlockingQueue队列等参数如果配置得不合理,可能会造成异步任务得不到预期的并发执行,造成严重的排队等待现象。
(2)线程池的调度器创建线程的一条重要的规则是:在corePoolSize已满之后,还需要等阻塞队列已满,才会去创建新的线程。

线程工厂:ThreadFactory

ThreadFactory是Java线程工厂接口,这是一个非常简单的接口,具体如下:

     public interface ThreadFactory {
         //唯一的方法:创建一个新线程
         Thread newThread(Runnable target);
     }

在调用ThreadFactory的唯一方法newThread()创建新线程时,可以更改所创建的新线程的名称、线程组、优先级、守护进程状态等。如果newThread()的返回值为null,表示线程工厂未能成功创建线程,线程池可能无法执行任何任务。

使用Executors创建新的线程池时,也可以基于ThreadFactory(线程工厂)创建,在创建新线程池时可以指定将要使用的ThreadFactory实例。只不过,如果没有指定的话,就会使用Executors.defaultThreadFactory默认实例。使用默认的线程工厂实例所创建的线程全部位于同一个ThreadGroup(线程组)中,具有相同的NORM_PRIORITY(优先级为5),而且都是非守护进程状态。

说明

这里提到了两个工厂类,比较容易混淆。
Executors为线程池工厂类,用于快捷创建线程池(Thread Pool);ThreadFactory为线程工厂类,用于创建线程(Thread)。
基于自定义的ThreadFactory实例创建线程池,首先需要实现一个ThreadFactory接口,实现其唯一的抽象方法newThread(Runnable)。下面的例子首先实现一个简单的线程工厂,然后基于该线程工厂快捷创建线程池,具体的代码如下:

     // 省略import
     public class CreateThreadPoolDemo
     {
         //一个简单的线程工厂
         static public class SimpleThreadFactory implements ThreadFactory
         {
             static AtomicInteger threadNo = new AtomicInteger(1);
             //实现其唯一的创建线程方法
             @Override
             public Thread newThread(Runnable target)
             {
                 String threadName = "simpleThread-" + threadNo.get();
                 Print.tco("创建一个线程,名称为:" + threadName);
                 threadNo.incrementAndGet();
                 //设置线程名称和异步执行目标
                 Thread thread = new Thread(target,threadName);
                 //设置为守护线程
                 // thread.setDaemon(true);
                 return thread;
             }
         }
     
         //线程工厂的测试用例
         @org.junit.Test
         public void testThreadFactory()
         {
             //使用自定义线程工厂快捷创建一个固定大小的线程池
             ExecutorService pool =
                     Executors.newFixedThreadPool(2,new  SimpleThreadFactory());
             for (int i = 0; i < 5; i++)
             {
                 pool.submit(new TargetTask());
             }
             //等待10秒
             sleepSeconds(10);
             Print.tco("关闭线程池");
             pool.shutdown();
         }
         // 省略其他
     }

运行以上代码,其输出如下:

     [main]:创建一条线程,名称为:simpleThread-1
     [main]:创建一条线程,名称为:simpleThread-2
     [simpleThread-1]:任务:task-1 doing
     [simpleThread-2]:任务:task-2 doing
     [simpleThread-1]:task-1 运行结束.
     [simpleThread-1]:任务:task-3 doing
     [simpleThread-2]:task-2 运行结束.
     [simpleThread-2]:任务:task-4 doing
     [simpleThread-2]:task-4 运行结束.
     [simpleThread-1]:task-3 运行结束.
     [simpleThread-2]:任务:task-5 doing
     [simpleThread-2]:task-5 运行结束.
     [main]:关闭线程池

任务阻塞队列

Java中的阻塞队列(BlockingQueue)与普通队列相比有一个重要的特点:在阻塞队列为空时会阻塞当前线程的元素获取操作。具体来说,在一个线程从一个空的阻塞队列中获取元素时线程会被阻塞,直到阻塞队列中有了元素;当队列中有元素后,被阻塞的线程会自动被唤醒(唤醒过程不需要用户程序干预)。

BlockingQueue是JUC包的一个超级接口,比较常用的实现类有:

(1)ArrayBlockingQueue:是一个数组实现的有界阻塞队列(有界队列),队列中的元素按FIFO排序。ArrayBlockingQueue在创建时必须设置大小,接收的任务超出corePoolSize数量时,任务被缓存到该阻塞队列中,任务缓存的数量只能为创建时设置的大小,若该阻塞队列已满,则会为新的任务创建线程,直到线程池中的线程总数大于maximumPoolSize。

(2)LinkedBlockingQueue:是一个基于链表实现的阻塞队列,按FIFO排序任务,可以设置容量(有界队列),不设置容量则默认使用Integer.Max_VALUE作为容量(无界队列)。该队列的吞吐量高于ArrayBlockingQueue。

如果不设置LinkedBlockingQueue的容量(无界队列),当接收的任务数量超出corePoolSize时,则新任务可以被无限制地缓存到该阻塞队列中,直到资源耗尽。有两个快捷创建线程池的工厂方法Executors.newSingleThreadExecutor和Executors.newFixedThreadPool使用了这个队列,并且都没有设置容量(无界队列)。

(3)PriorityBlockingQueue:是具有优先级的无界队列。

(4)DelayQueue:这是一个无界阻塞延迟队列,底层基于PriorityBlockingQueue实现,队列中每个元素都有过期时间,当从队列获取元素(元素出队)时,只有已经过期的元素才会出队,队列头部的元素是过期最快的元素。快捷工厂方法Executors.newScheduledThreadPool所创建的线程池使用此队列。

(5)SynchronousQueue:(同步队列)是一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程的调用移除操作,否则插入操作一直处于阻塞状态,其吞吐量通常高于LinkedBlockingQueue。快捷工厂方法Executors.newCachedThreadPool所创建的线程池使用此队列。与前面的队列相比,这个队列比较特殊,它不会保存提交的任务,而是直接新建一个线程来执行新来的任务。

调度器的钩子方法

ThreadPoolExecutor线程池调度器为每个任务执行前后都提供了钩子方法。ThreadPoolExecutor类提供了三个钩子方法(空方法),这三个钩子方法一般用作被子类重写,具体如下:

     //任务执行之前的钩子方法(前钩子)
     protected void beforeExecute(Thread t, Runnable r)   { }
     //任务执行之后的钩子方法(后钩子)
     protected void afterExecute(Runnable r, Throwable t) { }
     //线程池终止时的钩子方法(停止钩子)
     protected void terminated() { }

(1)beforeExecute:异步任务执行之前的钩子方法

线程池工作线程在异步执行目标实例(如Runnable实例)前调用此钩子方法。此方法仍然由执行任务的工作线程调用。默认实现不执行任何操作,但可以在子类中对其进行自定义。此方法由执行目标实例的工作线程调用,可用于重新初始化ThreadLocal线程本地变量实例、更新日志记录、开始计时统计、更新上下文变量等。

(2)afterExecute:异步任务执行之后的钩子方法

线程池工作线程在异步执行目标实例后调用此钩子方法。此方法仍然由执行任务的工作线程调用。此钩子方法的默认实现不执行任何操作,可以在调度器子类中对其进行自定义。此方法由执行目标实例的工作线程调用,可用于清除ThreadLocal线程本地变量、更新日志记录、收集统计信息、更新上下文变量等。

(3)terminated:线程池终止时的钩子方法

terminated钩子方法在Executor终止时调用,默认实现不执行任何操作。

说明

beforeExecute和afterExecute两个方法在每个任务执行前后被调用,如果钩子(回调方法)引发异常,内部工作线程可能失败并突然终止。

为线程池定制钩子方法的示例,具体代码如下:

     // 省略import
     public class CreateThreadPoolDemo
     {

		//线程本地变量,用于记录线程异步任务的开始执行时间
	    private static final ThreadLocal<Long> START_TIME = new ThreadLocal<>();
	    
         @org.junit.Test
         public void testHooks()
         {
             ExecutorService pool = new ThreadPoolExecutor(2, //coreSize
                         4, //最大线程数
                        60,//空闲保活时长
                     TimeUnit.SECONDS, 
                        new LinkedBlockingQueue<>(2)) //等待队列
             {
              //继承:调度器终止钩子
                 @Override
                 protected void terminated() 
                 {
                     Print.tco("调度器已经终止!");
                 }
     
                //继承:执行前钩子
                @Override
                 protected void beforeExecute(Thread t, Runnable target)
                 {
                     Print.tco( target +"前钩被执行");
                     //记录开始执行时间
                     START_TIME.set(System.currentTimeMillis());
                     super.beforeExecute(t, target);
                 }
     
                //继承:执行后钩子
                 @Override
                 protected void afterExecute(Runnable target, Throwable t)
                 {
                     super.afterExecute(target, t);
                     //计算执行时长
                     long time = (System.currentTimeMillis() - START_TIME.get()) ;
                     Print.tco( target + " 后钩被执行, 任务执行时长(ms):" + time);
                     //清空本地变量
                     START_TIME.remove();
                 }
             };
     
             for (int i = 1; i <= 5; i++)
             {
                 pool.execute(new TargetTask());
             }
             //等待10秒
             sleepSeconds(10);
             Print.tco("关闭线程池");
             pool.shutdown();
         }
         // 省略其他
     }

示例代码在beforeExecute(前钩子)方法中通过startTime线程局部变量暂存了异步目标任务(如Runnable实例)的开始执行时间(起始时间),在afterExecute(后钩子)方法中通过startTime线程局部变量获取了之前暂存的起始时间,然后计算与系统当前时间(结束时间)之间的时间差,从而得出异步目标任务的执行时长。

线程池的拒绝策略

在线程池的任务缓存队列为有界队列(有容量限制的队列)的时候,如果队列满了,提交任务到线程池的时候就会被拒绝。
总体来说,任务被拒绝有两种情况:

(1)线程池已经被关闭。
(2)工作队列已满且maximumPoolSize已满。

无论以上哪种情况任务被拒绝,线程池都会调用RejectedExecutionHandler实例的rejectedExecution方法。RejectedExecutionHandler是拒绝策略的接口,JUC为该接口提供了以下几种实现:

·AbortPolicy:拒绝策略。
·DiscardPolicy:抛弃策略。
·DiscardOldestPolicy:抛弃最老任务策略。
·CallerRunsPolicy:调用者执行策略。
·自定义策略。

(1)AbortPolicy
使用该策略时,如果线程池队列满了,新任务就会被拒绝,并且抛出RejectedExecutionException异常。该策略是线程池默认的拒绝策略。
(2)DiscardPolicy
该策略是AbortPolicy的Silent(安静)版本,如果线程池队列满了,新任务就会直接被丢掉,并且不会有任何异常抛出。
(3)DiscardOldestPolicy
抛弃最老任务策略,也就是说如果队列满了,就会将最早进入队列的任务抛弃,从队列中腾出空间,再尝试加入队列。因为队列是队尾进队头出,队头元素是最老的,所以每次都是移除队头元素后再尝试入队。
(4)CallerRunsPolicy
调用者执行策略。在新任务被添加到线程池时,如果添加失败,那么提交任务线程会自己去执行该任务,不会使用线程池中的线程去执行新任务。

在以上4种内置策略中,线程池默认的拒绝策略为AbortPolicy,如果提交的任务被拒绝,线程池就会抛出RejectedExecutionException异常,该异常是非受检异常(运行时异常),很容易忘记捕获。如果关心任务被拒绝的事件,需要在提交任务时捕获RejectedExecutionException异常。

(5)自定义策略
如果以上拒绝策略都不符合需求,那么可自定义一个拒绝策略,实现RejectedExecutionHandler接口的rejectedExecution方法即可。

下面给出一个自定义拒绝策略的例子,代码如下:

 	//自定义拒绝策略
    public static class CustomIgnorePolicy implements RejectedExecutionHandler {
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            // 可做日志记录等
            Print.tco(r + " rejected; " + " - getTaskCount: " + e.getTaskCount());
        }
    }

    @org.junit.Test
    public void testCustomIgnorePolicy() {
        int corePoolSize = 2; //核心线程数
        int maximumPoolSize = 4;  //最大线程数
        long keepAliveTime = 10;
        TimeUnit unit = TimeUnit.SECONDS;
        //最大排队任务数
        BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);
        //线程工厂(省略类,同上)
        ThreadFactory threadFactory = new SimpleThreadFactory();
        //拒绝和异常策略
        RejectedExecutionHandler policy = new CustomIgnorePolicy();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                corePoolSize,
                maximumPoolSize,
                keepAliveTime, unit,
                workQueue,
                threadFactory,
                policy);

        // 预启动所有核心线程
        pool.prestartAllCoreThreads();
        for (int i = 1; i <= 10; i++) {
            pool.execute(new TargetTask());
        }
        //等待10秒
        sleepSeconds(10);
        Print.tco("关闭线程池");
        pool.shutdown();
    }

运行以上代码,大致结果如下:

     [main]:创建一个线程,名称为:simpleThread-1
     [main]:创建一个线程,名称为:simpleThread-2
     [main]:创建一个线程,名称为:simpleThread-3
     [simpleThread-1]:任务:task-1 doing
     [simpleThread-2]:任务:task-2 doing
     [main]:创建一个线程,名称为:simpleThread-4
     [simpleThread-3]:任务:task-3 doing
     [simpleThread-4]:任务:task-6 doing
     [main]:TargetTask{task-7} rejected;  - getTaskCount: 6
     [main]:TargetTask{task-8} rejected;  - getTaskCount: 6
     [main]:TargetTask{task-9} rejected;  - getTaskCount: 6
     [main]:TargetTask{task-10} rejected;  - getTaskCount: 6
     [simpleThread-1]:task-1 运行结束.
     [simpleThread-2]:task-2 运行结束.
     [simpleThread-1]:任务:task-4 doing
     [simpleThread-2]:任务:task-5 doing
     [simpleThread-2]:task-5 运行结束.
     [simpleThread-4]:task-6 运行结束.
     [simpleThread-3]:task-3 运行结束.
     [simpleThread-1]:task-4 运行结束.
     [main]:关闭线程池

线程池的优雅关闭

一般情况下,线程池启动后建议手动关闭。在介绍线程池的优雅关闭之前,我们先了解一下线程池的状态。线程池总共存在5种状态,定义在ThreadPoolExecutor类中,具体代码如下:

 // 省略import
 public class ThreadPoolExecutor extends AbstractExecutorService {
      // runState is stored in the high-order bits
     private static final int RUNNING   		= -1 << COUNT_BITS;
     private static final int SHUTDOWN          =  0 << COUNT_BITS;
     private static final int STOP              =  1 << COUNT_BITS;
     private static final int TIDYING           =  2 << COUNT_BITS;
     private static final int TERMINATED        =  3 << COUNT_BITS;
     // 省略其他
 }

线程池的5种状态具体如下:

(1)RUNNING:线程池创建之后的初始状态,这种状态下可以执行任务。
(2)SHUTDOWN:该状态下线程池不再接受新任务,但是会将工作队列中的任务执行完毕。
(3)STOP:该状态下线程池不再接受新任务,也不会处理工作队列中的剩余任务,并且将会中断所有工作线程。
(4)TIDYING:该状态下所有任务都已终止或者处理完成,将会执行terminated()钩子方法。
(5)TERMINATED:执行完terminated()钩子方法之后的状态。

线程池的状态转换规则如下图所示:

优雅地关闭线程池主要涉及的方法有3个:
(1)shutdown:是JUC提供的一个有序关闭线程池的方法,此方法会等待当前工作队列中的剩余任务全部执行完成之后,才会执行关闭,但是此方法被调用之后线程池的状态转为SHUTDOWN,线程池不会再接收新的任务。
(2)shutdownNow:是JUC提供的一个立即关闭线程池的方法,此方法会打断正在执行的工作线程,并且会清空当前工作队列中的剩余任务,返回的是尚未执行的任务。
(3)awaitTermination:等待线程池完成关闭。在调用线程池的shutdown()与shutdownNow()方法时,当前线程会立即返回,不会一直等待直到线程池完成关闭。如果需要等到线程池关闭完成,可以调用awaitTermination()方法。

1.shutdown()方法的原理

shutdown()方法的源码大致如下:

    public void shutdown()
     {
         final ReentrantLock mainLock = this.mainLock;
         mainLock.lock();
         try
         {
             // 检查权限
             checkShutdownAccess();
             // 设置线程池状态
             advanceRunState(SHUTDOWN);
             // 中断空闲线程
             interruptIdleWorkers();
             // 钩子函数,主要用于清理一些资源
             onShutdown();
         } finally
         {
             mainLock.unlock();
         }
         tryTerminate();
     }

Shutdown()方法首先加锁,其次检查调用者是否用于执行线程池关闭的Java Security权限。接着shutdown()方法会将线程池状态变为SHUTDOWN,在这之后线程池不再接受提交的新任务。此时如果还继续往线程池提交任务,将会使用线程池拒绝策略响应,默认的拒绝策略将会使用ThreadPoolExecutor.AbortPolicy,接收新任务时会抛出RejectedExecutionException异常。

2.shutdownNow()方法的原理

shutdownNow()方法的源码大致如下:

   public List<Runnable> shutdownNow()
   {
         List<Runnable> tasks;
         final ReentrantLock mainLock = this.mainLock;
         mainLock.lock();
         try
         {
             // 检查状态
             checkShutdownAccess();
             // 将线程池状态变为 STOP
             advanceRunState(STOP);
             // 中断所有线程,包括工作线程以及空闲线程
             interruptWorkers();
             // 丢弃工作队列中的剩余任务
             tasks = drainQueue();
         } finally
         {
             mainLock.unlock();
         }
         tryTerminate();
         return tasks;
   }

shutdownNow()方法将会把线程池状态设置为STOP,然后中断所有线程(包括工作线程以及空闲线程),最后清空工作队列,取出工作队列所有未完成的任务返回给调用者。与有序的shutdown()方法相比,shutdownNow()方法比较粗暴,直接中断工作线程。不过这里需要注意的是,中断线程并不代表线程立刻结束,只是通过工作线程的interrupt()实例方法设置了中断状态,这里需要用户程序主动配合线程进行中断操作。

3.awaitTermination()方法的使用

调用了线程池的shutdown()与shutdownNow()方法之后,用户程序都不会主动等待线程池关闭完成,如果需要等待线程池关闭完成,需要调用awaitTermination()进行主动等待。调用方法大致如下:

 threadPool.shutdown();
 try {
     //一直等待,直到线程池完成关闭
     while (!threadPool.awaitTermination(60,TimeUnit.SECONDS)){
         System.out.println("线程池任务还未执行结束");
     }
 } catch (InterruptedException e) {
     e.printStackTrace();
 }

如果线程池完成关闭,awaitTermination()方法将会返回true,否则当等待时间超过指定时间后将会返回false。如果需要调用awaitTermination(),建议不是永久等待,而是设置一定重试次数。

下面的代码参考了阿里巴巴著名的分布式框架Dubbo框架中线程池关闭源码中的部分代码:

       if(!threadPool.isTerminated())
     {
         try
         {
             for (int i = 0; i < 1000; i++) //循环关闭1000次,每次等待10毫秒
             {
                 if (threadPool.awaitTermination(10, TimeUnit.MILLISECONDS))
                 {
                     break;
                 }
                 threadPool.shutdownNow();
             }
         } catch (InterruptedException e)
         {
             System.err.println(e.getMessage());
         } catch (Throwable e)
         {
             System.err.println(e.getMessage());
         }
     }

4.优雅地关闭线程池

大家可以结合shutdown()、shutdownNow()、awaitTermination()三个方法优雅地关闭一个线程池,大致分为以下几步:

(1)执行shutdown()方法,拒绝新任务的提交,并等待所有任务有序地执行完毕。
(2)执行awaitTermination(long timeout,TimeUnit unit)方法,指定超时时间,判断是否已经关闭所有任务,线程池关闭完成。
(3)如果awaitTermination()方法返回false,或者被中断,就调用shutDownNow()方法立即关闭线程池所有任务。
(4)补充执行awaitTermination(long timeout,TimeUnit unit)方法,判断线程池是否关闭完成。如果超时,就可以进入循环关闭,循环一定的次数(如1000次),不断关闭线程池,直到其关闭或者循环结束。

优雅地关闭线程池的参考代码如下:

 // 省略import
 public class ThreadUtil
 {
    public static void shutdownThreadPoolGracefully(ExecutorService threadPool)
     {
         // 若已经关闭则返回
         if (!(threadPool instanceof ExecutorService) || threadPool.isTerminated())
         {
             return;
         }
         try
         {
             threadPool.shutdown();   //拒绝接受新任务
         } catch (SecurityException e)
         {
             return;
         } catch (NullPointerException e)
         {
             return;
         }
         try
         {
             // 等待60秒,等待线程池中的任务完成执行
             if (!threadPool.awaitTermination(60, TimeUnit.SECONDS))
             {
                 // 调用 shutdownNow() 方法取消正在执行的任务
                 threadPool.shutdownNow();
                 // 再次等待60秒,如果还未结束,可以再次尝试,或者直接放弃
                 if (!threadPool.awaitTermination(60, TimeUnit.SECONDS))
                 {
                     System.err.println("线程池任务未正常执行结束");
                 }
             }
         } catch (InterruptedException ie)
         {
             // 捕获异常,重新调用 shutdownNow() 方法
             threadPool.shutdownNow();
         }
         // 仍然没有关闭,循环关闭1000次,每次等待10毫秒
         if (!threadPool.isTerminated())
         {
             try
             {
                 for (int i = 0; i < 1000; i++)
                 {
                     if (threadPool.awaitTermination(10,TimeUnit.MILLISECONDS))
                     {
                         break;
                     }
                     threadPool.shutdownNow();
                 }
             } catch (InterruptedException e)
             {
                 System.err.println(e.getMessage());
             } catch (Throwable e)
             {
                 System.err.println(e.getMessage());
             }
         }
     }
     // 省略不相干代码
 }

5.注册JVM钩子函数自动关闭线程池

如果使用了线程池,可以在JVM中注册一个钩子函数,在JVM进程关闭之前,由钩子函数自动将线程池优雅地关闭,以确保资源正常释放。

下面的例子使用JVM钩子函数关闭了一个定义在ThreadUtil辅助类中用于执行定时、顺序任务的线程池,具体代码如下:

     // 省略import
     public class ThreadUtil
     {
     
         //懒汉式单例创建线程池:用于执行定时、顺序任务
         static class SeqOrScheduledTargetThreadPoolLazyHolder
         {
             //线程池:用于定时任务、顺序排队执行任务
             static final ScheduledThreadPoolExecutor EXECUTOR =
                             new ScheduledThreadPoolExecutor( 1,
                             new CustomThreadFactory("seq"));
     
             static
             {
                 //注册JVM关闭时的钩子函数
                 Runtime.getRuntime().addShutdownHook(
                                      new ShutdownHookThread("定时和顺序任务线程池",
                                      new Callable<Void>()
                         {
                             @Override
                             public Void call() throws Exception
                             {
                                 //优雅地关闭线程池
                                 shutdownThreadPoolGracefully(EXECUTOR);
                                 return null;
                             }
                         }));
             }
         }
         // 省略不相干代码
     }

Executors快捷创建线程池的潜在问题

1.使用Executors创建“固定数量的线程池”的潜在问题

使用newFixedThreadPool工厂方法创建“固定数量的线程池”的源码如下:

     public static ExecutorService newFixedThreadPool(int nThreads)
     {
         return new ThreadPoolExecutor(
                 nThreads,                                       // 核心线程数
                 nThreads,                                       // 最大线程数
                 0L,                                             // 线程最大空闲(Idle)时长
                 TimeUnit.MILLISECONDS,         				// 时间单位:毫秒
                 new LinkedBlockingQueue<Runnable>()      		//任务的排队队列,无界队列
         );
     }

newFixedThreadPool工厂方法返回一个ThreadPoolExecutor实例,该线程池实例的corePoolSize数量为参数nThread,其maximumPoolSize数量也为参数nThread,其workQueue属性的值为LinkedBlockingQueue()无界阻塞队列。

使用Executors创建“固定数量的线程池”的潜在问题主要存在于其workQueue上,其值为LinkedBlockingQueue(无界阻塞队列)。如果任务提交速度持续大于任务处理速度,就会造成队列中大量的任务等待。如果队列很大,很有可能导致JVM出现OOM(Out Of Memory)异常,即内存资源耗尽。

2.使用Executors创建“单线程化线程池”的潜在问题

使用newSingleThreadExecutor工厂方法创建“单线程化线程池”的源码如下:

     public static ExecutorService newSingleThreadExecutor()
     {
         return new FinalizableDelegatedExecutorService
                 (new ThreadPoolExecutor(
                         1,                                        // 核心线程数
                         1,                                         // 最大线程数
                         0L,                                       // 线程最大空闲(Idle)时长
                         TimeUnit.MILLISECONDS,        			   //时间单位:毫秒
                         new LinkedBlockingQueue<Runnable>()      //无界队列
                 ));
     }

以上代码首先通过调用工厂方法newFixedThreadPool(1)创建一个数量为1的“固定大小的线程池”,然后使用FinalizableDelegatedExecutorService对该“固定大小的线程池”进行包装,这一层包装的作用是防止线程池的corePoolSize被动态地修改。

使用Executors创建的“单线程化线程池”与“固定大小的线程池”一样,其潜在问题仍然存在于其workQueue属性上,该属性的值为LinkedBlockingQueue(无界阻塞队列)。如果任务提交速度持续大于任务处理速度,就会造成队列大量阻塞。如果队列很大,很有可能导致JVM的OOM异常,甚至造成内存资源耗尽。

3.使用Executors创建“可缓存线程池”的潜在问题

使用newCachedThreadPool工厂方法创建“可缓存线程池”的源码如下:

     public static ExecutorService newCachedThreadPool()
     {
         return new ThreadPoolExecutor(
                 0,                                             // 核心线程数
                 Integer.MAX_VALUE,                             // 最大线程数
                 60L,                                           // 线程最大空闲(Idle)时长
                 TimeUnit.MILLISECONDS,                         // 时间单位:毫秒
                 new SynchronousQueue<Runnable>() 				// 任务的排队队列,无界队列
         );
     }

以上代码通过调用ThreadPoolExecutor标准构造器创建一个核心线程数为0、最大线程数不设限制的线程池。所以,理论上“可缓存线程池”可以拥有无数个工作线程,即线程数量几乎无限制。“可缓存线程池”的workQueue为SynchronousQueue同步队列,这个队列类似于一个接力棒,入队出队必须同时传递,正因为“可缓存线程池”可以无限制地创建线程,不会有任务等待,所以才使用SynchronousQueue。当“可缓存线程池”有新任务到来时,新任务会被插入SynchronousQueue实例中,由于SynchronousQueue是同步队列,因此会在池中寻找可用线程来执行,若有可用线程则执行,若没有可用线程,则线程池会创建一个线程来执行该任务。

使用Executors创建的“可缓存线程池”的潜在问题存在于其最大线程数量不设限上。由于其maximumPoolSize的值为Integer.MAX_VALUE(非常大),可以认为可以无限创建线程,如果任务提交较多,就会造成大量的线程被启动,很有可能造成OOM异常,甚至导致CPU线程资源耗尽。

4.使用Executors创建“可调度线程池”的潜在问题

使用newScheduledThreadPool工厂方法创建“可调度线程池”的源码如下:

     public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
     {
         return new ScheduledThreadPoolExecutor(corePoolSize);
     }

Executors的newScheduledThreadPool工厂方法调用了ScheduledThreadPoolExecutor实现类的构造器,而ScheduledThreadPoolExecutor继承了ThreadPoolExecutor的普通线程池类,在其构造器内部进一步调用了该父类的构造器,具体的代码如下:

   public ScheduledThreadPoolExecutor(int corePoolSize)
     {
         super(corePoolSize,                    // 核心线程数
                 Integer.MAX_VALUE,     		// 最大线程数
                 0,                             // 线程最大空闲(Idle)时长
                 NANOSECONDS,					//时间单位
                 new DelayedWorkQueue()  		//任务的排队队列
         );
     }

以上代码创建了一个ThreadPoolExecutor实例,其corePoolSize为传递来的参数,maximumPoolSize为Integer.MAX_VALUE,表示线程数不设上限,其workQueue为一个DelayedWorkQueue实例,这是一个按到期时间升序排序的阻塞队列。使用Executors创建的“可缓存线程池”的潜在问题存在于其最大线程数量不设限上。由于其线程数量不设限,如果到期任务太多,就会导致CPU的线程资源耗尽。

总结

(1)FixedThreadPool和SingleThreadPool这两个工厂方法所创建的线程池,工作队列(任务排队的队列)的长度都为Integer.MAX_VALUE,可能会堆积大量的任务,从而导致OOM(即耗尽内存资源)。
(2)CachedThreadPool和ScheduledThreadPool这两个工厂方法所创建的线程池允许创建的线程数量为Integer.MAX_VALUE,可能会导致创建大量的线程,从而导致OOM。

虽然Executors工厂类提供了构造线程池的便捷方法,但是对于服务器程序而言,大家应该杜绝使用这些便捷方法,而是直接使用线程池ThreadPoolExecutor的构造器,从而有效避免由于使用无界队列可能导致的内存资源耗尽,或者由于对线程个数不做限制而导致的CPU资源耗尽等问题。所以,大厂的编程规范都不允许使用Executors创建线程池,而是要求使用标准构造器ThreadPoolExecutor创建线程池。

确定线程池的线程数

(一)线程池分类
使用线程池的好处:

(1)降低资源消耗:线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,通过重复利用已创建的线程可以降低线程创建和销毁造成的消耗。
(2)提高响应速度:当任务到达时,可以不需要等待线程创建就能立即执行。
(3)提高线程的可管理性:线程池提供了一种限制、管理资源的策略,维护一些基本的线程统计信息,如已完成任务的数量等。通过线程池可以对线程资源进行统一的分配、监控和调优。虽然使用线程池的好处很多,但是如果其线程数配置得不合理,不仅可能达不到预期效果,反而可能降低应用的性能。

按照任务类型对线程池进行分类

(1)IO密集型任务此类任务主要是执行IO操作。由于执行IO操作的时间较长,导致CPU的利用率不高,这类任务CPU常处于空闲状态。Netty的IO读写操作为此类任务的典型例子。
(2)CPU密集型任务此类任务主要是执行计算任务。由于响应时间很快,CPU一直在运行,这种任务CPU的利用率很高。
(3)混合型任务此类任务既要执行逻辑计算,又要进行IO操作(如RPC调用、数据库访问)。相对来说,由于执行IO操作的耗时较长(一次网络往返往往在数百毫秒级别),这类任务的CPU利用率也不是太高。Web服务器的HTTP请求处理操作为此类任务的典型例子。

一般情况下,针对以上不同类型的异步任务需要创建不同类型的线程池,并进行针对性的参数配置。

(二)IO密集型
由于IO密集型任务的CPU使用率较低,导致线程空余时间很多,因此通常需要开CPU核心数两倍的线程。当IO线程空闲时,可以启用其他线程继续使用CPU,以提高CPU的使用率。

ThreadUtil类中为IO密集型任务创建了一个简单的参考线程池,具体代码如下:

 // 省略import
 public class ThreadUtil
 {
     //CPU核数
     private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
     //IO处理线程数
     private static final int IO_MAX = Math.max(2, CPU_COUNT * 2);
     /**
      * 空闲保活时限,单位秒
      */
     private static final int KEEP_ALIVE_SECONDS = 30;
     /**
      * 有界队列size
      */
     private static final int QUEUE_SIZE = 128;
     //懒汉式单例创建线程池:用于IO密集型任务
     private static class IoIntenseTargetThreadPoolLazyHolder
     {
         //线程池: 用于IO密集型任务
         private static final ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(
                 IO_MAX,  //CPU核数*2
                 IO_MAX,  //CPU核数*2
                 KEEP_ALIVE_SECONDS,
                 TimeUnit.SECONDS,
                 new LinkedBlockingQueue(QUEUE_SIZE),
                 new CustomThreadFactory("io"));
 
         static
         {
             EXECUTOR.allowCoreThreadTimeOut(true);
             //JVM关闭时的钩子函数
             Runtime.getRuntime().addShutdownHook(
                     new ShutdownHookThread("IO密集型任务线程池",
                      new Callable<Void>()
                     {
                         @Override
                         public Void call() throws Exception
                         {
                             //优雅地关闭线程池
                             shutdownThreadPoolGracefully(EXECUTOR);
                             return null;
                         }
                     }));
         }
     }
     // 省略不相干代码
 }

在以上参考代码中,有以下几个要点需要特别说明:

(1)为参考的IO线程池调用了allowCoreThreadTimeOut(…)方法,并且传入了参数true,则keepAliveTime参数所设置的Idle超时策略也将被应用于核心线程,当池中的线程长时间空闲时,可以自行销毁。
(2)使用有界队列缓冲任务而不是无界队列,如果128太小,可以根据具体需要进行增大,但是不能使用无界队列。
(3)corePoolSize和maximumPoolSize保持一致,使得在接收到新任务时,如果没有空闲工作线程,就优先创建新的线程去执行新任务,而不是优先加入阻塞队列,等待现有工作线程空闲后再执行。
(4)使用懒汉式单例模式创建线程池,如果代码没有用到此线程池,就不会立即创建。
(5)使用JVM关闭时的钩子函数优雅地自动关闭线程池

(三)CPU密集型

CPU密集型任务也叫计算密集型任务,其特点是要进行大量计算而需要消耗CPU资源,比如计算圆周率、对视频进行高清解码等。CPU密集型任务虽然也可以并行完成,但是并行的任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低,所以要最高效地利用CPU,CPU密集型任务并行执行的数量应当等于CPU的核心数。

比如4个核心的CPU,通过4个线程并行地执行4个CPU密集型任务,此时的效率是最高的。但是如果线程数远远超出CPU核心数量,就需要频繁地切换线程,线程上下文切换时需要消耗时间,反而会使得任务效率下降。因此,对于CPU密集型的任务来说,线程数等于CPU数就行。

ThreadUtil类中为CPU密集型任务创建了一个简单的参考线程池,具体代码如下:

 // 省略import
 public class ThreadUtil
 {
     //CPU核数
     private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
 
     private static final int MAXIMUM_POOL_SIZE = CPU_COUNT;
 
     //懒汉式单例创建线程池:用于CPU密集型任务
     private static class CpuIntenseTargetThreadPoolLazyHolder
     {
         //线程池:用于CPU密集型任务
         private static final ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(
                 MAXIMUM_POOL_SIZE,
                 MAXIMUM_POOL_SIZE,
                 KEEP_ALIVE_SECONDS,
                 TimeUnit.SECONDS,
                 new LinkedBlockingQueue(QUEUE_SIZE),
                 new CustomThreadFactory("cpu"));
 
         static
         {
             EXECUTOR.allowCoreThreadTimeOut(true);
             //JVM关闭时的钩子函数
             Runtime.getRuntime().addShutdownHook(
                     new ShutdownHookThread("CPU密集型任务线程池",
                        new Callable<Void>()
                     {
                         @Override
                         public Void call() throws Exception
                         {
                             //优雅地关闭线程池
                             shutdownThreadPoolGracefully(EXECUTOR);
                             return null;
                         }
                     }));
         }
     }
     // 省略不相干代码
 }

(四)混合型

混合型任务既要执行逻辑计算,又要进行大量非CPU耗时操作(如RPC调用、数据库访问、网络通信等),所以混合型任务CPU的利用率不是太高,非CPU耗时往往是CPU耗时的数倍。比如在Web应用中处理HTTP请求时,一次请求处理会包括DB操作、RPC操作、缓存操作等多种耗时操作。一般来说,一次Web请求的CPU计算耗时往往较少,大致在100~500毫秒,而其他耗时操作会占用500~1000毫秒,甚至更多的时间。

在为混合型任务创建线程池时,如何确定线程数呢?业界有一个比较成熟的估算公式,具体如下:

最佳线程数目 = (线程等待时间 / 线程CPU时间之比 + 1) * CPU核数

通过公式可以看出:等待时间所占的比例越高,需要的线程就越多;CPU耗时所占的比例越高,需要的线程就越少。

以上公式的估算结果仅仅是理论最佳值,在生产环境中的使用也仅供参考。生产环境需要结合系统网络环境和硬件情况(CPU、内存、硬盘读写速度)不断尝试,获取一个符合实际的线程数值。

ThreadUtil类中为混合型任务创建了一个简单的参考线程池,具体代码如下:

 // 省略import
 public class ThreadUtil
 {
     private static final int MIXED_MAX = 128;  //最大线程数
     private static final String MIXED_THREAD_AMOUNT = "mixed.thread.amount";
 
     //懒汉式单例创建线程池:用于混合型任务
     private static class MixedTargetThreadPoolLazyHolder
     {
         //首先从环境变量 mixed.thread.amount 中获取预先配置的线程数
         //如果没有对 mixed.thread.amount进行配置,就使用常量 MIXED_MAX作为线程数
         private static final int max =  (null != System.getProperty(MIXED_THREAD_AMOUNT)) ?
                 Integer.parseInt(System.getProperty(MIXED_THREAD_AMOUNT)) : MIXED_MAX;
         //线程池:用于混合型任务
         private static final ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(
                 max,
                 max,
                 KEEP_ALIVE_SECONDS,
                 TimeUnit.SECONDS,
                 new LinkedBlockingQueue(QUEUE_SIZE),
                 new CustomThreadFactory("mixed"));
 
         static
         {
             EXECUTOR.allowCoreThreadTimeOut(true);
             //JVM关闭时的钩子函数
             Runtime.getRuntime().addShutdownHook(
                 new ShutdownHookThread("混合型任务线程池", new Callable<Void>()
             {
                 @Override
                 public Void call() throws Exception
                 {
                     //优雅地关闭线程池
                     shutdownThreadPoolGracefully(EXECUTOR);
                     return null;
                 }
             }));
         }
     }
     // 省略不相干代码
 }
0

评论区