ITKeyword,专注技术干货聚合推荐

注册 | 登录

并发之阻塞队列&线程池

分享于 2016-07-29

推荐:线程池ThreadPoolExecutor与阻塞队列BlockingQueue

 从Java5开始,Java提供了自己的线程池。每次只执行指定数量的线程,java.util.concurrent.ThreadPoolExecutor 就是这样的线程池。以下是我的学习过程。 首先

2019阿里云双12.12最低价产品入口(新老用户均可),
地址https://www.aliyun.com/minisite/goods

    原文链接:

   http://www.cnblogs.com/dolphin0520/p/3932906.html

阻塞队列

  在前面几篇文章中,我们讨论了同步容器(Hashtable、Vector),也讨论了并发容器(ConcurrentHashMap、CopyOnWriteArrayList),这些工具都为我们编写多线程程序提供了很大的方便。今天我们来讨论另外一类容器:阻塞队列。

  在前面我们接触的队列都是非阻塞队列,比如PriorityQueue、LinkedList(LinkedList是双向链表,它实现了Dequeue接口)。

  使用非阻塞队列的时候有一个很大问题就是:它不会对当前线程产生阻塞,那么在面对类似消费者-生产者的模型时,就必须额外地实现同步策略以及线程间唤醒策略,这个实现起来就非常麻烦。但是有了阻塞队列就不一样了,它会对当前线程产生阻塞,比如一个线程从一个空的阻塞队列中取元素,此时线程会被阻塞直到阻塞队列中有了元素。当队列中有元素后,被阻塞的线程会自动被唤醒(不需要我们编写代码去唤醒)。这样提供了极大的方便性。

  本文先讲述一下java.util.concurrent包下提供主要的几种阻塞队列,然后分析了阻塞队列和非阻塞队列的中的各个方法,接着分析了阻塞队列的实现原理,最后给出了一个实际例子和几个使用场景。

  一.几种主要的阻塞队列

  二.阻塞队列中的方法 VS 非阻塞队列中的方法

  三.阻塞队列的实现原理

  四.示例和使用场景

  若有不正之处请多多谅解,并欢迎批评指正。

      原文链接:

   http://www.cnblogs.com/dolphin0520/p/3932906.html

一.几种主要的阻塞队列

  自从Java 1.5之后,在java.util.concurrent包下提供了若干个阻塞队列,主要有以下几个:

  ArrayBlockingQueue:基于数组实现的一个阻塞队列,在创建ArrayBlockingQueue对象时必须制定容量大小。并且可以指定公平性与非公平性,默认情况下为非公平的,即不保证等待时间最长的队列最优先能够访问队列。

  LinkedBlockingQueue:基于链表实现的一个阻塞队列,在创建LinkedBlockingQueue对象时如果不指定容量大小,则默认大小为Integer.MAX_VALUE。

  PriorityBlockingQueue:以上2种队列都是先进先出队列,而PriorityBlockingQueue却不是,它会按照元素的优先级对元素进行排序,按照优先级顺序出队,每次出队的元素都是优先级最高的元素。注意,此阻塞队列为无界阻塞队列,即容量没有上限(通过源码就可以知道,它没有容器满的信号标志),前面2种都是有界队列。

  DelayQueue:基于PriorityQueue,一种延时阻塞队列,DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue也是一个无界队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。

二.阻塞队列中的方法 VS 非阻塞队列中的方法

1.非阻塞队列中的几个主要方法:

  add(E e):将元素e插入到队列末尾,如果插入成功,则返回true;如果插入失败(即队列已满),则会抛出异常;

  remove():移除队首元素,若移除成功,则返回true;如果移除失败(队列为空),则会抛出异常;

  offer(E e):将元素e插入到队列末尾,如果插入成功,则返回true;如果插入失败(即队列已满),则返回false;

  poll():移除并获取队首元素,若成功,则返回队首元素;否则返回null;

  peek():获取队首元素,若成功,则返回队首元素;否则返回null

 

  对于非阻塞队列,一般情况下建议使用offer、poll和peek三个方法,不建议使用add和remove方法。因为使用offer、poll和peek三个方法可以通过返回值判断操作成功与否,而使用add和remove方法却不能达到这样的效果。注意,非阻塞队列中的方法都没有进行同步措施。

2.阻塞队列中的几个主要方法:

  阻塞队列包括了非阻塞队列中的大部分方法,上面列举的5个方法在阻塞队列中都存在,但是要注意这5个方法在阻塞队列中都进行了同步措施。除此之外,阻塞队列提供了另外4个非常有用的方法:

  put(E e)

  take()

  offer(E e,long timeout, TimeUnit unit)

  poll(long timeout, TimeUnit unit)

  

  put方法用来向队尾存入元素,如果队列满,则等待;

  take方法用来从队首取元素,如果队列为空,则等待;

  offer方法用来向队尾存入元素,如果队列满,则等待一定的时间,当时间期限达到时,如果还没有插入成功,则返回false;否则返回true;

  poll方法用来从队首取元素,如果队列空,则等待一定的时间,当时间期限达到时,如果取到,则返回null;否则返回取得的元素;

三.阻塞队列的实现原理

  前面谈到了非阻塞队列和阻塞队列中常用的方法,下面来探讨阻塞队列的实现原理,本文以ArrayBlockingQueue为例,其他阻塞队列实现原理可能和ArrayBlockingQueue有一些差别,但是大体思路应该类似,有兴趣的朋友可自行查看其他阻塞队列的实现源码。

  首先看一下ArrayBlockingQueue类中的几个成员变量:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public  class  ArrayBlockingQueue<E>  extends  AbstractQueue<E> implements  BlockingQueue<E>, java.io.Serializable {   private  static  final  long  serialVersionUID = -817911632652898426L;   /** The queued items  */ private  final  E[] items; /** items index for next take, poll or remove */ private  int  takeIndex; /** items index for next put, offer, or add. */ private  int  putIndex; /** Number of items in the queue */ private  int  count;   /* * Concurrency control uses the classic two-condition algorithm * found in any textbook. */   /** Main lock guarding all access */ private  final  ReentrantLock lock; /** Condition for waiting takes */ private  final  Condition notEmpty; /** Condition for waiting puts */ private  final  Condition notFull; }

   可以看出,ArrayBlockingQueue中用来存储元素的实际上是一个数组,takeIndex和putIndex分别表示队首元素和队尾元素的下标,count表示队列中元素的个数。

  lock是一个可重入锁,notEmpty和notFull是等待条件。

  下面看一下ArrayBlockingQueue的构造器,构造器有三个重载版本:

1 2 3 4 5 6 7 8 public  ArrayBlockingQueue( int  capacity) { } public  ArrayBlockingQueue( int  capacity,  boolean  fair) {   } public  ArrayBlockingQueue( int  capacity,  boolean  fair,                            Collection<?  extends  E> c) { }

   第一个构造器只有一个参数用来指定容量,第二个构造器可以指定容量和公平性,第三个构造器可以指定容量、公平性以及用另外一个集合进行初始化。

  然后看它的两个关键方法的实现:put()和take():

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public  void  put(E e)  throws  InterruptedException {      if  (e ==  null throw  new  NullPointerException();      final  E[] items =  this .items;      final  ReentrantLock lock =  this .lock;      lock.lockInterruptibly();      try  {          try  {              while  (count == items.length)                  notFull.await();          catch  (InterruptedException ie) {              notFull.signal();  // propagate to non-interrupted thread              throw  ie;          }          insert(e);      finally  {          lock.unlock();      } }

   从put方法的实现可以看出,它先获取了锁,并且获取的是可中断锁,然后判断当前元素个数是否等于数组的长度,如果相等,则调用notFull.await()进行等待,如果捕获到中断异常,则唤醒线程并抛出异常。

  当被其他线程唤醒时,通过insert(e)方法插入元素,最后解锁。

  我们看一下insert方法的实现:

1 2 3 4 5 6 private  void  insert(E x) {      items[putIndex] = x;      putIndex = inc(putIndex);      ++count;      notEmpty.signal(); }

   它是一个private方法,插入成功后,通过notEmpty唤醒正在等待取元素的线程。

  下面是take()方法的实现:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public  E take()  throws  InterruptedException {      final  ReentrantLock lock =  this .lock;      lock.lockInterruptibly();      try  {          try  {              while  (count ==  0 )                  notEmpty.await();          catch  (InterruptedException ie) {              notEmpty.signal();  // propagate to non-interrupted thread              throw  ie;          }          E x = extract();          return  x;      finally  {          lock.unlock();      } }

   跟put方法实现很类似,只不过put方法等待的是notFull信号,而take方法等待的是notEmpty信号。在take方法中,如果可以取元素,则通过extract方法取得元素,下面是extract方法的实现:

1 2 3 4 5 6 7 8 9 private  E extract() {      final  E[] items =  this .items;      E x = items[takeIndex];      items[takeIndex] =  null ;      takeIndex = inc(takeIndex);      --count;      notFull.signal();      return  x; }

   跟insert方法也很类似。

  其实从这里大家应该明白了阻塞队列的实现原理,事实它和我们用Object.wait()、Object.notify()和非阻塞队列实现生产者-消费者的思路类似,只不过它把这些工作一起集成到了阻塞队列中实现。

四.示例和使用场景

  下面先使用Object.wait()和Object.notify()、非阻塞队列实现生产者-消费者模式:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 public  class  Test {      private  int  queueSize =  10 ;      private  PriorityQueue<Integer> queue =  new  PriorityQueue<Integer>(queueSize);            public  static  void  main(String[] args)  {          Test test =  new  Test();          Producer producer = test. new  Producer();          Consumer consumer = test. new  Consumer();                    producer.start();          consumer.start();      }            class  Consumer  extends  Thread{                    @Override          public  void  run() {              consume();          }                    private  void  consume() {              while ( true ){                  synchronized  (queue) {                      while (queue.size() ==  0 ){                          try  {                              System.out.println( "队列空,等待数据" );                              queue.wait();                          catch  (InterruptedException e) {                              e.printStackTrace();                              queue.notify();                          }                      }                      queue.poll();           //每次移走队首元素                      queue.notify();                      System.out.println( "从队列取走一个元素,队列剩余" +queue.size()+ "个元素" );                  }              }          }      }            class  Producer  extends  Thread{                    @Override          public  void  run() {              produce();          }                    private  void  produce() {              while ( true ){                  synchronized  (queue) {                      while (queue.size() == queueSize){                          try  {                              System.out.println( "队列满,等待有空余空间" );                              queue.wait();                          catch  (InterruptedException e) {                              e.printStackTrace();                              queue.notify();                          }                      }                      queue.offer( 1 );         //每次插入一个元素                      queue.notify();                      System.out.println( "向队列取中插入一个元素,队列剩余空间:" +(queueSize-queue.size()));                  }              }          }      } }

   这个是经典的生产者-消费者模式,通过阻塞队列和Object.wait()和Object.notify()实现,wait()和notify()主要用来实现线程间通信。

  具体的线程间通信方式(wait和notify的使用)在后续问章中会讲述到。

  下面是使用阻塞队列实现的生产者-消费者模式:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 public  class  Test {      private  int  queueSize =  10 ;      private  ArrayBlockingQueue<Integer> queue =  new  ArrayBlockingQueue<Integer>(queueSize);            public  static  void  main(String[] args)  {          Test test =  new  Test();          Producer producer = test. new  Producer();          Consumer consumer = test. new  Consumer();                    producer.start();          consumer.start();      }            class  Consumer  extends  Thread{                    @Override          public  void  run() {              consume();          }                    private  void  consume() {              while ( true ){                  try  {                      queue.take();                      System.out.println( "从队列取走一个元素,队列剩余" +queue.size()+ "个元素" );                  catch  (InterruptedException e) {                      e.printStackTrace();                  }              }          }      }            class  Producer  extends  Thread{                    @Override          public  void  run() {              produce();          }                    private  void  produce() {              while ( true ){                  try  {                      queue.put( 1 );                      System.out.println( "向队列取中插入一个元素,队列剩余空间:" +(queueSize-queue.size()));                  catch  (InterruptedException e) {                      e.printStackTrace();                  }              }          }      } }

   有没有发现,使用阻塞队列代码要简单得多,不需要再单独考虑同步和线程间通信的问题。

  在并发编程中,一般推荐使用阻塞队列,这样实现可以尽量地避免程序出现意外的错误。

  阻塞队列使用最经典的场景就是socket客户端数据的读取和解析,读取数据的线程不断将数据放入队列,然后解析线程不断从队列取数据解析。还有其他类似的场景,只要符合生产者-消费者模型的都可以使用阻塞队列。

  参考资料:

  《Java编程实战》

  http://ifeve.com/java-blocking-queue/

  http://endual.iteye.com/blog/1412212

  http://blog.csdn.net/zzp_403184692/article/details/8021615

  http://www.cnblogs.com/juepei/p/3922401.html

------------------------------------

线程池

Java并发编程:线程池的使用

  在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题:

  如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间

  那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务?

  在Java中可以通过线程池来达到这样的效果。今天我们就来详细讲解一下Java的线程池,首先我们从最核心的ThreadPoolExecutor类中的方法讲起,然后再讲述它的实现原理,接着给出了它的使用示例,最后讨论了一下如何合理配置线程池的大小。

  以下是本文的目录大纲:

推荐:【Java】并发之阻塞队列

BlockingQueue 方法以四种形式出现,对于不能立即满足但可能在将来某一时刻可以满足的操作,这四种形式的处理方式不同:第一种是抛出一个异常,第二种是返回一个

  一.Java中的ThreadPoolExecutor类

  二.深入剖析线程池实现原理

  三.使用示例

  四.如何合理配置线程池的大小 

  若有不正之处请多多谅解,并欢迎批评指正。

  请尊重作者劳动成果,转载请标明原文链接:

  http://www.cnblogs.com/dolphin0520/p/3932921.html

 

一.Java中的ThreadPoolExecutor类

  java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类,因此如果要透彻地了解Java中的线程池,必须先了解这个类。下面我们来看一下ThreadPoolExecutor类的具体实现源码。

  在ThreadPoolExecutor类中提供了四个构造方法:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public  class  ThreadPoolExecutor  extends  AbstractExecutorService {      .....      public  ThreadPoolExecutor( int  corePoolSize, int  maximumPoolSize, long  keepAliveTime,TimeUnit unit,              BlockingQueue<Runnable> workQueue);        public  ThreadPoolExecutor( int  corePoolSize, int  maximumPoolSize, long  keepAliveTime,TimeUnit unit,              BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);        public  ThreadPoolExecutor( int  corePoolSize, int  maximumPoolSize, long  keepAliveTime,TimeUnit unit,              BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);        public  ThreadPoolExecutor( int  corePoolSize, int  maximumPoolSize, long  keepAliveTime,TimeUnit unit,          BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);      ... }

   从上面的代码可以得知,ThreadPoolExecutor继承了AbstractExecutorService类,并提供了四个构造器,事实上,通过观察每个构造器的源码具体实现,发现前面三个构造器都是调用的第四个构造器进行的初始化工作。

   下面解释下一下构造器中各个参数的含义:

  • corePoolSize:核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
  • maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;
  • keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;
  • unit:参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性:
    TimeUnit.DAYS;               //天
    TimeUnit.HOURS;             //小时
    TimeUnit.MINUTES;           //分钟
    TimeUnit.SECONDS;           //秒
    TimeUnit.MILLISECONDS;      //毫秒
    TimeUnit.MICROSECONDS;      //微妙
    TimeUnit.NANOSECONDS; 
  • workQueue:一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择:
  • ArrayBlockingQueue;
    LinkedBlockingQueue;
    SynchronousQueue;

  ArrayBlockingQueue和PriorityBlockingQueue使用较少,一般使用LinkedBlockingQueue和Synchronous。线程池的排队策略与BlockingQueue有关。

  • threadFactory:线程工厂,主要用来创建线程;
  • handler:表示当拒绝处理任务时的策略,有以下四种取值
  • ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 
    ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。 
    ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
    ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务 
  •    具体参数的配置与线程池的关系将在下一节讲述。

  从上面给出的ThreadPoolExecutor类的代码可以知道,ThreadPoolExecutor继承了AbstractExecutorService,我们来看一下AbstractExecutorService的实现:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public  abstract  class  AbstractExecutorService  implements  ExecutorService {      protected  <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { };      protected  <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { };      public  Future<?> submit(Runnable task) {};      public  <T> Future<T> submit(Runnable task, T result) { };      public  <T> Future<T> submit(Callable<T> task) { };      private  <T> T doInvokeAny(Collection<?  extends  Callable<T>> tasks,                              boolean  timed,  long  nanos)          throws  InterruptedException, ExecutionException, TimeoutException {      };      public  <T> T invokeAny(Collection<?  extends  Callable<T>> tasks)          throws  InterruptedException, ExecutionException {      };      public  <T> T invokeAny(Collection<?  extends  Callable<T>> tasks,                             long  timeout, TimeUnit unit)          throws  InterruptedException, ExecutionException, TimeoutException {      };      public  <T> List<Future<T>> invokeAll(Collection<?  extends  Callable<T>> tasks)          throws  InterruptedException {      };      public  <T> List<Future<T>> invokeAll(Collection<?  extends  Callable<T>> tasks,                                           long  timeout, TimeUnit unit)          throws  InterruptedException {      }; }

   AbstractExecutorService是一个抽象类,它实现了ExecutorService接口。

  我们接着看ExecutorService接口的实现:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public  interface  ExecutorService  extends  Executor {        void  shutdown();      boolean  isShutdown();      boolean  isTerminated();      boolean  awaitTermination( long  timeout, TimeUnit unit)          throws  InterruptedException;      <T> Future<T> submit(Callable<T> task);      <T> Future<T> submit(Runnable task, T result);      Future<?> submit(Runnable task);      <T> List<Future<T>> invokeAll(Collection<?  extends  Callable<T>> tasks)          throws  InterruptedException;      <T> List<Future<T>> invokeAll(Collection<?  extends  Callable<T>> tasks,                                    long  timeout, TimeUnit unit)          throws  InterruptedException;        <T> T invokeAny(Collection<?  extends  Callable<T>> tasks)          throws  InterruptedException, ExecutionException;      <T> T invokeAny(Collection<?  extends  Callable<T>> tasks,                      long  timeout, TimeUnit unit)          throws  InterruptedException, ExecutionException, TimeoutException; }

   而ExecutorService又是继承了Executor接口,我们看一下Executor接口的实现:

1 2 3 public  interface  Executor {      void  execute(Runnable command); }

   到这里,大家应该明白了ThreadPoolExecutor、AbstractExecutorService、ExecutorService和Executor几个之间的关系了。

  Executor是一个顶层接口,在它里面只声明了一个方法execute(Runnable),返回值为void,参数为Runnable类型,从字面意思可以理解,就是用来执行传进去的任务的;

  然后ExecutorService接口继承了Executor接口,并声明了一些方法:submit、invokeAll、invokeAny以及shutDown等;

  抽象类AbstractExecutorService实现了ExecutorService接口,基本实现了ExecutorService中声明的所有方法;

  然后ThreadPoolExecutor继承了类AbstractExecutorService。

  在ThreadPoolExecutor类中有几个非常重要的方法:

1 2 3 4 execute() submit() shutdown() shutdownNow()

   execute()方法实际上是Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是ThreadPoolExecutor的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。

  submit()方法是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()方法不同,它能够返回任务执行的结果,去看submit()方法的实现,会发现它实际上还是调用的execute()方法,只不过它利用了Future来获取任务执行结果(Future相关内容将在下一篇讲述)。

  shutdown()和shutdownNow()是用来关闭线程池的。

  还有很多其他的方法:

  比如:getQueue() 、getPoolSize() 、getActiveCount()、getCompletedTaskCount()等获取与线程池相关属性的方法,有兴趣的朋友可以自行查阅API。

二.深入剖析线程池实现原理

  在上一节我们从宏观上介绍了ThreadPoolExecutor,下面我们来深入解析一下线程池的具体实现原理,将从下面几个方面讲解:

  1.线程池状态

  2.任务的执行

  3.线程池中的线程初始化

  4.任务缓存队列及排队策略

  5.任务拒绝策略

  6.线程池的关闭

  7.线程池容量的动态调整

 

1.线程池状态

  在ThreadPoolExecutor中定义了一个volatile变量,另外定义了几个static final变量表示线程池的各个状态:

1 2 3 4 5 volatile  int  runState; static  final  int  RUNNING    =  0 ; static  final  int  SHUTDOWN   =  1 ; static  final  int  STOP       =  2 ; static  final  int  TERMINATED =  3 ;

   runState表示当前线程池的状态,它是一个volatile变量用来保证线程之间的可见性;

  下面的几个static final变量表示runState可能的几个取值。

  当创建线程池后,初始时,线程池处于RUNNING状态;

  如果调用了shutdown()方法,则线程池处于SHUTDOWN状态,此时线程池不能够接受新的任务,它会等待所有任务执行完毕;

  如果调用了shutdownNow()方法,则线程池处于STOP状态,此时线程池不能接受新的任务,并且会去尝试终止正在执行的任务;

  当线程池处于SHUTDOWN或STOP状态,并且所有工作线程已经销毁,任务缓存队列已经清空或执行结束后,线程池被设置为TERMINATED状态。

2.任务的执行

  在了解将任务提交给线程池到任务执行完毕整个过程之前,我们先来看一下ThreadPoolExecutor类中其他的一些比较重要成员变量:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 private  final  BlockingQueue<Runnable> workQueue;               //任务缓存队列,用来存放等待执行的任务 private  final  ReentrantLock mainLock =  new  ReentrantLock();    //线程池的主要状态锁,对线程池状态(比如线程池大小                                                                //、runState等)的改变都要使用这个锁 private  final  HashSet<Worker> workers =  new  HashSet<Worker>();   //用来存放工作集   private  volatile  long   keepAliveTime;     //线程存货时间    private  volatile  boolean  allowCoreThreadTimeOut;    //是否允许为核心线程设置存活时间 private  volatile  int    corePoolSize;      //核心池的大小(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列) private  volatile  int    maximumPoolSize;    //线程池最大能容忍的线程数   private  volatile  int    poolSize;        //线程池中当前的线程数   private  volatile  RejectedExecutionHandler handler;  //任务拒绝策略   private  volatile  ThreadFactory threadFactory;    //线程工厂,用来创建线程   private  int  largestPoolSize;    //用来记录线程池中曾经出现过的最大线程数   private  long  completedTaskCount;    //用来记录已经执行完毕的任务个数

   每个变量的作用都已经标明出来了,这里要重点解释一下corePoolSize、maximumPoolSize、largestPoolSize三个变量。

  corePoolSize在很多地方被翻译成核心池大小,其实我的理解这个就是线程池的大小。举个简单的例子:

  假如有一个工厂,工厂里面有10个工人,每个工人同时只能做一件任务。

  因此只要当10个工人中有工人是空闲的,来了任务就分配给空闲的工人做;

  当10个工人都有任务在做时,如果还来了任务,就把任务进行排队等待;

  如果说新任务数目增长的速度远远大于工人做任务的速度,那么此时工厂主管可能会想补救措施,比如重新招4个临时工人进来;

  然后就将任务也分配给这4个临时工人做;

  如果说着14个工人做任务的速度还是不够,此时工厂主管可能就要考虑不再接收新的任务或者抛弃前面的一些任务了。

  当这14个工人当中有人空闲时,而新任务增长的速度又比较缓慢,工厂主管可能就考虑辞掉4个临时工了,只保持原来的10个工人,毕竟请额外的工人是要花钱的。

  这个例子中的corePoolSize就是10,而maximumPoolSize就是14(10+4)。

  也就是说corePoolSize就是线程池大小,maximumPoolSize在我看来是线程池的一种补救措施,即任务量突然过大时的一种补救措施。

  不过为了方便理解,在本文后面还是将corePoolSize翻译成核心池大小。

  largestPoolSize只是一个用来起记录作用的变量,用来记录线程池中曾经有过的最大线程数目,跟线程池的容量没有任何关系。

  下面我们进入正题,看一下任务从提交到最终执行完毕经历了哪些过程。

  在ThreadPoolExecutor类中,最核心的任务提交方法是execute()方法,虽然通过submit也可以提交任务,但是实际上submit方法里面最终调用的还是execute()方法,所以我们只需要研究execute()方法的实现原理即可:

1 2 3 4 5 6 7 8 9 10 11 12 public  void  execute(Runnable command) {      if  (command ==  null )          throw  new  NullPointerException();      if  (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {          if  (runState == RUNNING && workQueue.offer(command)) {              if  (runState != RUNNING || poolSize ==  0 )                  ensureQueuedTaskHandled(command);          }          else  if  (!addIfUnderMaximumPoolSize(command))              reject(command);  // is shutdown or saturated      } }

   上面的代码可能看起来不是那么容易理解,下面我们一句一句解释:

  首先,判断提交的任务command是否为null,若是null,则抛出空指针异常;

  接着是这句,这句要好好理解一下:

1 if  (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command))

   由于是或条件运算符,所以先计算前半部分的值,如果线程池中当前线程数不小于核心池大小,那么就会直接进入下面的if语句块了。

  如果线程池中当前线程数小于核心池大小,则接着执行后半部分,也就是执行

1 addIfUnderCorePoolSize(command)

  如果执行完addIfUnderCorePoolSize这个方法返回false,则继续执行下面的if语句块,否则整个方法就直接执行完毕了。

  如果执行完addIfUnderCorePoolSize这个方法返回false,然后接着判断:

1 if  (runState == RUNNING && workQueue.offer(command))

   如果当前线程池处于RUNNING状态,则将任务放入任务缓存队列;如果当前线程池不处于RUNNING状态或者任务放入缓存队列失败,则执行:

1 addIfUnderMaximumPoolSize(command)

  如果执行addIfUnderMaximumPoolSize方法失败,则执行reject()方法进行任务拒绝处理。

  回到前面:

1 if  (runState == RUNNING && workQueue.offer(command))

   这句的执行,如果说当前线程池处于RUNNING状态且将任务放入任务缓存队列成功,则继续进行判断:

1 if  (runState != RUNNING || poolSize ==  0 )

   这句判断是为了防止在将此任务添加进任务缓存队列的同时其他线程突然调用shutdown或者shutdownNow方法关闭了线程池的一种应急措施。如果是这样就执行:

1 ensureQueuedTaskHandled(command)

   进行应急处理,从名字可以看出是保证 添加到任务缓存队列中的任务得到处理。

  我们接着看2个关键方法的实现:addIfUnderCorePoolSize和addIfUnderMaximumPoolSize:

1 2 3 4

推荐:JAVA线程池ThreadPoolExecutor与阻塞队列BlockingQueue

池技术是典型的享元模式。 频繁使用new Thread来创建线程的方式并不太好。因为每次new Thread新建和销毁对象性能较差,线程缺乏统一管理。好在java提供了线程池

    原文链接:    http://www.cnblogs.com/dolphin0520/p/3932906.html 阻塞队列   在前面几篇文章中,我们讨论了同步容器(Hashtable、Vector),也讨论了并发容器(ConcurrentHashMap、Co

相关阅读排行


用户评论

游客

相关内容推荐

最新文章

×

×

请激活账号

为了能正常使用评论、编辑功能及以后陆续为用户提供的其他产品,请激活账号。

您的注册邮箱: 修改

重新发送激活邮件 进入我的邮箱

如果您没有收到激活邮件,请注意检查垃圾箱。