线程池

Posted by BY 大雁小鱼 on June 4, 2018

线程池知识点概要

概要

  • 线程池的主要作用是避免程序创建过多的线程来减少系统资源的消耗和竞争,有时也可以通过使用线程池避免不必要的线程创建开销,在这一点上和连接池很相似。
  • 线程池由2部分构成,任务队列和工作线程,工作线程负责具体的线程处理任务,当任务数量大于实际的工作线程数量时,线程池会将多余的任务放入任务队列中。

创建线程的方法

创建线程共有2种方法
1、继承Thread类
2、实现Runnable接口

如何确保线程安全

确保线程安全基本上可以通过3种方法实现
1、尽可能将对象设计成不可变对象
2、使用CAS包装类
3、使用锁

线程池的基本工作原理

线程池由2部分构成,任务队列和工作线程,当使用者向线程池提交一个线程任务的时候,线程池会根据当前是否还可以再创建线程做出2种举动
1、如果当前不可再创建线程,则将该任务放入工作队列中
2、如果当前可以再创建线程,则创建线程并交由该线程执行任务

当一个工作线程执行完任务后,会去检查工作队列中是否还有任务没有执行
1、如果工作队列中还有任务,则取出任务并执行
2、如果工作队列中没有任务,则会根据是否是核心线程做出不同的动作

1、如果是核心线程,则会阻塞
2、如果是非核心线程,则会阻塞一定的时间,如果还取不到任务,就被销毁了,如果取到任务了,则继续执行,这个阻塞时间可以在构造函数中指定

核心线程也不一定是永远阻塞的,通过构造函数中的一个参数allowCoreThreadTimeOut,如果为true,则与非核心线程一样的处理,如果为false则永远阻塞

异常情况

如果线程池中的一个线程抛出异常,会怎么样呢?
这里的情况比较多,我们分情况讨论
情况1、如果这个异常是被线程自己处理掉了,则没什么其他事
情况2、如果抛出了一个没有处理的异常,那么

  • 这个线程会立刻停止运行并被销毁,且线程池会执行 afterExection函数,通知线程池自己有线程执行完毕
  • 该线程被销毁后,如果线程池中核心线程数不够的话,线程池会添加一个线程进来(通过addWorker),但如果够的,则不添加

队列的有界性

工作队列可以是有界队列ArrayBlockingQueue,也可以是无界队列LinkedBlockingQueue,如果你提交任务时,队列满了,会发生什么事情?
注意,只有有界队列会满,无界队列是不会满的,如果有界队列满了,线程池会调用rejectedExecutionHandler方法告知调用方任务被拒绝,如果你在构造函数中没有传这个参数,默认行为是抛出RejectedExectionException异常,这个异常需要调用者自己处理,比如重试等等。

线程池的构造参数

总共7个参数

  • corePoolSize 池子中的核心线程数
  • maximumPoolSize 允许的最大线程数
  • keepAliveTime 一般是非核心线程的最大空闲时的可等待时间
  • unit 时间单位
  • workQueue 工作队列
  • threadFactory 线程工厂
  • handler 拒绝时的执行器

核心代码

  • 取任务代码如下
      private Runnable getTask() {
          boolean timedOut = false; // Did the last poll() time out?
    
          retry:
          for (;;) {
              int c = ctl.get();
              int rs = runStateOf(c);
    
              // Check if queue empty only if necessary.
              if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                  decrementWorkerCount();
                  return null;
              }
    
              boolean timed;      // Are workers subject to culling?
    
              for (;;) {
              	//工作线程的数量
                  int wc = workerCountOf(c);
                    
                  //allowCoreThreadTimeOut在为false的时候,核心线程数会永远阻塞
                    
                  //timed如果为false的话,就没有时间,永远阻塞
                  timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
                  //没有超过最大线程数量 && (未超时 或者 timed为false),就跳出
                  if (wc <= maximumPoolSize && ! (timedOut && timed))
                      break;
                  if (compareAndDecrementWorkerCount(c))
                      return null;//非核心线程,可能会走到这里退出的
                  c = ctl.get();  // Re-read ctl
                  if (runStateOf(c) != rs)
                      continue retry;
                  // else CAS failed due to workerCount change; retry inner loop
              }
    
              try {
                  Runnable r = timed ?
                      workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) ://阻塞一定的时间(针对非核心线程)
                      workQueue.take();//永远阻塞
                  if (r != null)
                      return r;
                  timedOut = true;
              } catch (InterruptedException retry) {
                  timedOut = false;//这个赋值的目的是为了让线程有再次到达try的机会
              }
          }
      }
    
  • 构造函数
      public ThreadPoolExecutor(int corePoolSize, /**保存在池子中的线程数,即便他们是空闲的,除非allowCoreThreadTimeOut被设置了*/
                                int maximumPoolSize, /**允许的最大线程数量*/
                                long keepAliveTime, /**空闲的线程最大可等待的时间*/
                                TimeUnit unit,  /**时间的单位*/
                                BlockingQueue<Runnable> workQueue, /**一个工作队列,存放准备执行的任务*/
                                ThreadFactory threadFactory, /**创建线程的工厂*/
                                RejectedExecutionHandler handler) {
          if (corePoolSize < 0 ||
              maximumPoolSize <= 0 ||
              maximumPoolSize < corePoolSize ||
              keepAliveTime < 0){
              throw new IllegalArgumentException();
          }
          if (workQueue == null || threadFactory == null || handler == null){
              throw new NullPointerException();
          }
          this.corePoolSize = corePoolSize;
          this.maximumPoolSize = maximumPoolSize;
          this.workQueue = workQueue;
          this.keepAliveTime = unit.toNanos(keepAliveTime);
          this.threadFactory = threadFactory;
          this.handler = handler;
      }