线程池】ThreadPoolExector的原理分析

2021-03-30 19:29发布

3条回答
studentaaa
2楼 · 2021-04-04 09:15

ThreadPoolExecutor的内部工作原理,整体思路总结为5句话:

  1. 如果线程池大小poolSize小于corePoolSize,则创建新线程执行任务。

  2. 如果线程池大小poolSize大于corePoolSize,且等待队列未满,则进入等待队列。

  3. 如果线程池大小poolSize大于corePoolSize且小于maximumPoolSize,且等待队列已满,则创建新线程执行任务。

  4. 如果线程池大小poolSize大于corePoolSize且大于maximumPoolSize,且等待队列已满,则调用拒绝策略来处理该任务。

  5. 线程池里的每个线程执行完任务后不会立刻退出,而是会去检查下等待队列里是否还有线程任务需要执行,如果在keepAliveTime里等不到新的任务了,那么线程就会退出。

ThreadPoolExecutor线程池中拒绝策略:

  1. AbortPolicy:为java线程池默认的阻塞策略,不执行此任务,而且会直接抛出一个执行时异常,切记TreadPoolExecutor.execute需要try catch,否则程序会直接退出。

  2. DiscardPolicy:直接抛弃,任务不执行,空方法

  3. DiscardOldestPolicy:从队列里面抛弃head的一个任务,并再次execute 此任务(task)

  4. CallerRunsPolicy:在调用execute的线程里面执行此command,会阻塞入口。

  5. 用户自定义拒绝策略:实现RejectdExecutionHandler,并自己定义策略模式。

我是大脸猫
3楼 · 2021-04-05 21:01

常见大厂面试题型:

线程池使用过吗?谈谈对 ThreadPoolExector 的理解?

为什使用线程池,线程池的优势?

创建线程的几种方式?

线程池如何使用?

线程池的几个重要参数介绍?

说说线程池的底层工作原理?

线程池的拒绝策略你谈谈?

你在工作中单一的、固定数的和可变的三种创建线程池的方法,你用哪个多?

你在工作中是如何使用线程池的,是否自定义过线程池使用?

合理配置线程池你是如果考虑的?

带着问题学习,我相信能理解的更深刻,掌握的更多。


 


 一、线程池使用过吗?谈谈对 ThreadPoolExector 的理解?

根据自己学习情况回答。


 


二、为什使用线程池,线程池的优势?

线程池用于多线程处理中,它可以根据系统的情况,可以有效控制线程执行的数量,优化运行效果。线程池做的工作主要是:控制运行的线程的数量,处理过程中将多余的任务放入阻塞队列,然后创建线程启动这些任务,如果线程数量超过了最大数量,那么超出数量的线程排队等候,等其它线程执行完毕,再从队列中取出任务来执行。


主要特点为:


线程复用

控制最大并发数量

管理线程

主要优点:


降低资源消耗,通过重复利用已创建的线程来降低线程创建和销毁造成的消耗。

提高相应速度,当任务到达时,任务可以不需要的等到线程创建就能立即执行。

提高线程的可管理性,线程是稀缺资源,如果无限制的创建,不仅仅会消耗系统资源,还会降低体统的稳定性,使用线程可以进行统一分配,调优和监控。

 


三、创建线程的几种方式

继承 Thread

实现 Runnable 接口

实现 Callable

举例:Thread 、Runnable就不举例了,下面是通过Callable接口实现:


public class CallableDemo {

 

public static void main(String[] args) throws ExecutionException, InterruptedException {

 

    // 在 FutureTask 中传入 Callable 的实现类

    FutureTask futureTask = new FutureTask<>(new Callable() {

        @Override

        public Integer call() throws Exception {

            return 12;   

        }

    });

 

    // 把 futureTask 放入线程中

    new Thread(futureTask).start();

 

    // 获取结果

    Integer res = futureTask.get();

    System.out.println(res);

 

    }

 

}

 


四、线程池如何使用?

1、架构说明:




 


 


2、编码实现:


常用3种:


Executors.newSingleThreadExecutor():只有一个线程的线程池,因此所有提交的任务是顺序执行,适用于一个一个任务执行的场景

public static ExecutorService newSingleThreadExecutor() { 

    return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L,         

         TimeUnit.MILLISECONDS, new LinkedBlockingQueue())); 

}

 


Executors.newCachedThreadPool():线程池里有很多线程需要同时执行,老的可用线程将被新的任务触发重新执行,如果线程超过60秒内没执行,那么将被终止并从池中删除,适用执行很多短期异步的小程序或者负载较轻的服务

public static ExecutorService newCachedThreadPool() { 

    return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, 

        TimeUnit.SECONDS, new SynchronousQueue()); 

}

 


Executors.newFixedThreadPool():拥有固定线程数的线程池,如果没有任务执行,那么线程会一直等待,适用执行长期的任务,性能好很多。

public static ExecutorService newFixedThreadPool(int nThreads) { 

    return new ThreadPoolExecutor(nThreads, nThreads, 0L, 

        TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); 

}

 


不常用:


Executors.newScheduledThreadPool():用来调度即将执行的任务的线程池

 

Executors.newWorkStealingPool(): newWorkStealingPool适合使用在很耗时的操作,但是newWorkStealingPool不是ThreadPoolExecutor的扩展,它是新的线程池类ForkJoinPool的扩展,但是都是在统一的一个Executors类中实现,由于能够合理的使用CPU进行对任务操作(并行操作),所以适合使用在很耗时的任务中

 


可以发现每种方式的底层最终还是通过ThreadPoolExecutor来实现线程池


ThreadPoolExecutor作为java.util.concurrent包对外提供基础实现,以内部线程池的形式对外提供管理任务执行,线程调度,线程池管理等等服务。


 


五、线程池的几个重要参数介绍?

ThreadPoolExecutor底层源码,带有7个参数:


public ThreadPoolExecutor(int corePoolSize,

                              int maximumPoolSize,

                              long keepAliveTime,

                              TimeUnit unit,

                              BlockingQueue workQueue,

                              ThreadFactory threadFactory,

                              RejectedExecutionHandler handler) {

        if (corePoolSize < 0>

            maximumPoolSize <= 0 ||

            maximumPoolSize < corePoolSize>

            keepAliveTime < 0>

            throw new IllegalArgumentException();

        if (workQueue == null || threadFactory == null || handler == null)

            throw new NullPointerException();

        this.acc = System.getSecurityManager() == null ?

                null :

                AccessController.getContext();

        this.corePoolSize = corePoolSize;

        this.maximumPoolSize = maximumPoolSize;

        this.workQueue = workQueue;

        this.keepAliveTime = unit.toNanos(keepAliveTime);

        this.threadFactory = threadFactory;

        this.handler = handler;

    }

核心参数:前5个


参数


作用


corePoolSize


核心线程池大小


maximumPoolSize


最大线程池大小


keepAliveTime


线程池中超过 corePoolSize 数目的空闲线程最大存活时间;可以allowCoreThreadTimeOut(true) 使得核心线程有效时间


TimeUnit


keepAliveTime 时间单位


workQueue


阻塞任务队列


threadFactory


新建线程工厂


RejectedExecutionHandler


拒绝策略。当提交任务数超过 maxmumPoolSize+workQueue 之和时,任务会交给RejectedExecutionHandler 来处理


 


六、说说线程池的底层工作原理?



 


重点讲解:


 其中比较容易让人误解的是:参数corePoolSize,maximumPoolSize,workQueue之间关系。


当线程池小于corePoolSize时,新提交任务将创建一个新线程执行任务,即使此时线程池中存在空闲线程。

当线程池达到corePoolSize时,新提交任务将被放入 workQueue 中,等待线程池中任务调度执行。

当workQueue已满,且 maximumPoolSize 大于 corePoolSize 时,新提交任务会创建新线程执行任务。

当提交任务数超过 maximumPoolSize 时,新提交任务由 RejectedExecutionHandler 处理。

当线程池中超过corePoolSize 线程,空闲时间达到 keepAliveTime 时,关闭空闲线程 。

当设置allowCoreThreadTimeOut(true) 时,线程池中 corePoolSize 线程空闲时间达到 keepAliveTime 也将关闭。

 


推荐看juc底层视频学习一下:本人学习教程,尚硅谷周阳线程池详解


 


 


七、线程池的拒绝策略你谈谈?

1、什么是拒绝策略:


等待队列已经满了,再也塞不下新的任务,同时线程池中的线程数达到了最大线程数max,无法继续为新任务服务。此时,我们需要拒绝策略机制合理的处理这个问题。也就是七大参数之一:RejectedExecutionHandler


 


2、拒绝策略有哪些:


AbortPolicy(默认):处理程序遭到拒绝将抛出运行时 RejectedExecutionException,阻止系统的正常运行。

 


CallerRunsPolicy:线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度,既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低任务的流量。

DiscardPolicy:不能执行的任务将被删除,直接丢弃任务

DiscardOldestPolicy:如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)。抛弃队列中等待最久的任务

 


 


八、你在工作中单一的、固定数的和可变的三种创建线程池的方法,你用哪个多,超级大坑?

聪明的人的回答应该不是这三者中的任何一个,而是自己创建线程池。如果读者对Java中的阻塞队列有所了解的话,看到这里或许就能够明白原因了。


Java中的BlockingQueue主要有两种实现,分别是ArrayBlockingQueue 和 LinkedBlockingQueue。


ArrayBlockingQueue是一个用数组实现的有界阻塞队列,必须设置容量。


 LinkedBlockingQueue是一个用链表实现的有界阻塞队列,容量可以选择进行设置,不设置的话,将是一个无边界的阻塞队列,最大长度为Integer.MAX_VALUE。


这里的问题就出在:


不设置的话,将是一个无边界的阻塞队列,最大长度为Integer.MAX_VALUE。也就是说,如果我们不设置LinkedBlockingQueue的容量的话,其默认容量将会是Integer.MAX_VALUE。而newFixedThreadPool中创建LinkedBlockingQueue时,并未指定容量。此时,LinkedBlockingQueue就是一个无边界队列,对于一个无边界队列来说,是可以不断的向队列中加入任务的,这种情况下就有可能因为任务过多而导致内存溢出问题OOM。


 


 


 


 


九、你在工作中是如何使用线程池的,是否自定义过线程池使用?

自定义线程池,以及api方式举例:


package com.juc.threadpool;

 

import java.util.concurrent.*;

 

/**

 * @program: acmPractice

 * @description: 线程池应用

 * @author: Mr.Li

 * @create: 2019-10-09 20:33

 **/

public class MyThreadPoolDemo {

    public static void main(String[] args) {

        //一池5个线程处理

        //ExecutorService threadPool = Executors.newFixedThreadPool(5);

 

        //一池一线程

        //ExecutorService threadPool = Executors.newSingleThreadExecutor();

 

        //一池n线程

        //ExecutorService threadPool = Executors.newCachedThreadPool();

 

        //自定义线程池

        ExecutorService threadPool = new ThreadPoolExecutor(2,5,1L,

                TimeUnit.SECONDS,

                new LinkedBlockingQueue<>(3),

                Executors.defaultThreadFactory(),

                new ThreadPoolExecutor.AbortPolicy());  

               //4种拒绝策略.AbortPolicy,CallerRunsPolicy,DiscardPolicy,DiscardOldestPolicy

               //不同的策略方式,运行结果也将不一致

 

        //模拟10个用户来办理业务,每个用户就是一个来自外部的请求线程

        for(int i = 1; i <= 10; i++) {

            threadPool.execute(()->{

                System.out.println(Thread.currentThread().getName() + "办理业务");

            });

        }

 

    }

}

 


 


十、合理配置线程池你是如果考虑的?

1、CPU 密集型


CPU 密集的意思是该任务需要大量的运算,而没有阻塞,CPU 一直全速运行。

CPU 密集型任务尽可能的少的线程数量,一般为 CPU 核数 + 1 个线程的线程池。

2、IO 密集型


由于 IO 密集型任务线程并不是一直在执行任务,可以多分配一点线程数,如 CPU * 2

也可以使用公式:CPU 核数 / (1 - 阻塞系数);其中阻塞系数在 0.8 ~ 0.9 之间。

 


 


扩展

十一、死锁编码以及定位分析

产生死锁的原因:


死锁是指两个或两个以上的进程在执行过程中,因争夺资源而造成的一种相互等待的现象,如果无外力的干涉那它们都将无法推进下去,如果系统的资源充足,进程的资源请求都能够得到满足,死锁出现的可能性就很低,否则就会因争夺有限的资源而陷入死锁。



代码体现:


package com.juc.deadlock;

 

import java.util.concurrent.TimeUnit;

 

/**

 * @description: 造成死锁的原因,如何排除

 * @author: Mr.Li

 * @create: 2019-10-10 15:53

 **/

class HoldLockThread implements Runnable {

    private String lockA;

    private String lockB;

 

    public HoldLockThread(String lockA, String lockB) {

        this.lockA = lockA;

        this.lockB = lockB;

    }

 

    @Override

    public void run() {

        synchronized (lockA) {

            System.out.println(Thread.currentThread().getName() + "\t自己持有" + lockA + "尝试获得" + lockB);

            try {

                TimeUnit.SECONDS.sleep(2);

            } catch (InterruptedException e) {

                e.printStackTrace();

            }

 

            synchronized (lockB) {

                System.out.println(Thread.currentThread().getName() + "\t自己持有" + lockB + "尝试获得" + lockA);

            }

        }

    }

}

 

public class DeadLockDemo {

    public static void main(String[] args) {

        String lockA = "lockA";

        String lockB = "lockB";

 

        new Thread(new HoldLockThread(lockA,lockB),"AA").start();

        new Thread(new HoldLockThread(lockB,lockA),"BB").start();

    }

}

 


运行:程序始终未结束,并不知道发生什么错误




如何排查问题及相应解决:首先转到终端,输入命令  jsp -l  查出对应程序的进程号为 4184




 


然后输入命令:jstack 4184




 


可看到结果:发现一处死锁错误


Java stack information for the threads listed above:

===================================================

"BB":

        at com.juc.deadlock.HoldLockThread.run(DeadLockDemo.java:30)

        - waiting to lock <0x00000000d604bf88> (a java.lang.String)

        - locked <0x00000000d604bfc0> (a java.lang.String)

        at java.lang.Thread.run(Thread.java:748)

"AA":

        at com.juc.deadlock.HoldLockThread.run(DeadLockDemo.java:30)

        - waiting to lock <0x00000000d604bfc0> (a java.lang.String)

        - locked <0x00000000d604bf88> (a java.lang.String)

        at java.lang.Thread.run(Thread.java:748)

 

Found 1 deadlock.

 

 


好了,也到尾声了,感谢你的观看,希望你尽快学会掌握喔!



我的网名不再改
4楼 · 2021-05-14 15:23

 前言:线程是我们在学习java过程中非常重要的也是绕不开的一个知识点,它的重要程度可以说是java的核心之一,线程具有不可轻视的作用,对于我们提高程序的运行效率、压榨CPU处理能力、多条线路同时运行等都是强有力的杀手锏工具。线程是如此的重要,那么我们来思考这样一个问题。假设我们有一个高并发,多线程的项目,多条线程在运行的时候,来一个任务我们new一个线程,任务结束了,再把它销毁结束,这样看似没有问题,适合于低并发的场景,可是当我们的项目投入到生产环境,一下涌入千条任务的时候,线程不断的new执行,JVM不断的回收,这样重复这个过程,并且任务过多,一下子new不过来,任务就会有遗漏,可能发生这样的情况,任务执行了但是却没有返回结果,并且很容易发生宕机。这是我们绝对不愿意看到的场景。所以,我们为什么不思考一下,然后用一个管理池把所有的线程管理起来呢?那么线程池就应运而生了。本期的博客就来聚焦线程池的源码,走进线程池的内部,看看它究竟是如何工作的,使用线程池到底能带给我们什么好处?我们为什么要使用它?

本篇博客的目录:

一:线程池的介绍

二:使用线程池的好处

三:线程池的分类

四:ThreadPooolExector类的实现源码分析

五:关于线程池的总结

一:线程池的介绍

  1: 前言中我们已经说明了单线程创建处理任务的时候的场景的弊端,线程池的出现,就是专门为了解决这一问题应运而生的,它解决了由于线程创建太过频繁而发生的性能损耗,采用了线程处理完任务而继续保持留在线程池内的机制,而勉去重新创建新线程的这一特别耗费时间和资源的开销。线程池的定义如下:线程池是指管理一组工作线程的资源池,线程池是与工作队列密切相关的,其中在工作队列中宏保存了所有等待执行的任务,从工作队列中获取一个任务,执行任务,然后在线程池中并等待一下个任务。

2:线程池的特点:线程池是专门用来管理线程的,首先呢,它本身创建出来的时候,里面是没有线程的。只有当有任务进来的时候,它才会去创建线程去处理任务线程,这时候线程池的大小就处于一个增长的过程。在这里注意,有两个线程的概念,一个是工作线程,一个是任务线程。当然我们建立一个池,不能让它无限制的增长,因此它具有以下几个属性:coreSize、maxiumSize、BlockingQueue,其中coreSize我们暂且先翻译为核心池大小,它是一个int的数值类型,表示的是创建线程的个数,这是一个临界值,当线程池中的数量超过coreSize个大小的时候,它就会将任务放在阻塞队列中,等到线程中的数目降到coreSize大小之下,然后它会从队列中取,继续执行任务。当然也会有任务数量比较多的情况,这时候就会创建maxiumSize个线程数了,注意:maxiumSize是它的最大值,超过这个数,线程池就要对其进行采取拒绝策略了。

我们用图来表示一下这个过程:

二:使用线程池的好处

我总结了一下,主要有以下几点:

1:线程统一管理,线程池具有创建线程和销毁线程的能力,线程集中在一起比起分散开来,更加便于管理

2:重用现有的线程而不是创建新线程,可以在处理多个请求的时候分摊线程创建和销毁过程中产生的巨大开销

3:当请求到达的时候,工作线程通常已经存在,因此不会由于等待创建线程而延迟任务的执行,从而提高了响应性

4:可以创建足够多的线程便于处理器保持忙碌状态,而对这些足够多多线程数量又进行了限制,会防止其溢出,耗尽处理器内存

5:可以防止多线程相互争夺资源而使应用程序而产生的并发问题

三:线程池的分类

    从jdk1.5开始中,api就提供了灵活的线程池以及一些配置,我们可以调用Executors中的静态工厂方法来创建一个线程池,Executors是一个接口,它的子类是ExecutorService,然后有一个具体的实现类:ThreadPoolExector.关于线程池,它有以下几种不同的分类:

1:newFixedThreadPool

     Fixed这个词的意思是固定不变的,newFixedThreadPool将创建一个固定长度的线程池,每当提交一个任务时就创建一个线程,直到达到线程池的数量,这时线程池的大小将不会变化。如果线程在执行的过程中发生异常,这个时候线程池就会补充一个线程

2: newCachedThreadPool

  newCachedThreadPool将会创建一个可缓存的线程池,如果线程池的当前数量超过了线程池,那么线程池将回收空闲的线程,如果任务很多的时候,它将会继续创建线程而不会存在任何的限制。

   3: newSingleThreadExector

newSingleThreadExector将只会创建一个线程来执行任务,这点有点类似于node.js,采用单线程的机制,如果这个线程异常结束,那么会再重新创建一个线程,它始终保证在线程池里的只有一个线程,这样就可以保证任务在队列总是按照预定的顺序执行。

    4: newScheduledThreadPool

newScheduledThreadPool创建一个固定长度的线程池,而且是以延迟或者定时的方式来执行任务,这点有点类似于Timer类

 

四:ThreadPooolExector类的实现源码分析

1:为什么要讲ThreadPoolExector类?

Exector是ThreadPoolExector的祖父类接口,ThreadPoolExector的直接父类接口是ExectorService,而我们所讲的第三点,其中的不同线程池的分类其实都是Exector中的方法,而在ThreadPoollExector中得到了实现,所以我们要构建的不同种类的线程池主要还是依赖这个类完成,接下来我们就聚焦ThreadPoolExector来看其具体的实现方法

2:ThreadPoolExector的源码讲解

2.1:同样的,它是一个类,首先我们来看它有哪些全局变量

   private final BlockingQueue workQueue;//存放线程的队列。这里是一个阻塞队列

    
    private final ReentrantLock mainLock = new ReentrantLock();//维持一个可重入锁

    
    private final Condition termination = mainLock.newCondition();//获取锁的状态

    
    private final HashSet workers = new HashSet();//存放Worker类的集合

    
    private volatile long  keepAliveTime;//保持存活的时间

    
    private volatile boolean allowCoreThreadTimeOut;//是否允许核心线程超时

    
    private volatile int   corePoolSize;//核心池大小

    
    private volatile int   maximumPoolSize;//最大线程池的大小
    
 
    private volatile int   poolSize;//池的大小

    
    private volatile RejectedExecutionHandler handler;//注入执行的处理器

    
    private volatile ThreadFactory threadFactory;//线程工厂,主要是用来生产线程

    
    private int largestPoolSize;//最大线程大小

    
    private long completedTaskCount;//已执行完成的任务数量

    
    private static final RejectedExecutionHandler defaultHandler =        new AbortPolicy();                      //拒绝策略类

 

从中,我们也可以看出我们在线程池介绍中谈到的关于coreSize和maxiumSize等参数,这些int值对线程池的中的线程池数量进行了限制,还有一些关于锁ReentrantLock
 

的类,这是一个可重入锁,它的主要目的是锁住其操作,因为线程的操作要保证其原子性,防止冲突发生,所以在其源码中很多都对其进行了上锁操作。还有一个很重要的值的全局的变量state:

volatile int runState;//运行状态
    static final int RUNNING    = 0;//0表示正在运行中
    static final int SHUTDOWN   = 1;//1表示关闭
    static final int STOP       = 2;//2表示停止
    static final int TERMINATED = 3;//3表示结束

这些状态值是线程池目前所处环境的状态的体现,它采用int数字来表现,记住这些值很重要,因为后面有很多方法调用线程池的运行状态,有很多对其值进行判断。采用volatile修饰,也保证其在线程池中是随时保证对其他线程的可见,防止并发过程中未知异常的发生。

 

2.2:ThreadPoolExector的构造函数

     public ThreadPoolExecutor(int corePoolSize,                  //核心池大小
                              int maximumPoolSize,               //池的最大大小
                              long keepAliveTime,                //保持活动的时间
                              TimeUnit unit,        //时间单位
                              BlockingQueue workQueue) {         //阻塞队列
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }    
    public ThreadPoolExecutor(int corePoolSize,//核心池大小
                              int maximumPoolSize,//池的最大大小
                              long keepAliveTime,//保持存活的时间
                              TimeUnit unit,//时间单位
                              BlockingQueue workQueue,//阻塞队列
                              ThreadFactory threadFactory) {//线程工厂
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,//调用其他构造函数             threadFactory, defaultHandler);
    }    
    public ThreadPoolExecutor(int corePoolSize,//核心池的大小
                              int maximumPoolSize,//最大池的大小
                              long keepAliveTime,//保持唤醒的时间
                              TimeUnit unit,//时间单元
                              BlockingQueue workQueue,//阻塞队列
                              RejectedExecutionHandler handler) {//注入的执行处理器
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,//调用其他函数             Executors.defaultThreadFactory(), handler);
    }    
    public ThreadPoolExecutor(int corePoolSize,//核心池的大小
                              int maximumPoolSize,//最大池大小
                              long keepAliveTime,//保持存活的时间
                              TimeUnit unit,//时间单位
                              BlockingQueue workQueue,//阻塞队列
                              ThreadFactory threadFactory,//线程工厂
                              RejectedExecutionHandler handler) {//注入的执行处理器
        if (corePoolSize < 0 ||                     //如果核心池的大小小于0
            maximumPoolSize <= 0 ||                //最大池大小小于0
            maximumPoolSize < corePoolSize ||       //最大池大小不小于核心池大小
            keepAliveTime < 0)  //保持唤醒的时间小于0
            throw new IllegalArgumentException();//抛出主题违法异常
        if (workQueue == null || threadFactory == null || handler == null)//如果队列为null或者线程工厂为null或者处理器为null
            throw new NullPointerException();//抛出空指针异常
        this.corePoolSize = corePoolSize;//构造核心池大小
        this.maximumPoolSize = maximumPoolSize;//最大池的大小
        this.workQueue = workQueue;//工作队列
        this.keepAliveTime = unit.toNanos(keepAliveTime);//以保持的存活时间构造存活时间
        this.threadFactory = threadFactory;//构造线程工厂
        this.handler = handler;//处理器
    }

可以看出ThreadPoolExector一共有四个构造函数,但是最后调用的都是最后一个,我们可以只看最后一个,它主要有核心池大小、最大池大小、存活时间、时间单位、阻塞队列、线程工厂这几个参数,其中又对其进行了值范围的检查,如果参数违法就抛出异常,然后构造进去。关于这几个参数,随着后面我们对其方法的讲解,会理解越来越深刻的。

2.3:ThreadPool的主要方法讲解

2.3.1:我们先来看一下最重要的方法,执行任务的方法:

    public void execute(Runnable command) {//执行方法
        if (command == null)//如果任务线程为null
            throw new NullPointerException();//抛出空指针异常
        if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {//如果线程池的大小大于核心池的大小或者线程池没有保证在coreSize下
            if (runState == RUNNING && workQueue.offer(command)) {//如果运行状态是正在运行并且阻塞队列中把线程任务放进去
                if (runState != RUNNING || poolSize == 0)//如果运行状态不是正在运行或者池的大小是0
                    ensureQueuedTaskHandled(command);//确保队列中的任务被处理            }            else if (!addIfUnderMaximumPoolSize(command))//没有在保证最大的线程池大小下添加线程任务
                reject(command); // 调用注入方法        }
    }

这是执行任务的方法,其中可以看出当任务线程为null的时候,就抛出异常,然后如果池的大小大于核心池的大小,也就是说此时它要将任务放在阻塞队列中,这个时候它采用的是BlockingQueue的offer方法,将任务放进去,同时保证状态是正在运行。因为线程池的状态随时可能被修改,所以处处得判断。添加进去以后,如果状态不是在运行中,或者池里没有线程,就调用ensureQueueTaskHandled处理任务,我们来看看这个方法的源代码:

    private void ensureQueuedTaskHandled(Runnable command) {//确保队列中的任务被处理
        final ReentrantLock mainLock = this.mainLock;//主锁
        mainLock.lock();//上锁
        boolean reject = false;//设置reject为false
        Thread t = null;//声明一个线程t
        try {            int state = runState;//获取运行状态
            if (state != RUNNING && workQueue.remove(command))//如果状态不是在运行中,在队列中移除线程
                reject = true;//注入设置为true
            else if (state < STOP &&//如果状态小于停止(这时候int的值作用就发挥了,看一下小于stop的有几?stop是2,小于它也就是0和1,就表示运行和结束,上面的if说明了不是0,那么这里就表示1,也就是结束) 并且 如果池的大小大于1并且队列不是空,也就是还存在任务在队列中没有执行完
                     poolSize < Math.max(corePoolSize, 1) &&
                     !workQueue.isEmpty())
                t = addThread(null);//添加一个线程,设置参数为null
        } finally {
            mainLock.unlock();//最后解锁        }        if (reject)//如果注入为ture
            reject(command);//注入线程
        else if (t != null)//如果线程不为null
            t.start();//线程开始运行
    }

其中可以看到如果线程池的状态是关闭的时候,这时候队列也会移除任务,然后调用addThread方法,并且参数为null:

    
    private Thread addThread(Runnable firstTask) {//添加线程
        Worker w = new Worker(firstTask);//工作类新构造一个线程
        Thread t = threadFactory.newThread(w);//用线程工厂生产一个新类
        if (t != null) {//如果生产出来的类不是null
            w.thread = t;//设置一个线程t
            workers.add(w);//工作线程添加
            int nt = ++poolSize;//池的大小增加
            if (nt > largestPoolSize)//如果线程池的大小大于最大池的大小
                largestPoolSize = nt;//把增加后的池的大小赋给最大池大小        }        return t;//返回该线程
    }

我们来到addThread方法,此时参数为null,Worker类此时会构建一个null,我们再来看Worker的构造方法,然后看threadFactory,也就是线程工厂的方法

Worker(Runnable firstTask) {//构造一个任务线程
            this.firstTask = firstTask;
        }

 

从中可以看出,ThreadFactory只是进行了一个对创建线程的封装,如果传入的参数为null,那么它也会返回null,那么addThread就会返回null,我们再回去看看,也就是说如果其线程池状态为停止,那么阻塞队列会移除其中的任务,并不再执行任务线程。好了,我们再回过头,看看按照正常逻辑怎么走。看一下addIfUnderCorePoolSize的源码:

private boolean addIfUnderCorePoolSize(Runnable firstTask) {//保证在核心池的大小下添加任务
        Thread t = null;//线程为null
        final ReentrantLock mainLock = this.mainLock;//主锁
        mainLock.lock();//开始上锁
        try {            if (poolSize < corePoolSize && runState == RUNNING)//如果池的大小小于核心池大小并且是正在运行状态
                t = addThread(firstTask);//把第一个任务添加进去
        } finally {
            mainLock.unlock();//最后解锁        }        if (t == null)//如果线程是null的话
            return false;//返回false
        t.start();//线程开始执行
        return true;//返回true
    }

这个方法主要是说如果线程数还未超过coreSize的时候,此时就会通过线程工厂生产出一个线程然后执行任务,这是绝大多数的线程的状态。那么此时就还剩一种状态,也就是在maxiumSize下,这个时候,会调用addIfUnderMaximumPoolSize方法,我们来分析一下这个方法的源码:

private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {//添加任务在未超过池的最大容量时
        Thread t = null;//声明一个线程
        final ReentrantLock mainLock = this.mainLock;//开启一个主锁
        mainLock.lock();//开始上锁
        try {            if (poolSize < maximumPoolSize && runState == RUNNING)//如果池的大小小于最大池的大小并且运行状态是在运行中
                t = addThread(firstTask);//调用添加线程的方法添加线程
        } finally {
            mainLock.unlock();//最后一定解锁        }        if (t == null)//如果线程为null
            return false;//返回false
        t.start();//线程开始运行
        return true;//返回true
    }

这里也是对其池的大小进行了判断,如果其符合小于最大池大小并且线程池状态是正在运行,那么就生产线程去执行任务。

总结:execute方法主要是对线程数在不同的大小的判断,并且是时刻判断线程池的运行状太,如果是正在运行,那么就会生产线程去执行任务,一旦超过coresize就放在阻塞队列中去,未超过maxiumSize也会继续执行,如果线程池状态是停止的话,那么此时阻塞队列会移除任务,并且不再创建线程去执行任务。

 2.3.2:ThreadPoolExector中的内部类Worker的讲解

Worker作为ThreadPoolExector中的一个内部类,它也继承了Runnable接口,所以其本质上是一个线程,按照我们分析源码的技巧,先来看看它的全局变量:

       private final ReentrantLock runLock = new ReentrantLock();//新建一个可重入锁

        
        private Runnable firstTask;//第一个任务线程

        
        volatile long completedTasks;//已经完成的任务数
        
        Thread thread;//声明一个线程

从中可以看出它的任务,而Woker其实也就是线程池中运行的工作线程,这个类中主要是对线程的打断和对线程池的关闭等一些列的方法操作,这里我们拿出两个方法来讲,一个是shutdown方法,顾名思义也就是关闭线程池,我们来看一下具体的代码:

public void shutdown() {   //关闭方法        
    SecurityManager security = System.getSecurityManager();//获取系统安全管理器
    if (security != null)//如果安全接口不为null
            security.checkPermission(shutdownPerm);//检查权限

        final ReentrantLock mainLock = this.mainLock;//获取可重入锁
        mainLock.lock();//开始上锁
        try {            if (security != null) { //如果安全接口不为null                for (Worker w : workers)
                    security.checkAccess(w.thread);
            }            int state = runState;            if (state < SHUTDOWN)//如果状态小于关闭,那么肯定是running或者介于running和关闭之间
                runState = SHUTDOWN;//状态设置为关闭

            try {                for (Worker w : workers) {//遍历整个工作线程
                    w.interruptIfIdle();//如果是空闲的就打断                }
            } catch (SecurityException se) { // Try to back out
                runState = state;                // tryTerminate() here would be a no-op
                throw se;
            }

            tryTerminate(); // Terminate now if pool and queue empty
        } finally {
            mainLock.unlock();
        }
    }

可以看出结束方法首先调用的是系统安全接口,这主要的目的是为了在系统层面对线程池进行保护,防止其发生意外。比如中断系统进程等,获取了安全管理器之后接下来再对其进行权限检查,然后就是加锁控制。接下来就是取状态进行判断了,接着遍历循环整个线程池里的工作线程,一旦发现空闲的就进行打断终止。然后是调用tryTerminate方法,我们来看看其中的源码:

  
    private void tryTerminate() { //结束
        if (poolSize == 0) {                      
            int state = runState; //取当前运行状态
            if (state < STOP && !workQueue.isEmpty()) {//如果运行状态小于stop,也就是有运行和关闭的可能并且队列不为空
                state = RUNNING; //运行状态设置为Running
                Thread t = addThread(null);//不创造线程
                if (t != null)//                    t.start();
            }            if (state == STOP || state == SHUTDOWN) {
                runState = TERMINATED;
                termination.signalAll();
                terminated();
            }
        }
    }

如果线程池里没有线程了,就取当前的运行状态,然后再阻塞队列中还有线程的情况下,可以看到不创建线程了,此时传入addThread的参数为null,我们来具体看一下:

private Thread addThread(Runnable firstTask) {//添加线程
        Worker w = new Worker(firstTask);//工作类新构造一个线程
        Thread t = threadFactory.newThread(w);//用线程工厂生产一个新类
        if (t != null) {//如果生产出来的类不是null
            w.thread = t;//设置一个线程t
            workers.add(w);//工作线程添加
            int nt = ++poolSize;//池的大小增加
            if (nt > largestPoolSize)//如果线程池的大小大于最大池的大小
                largestPoolSize = nt;//把增加后的池的大小赋给最大池大小        }        return t;//返回该线程
    }

如果参数为null,那会就会返回一个null的线程,可以看出这里将不会产生新的线程,队列中的任务有不会进行处理。然后是通过Conditon.signal方法唤醒所有等待线程,接下来再调用terminated()方法。

 protected void terminated() { }

目前的jdk1.6这个方法是空的,也就是不执行任何操作,那么这个方法就结束了。总结来说就是线程关闭的时候,检查队列中是否还有任务,如果有任务,不再继续创建线程,用原来的线程把其执行完后关闭线程池,状态设置为Terminted,如果没有任务,检查线程池是不是空,如果不是空,把其空间线程全部中断,等待工作的线程处理完成以后关闭整个线程池!

 五:总结

       本篇博客主要分析了线程池的实现原理,在线程池中很重要和关键的一部分就是对部分参数的理解,还有它的运行原理,作为设计到线程部分的知识,可以看出线程池的每步操作基本上都有一个ReentrantLock可重入锁,这是一种保护机制,它类似于synchronized,但是要比它更强大,并且是建立在java类的基础上,实现更灵活,比如它可以实现中断等待锁、性能更加优良等特性,还有对其状态的控制和操作,主要是通过Worker这个内部类来实现的,具体的可以参考jdk代码,我这里只是起到一个抛砖引玉的作用,本篇博客暂时写到这里。


相关问题推荐

  • 回答 3

    线程池主要作用就是减少线程创建的开销,当线程很多时可以使用。当然,线程池也不只是这一个作用,具体的建议百度看看,讲得比较全面。

  • 回答 4

    任务队列:用于缓存提交的任务。线程数量管理功能:一个线程池必须能够很好地管理和控制线程的数量。大致会有三个参数,创建线程池时的初始线程数量init;自动扩充时的最大线程数量max;在线程池空闲时需要释放资源但是也要维持一定数量的核心线程数量core。通...

  • 回答 7

    线程池是指在初始化一个多线程应用程序过程中创建一个线程集合,然后在需要执行新的任务时重用这些线程而不是新建一个线程。线程池中线程的数量通常完全取决于可用内存数量和应用程序的需求。然而,增加可用线程数量是可能的。线程池中的每个线程都有被分配一...

没有解决我的问题,去提问