Home » geronimo-2.2-source-release » org.apache.geronimo.concurrent.harmony » [javadoc | source]

    1   /*
    2    * Written by Doug Lea with assistance from members of JCP JSR-166
    3    * Expert Group and released to the public domain, as explained at
    4    * http://creativecommons.org/licenses/publicdomain
    5    */
    6   
    7   package org.apache.geronimo.concurrent.harmony;
    8   
    9   import java.util.Arrays;
   10   import java.util.ConcurrentModificationException;
   11   import java.util.HashSet;
   12   import java.util.Iterator;
   13   import java.util.List;
   14   import java.util.concurrent.AbstractExecutorService;
   15   import java.util.concurrent.BlockingQueue;
   16   import java.util.concurrent.Executors;
   17   import java.util.concurrent.Future;
   18   import java.util.concurrent.RejectedExecutionException;
   19   import java.util.concurrent.ThreadFactory;
   20   import java.util.concurrent.TimeUnit;
   21   import java.util.concurrent.locks.Condition;
   22   import java.util.concurrent.locks.ReentrantLock;
   23   
   24   
   25   /**
   26    * An {@link ExecutorService} that executes each submitted task using
   27    * one of possibly several pooled threads, normally configured
   28    * using {@link Executors} factory methods.
   29    *
   30    * <p>Thread pools address two different problems: they usually
   31    * provide improved performance when executing large numbers of
   32    * asynchronous tasks, due to reduced per-task invocation overhead,
   33    * and they provide a means of bounding and managing the resources,
   34    * including threads, consumed when executing a collection of tasks.
   35    * Each <tt>ThreadPoolExecutor</tt> also maintains some basic
   36    * statistics, such as the number of completed tasks.
   37    *
   38    * <p>To be useful across a wide range of contexts, this class
   39    * provides many adjustable parameters and extensibility
   40    * hooks. However, programmers are urged to use the more convenient
   41    * {@link Executors} factory methods {@link
   42    * Executors#newCachedThreadPool} (unbounded thread pool, with
   43    * automatic thread reclamation), {@link Executors#newFixedThreadPool}
   44    * (fixed size thread pool) and {@link
   45    * Executors#newSingleThreadExecutor} (single background thread), that
   46    * preconfigure settings for the most common usage
   47    * scenarios. Otherwise, use the following guide when manually
   48    * configuring and tuning this class:
   49    *
   50    * <dl>
   51    *
   52    * <dt>Core and maximum pool sizes</dt>
   53    *
   54    * <dd>A <tt>ThreadPoolExecutor</tt> will automatically adjust the
   55    * pool size 
   56    * (see {@link ThreadPoolExecutor#getPoolSize})
   57    * according to the bounds set by corePoolSize 
   58    * (see {@link ThreadPoolExecutor#getCorePoolSize})
   59    * and
   60    * maximumPoolSize
   61    * (see {@link ThreadPoolExecutor#getMaximumPoolSize}).
   62    * When a new task is submitted in method {@link
   63    * ThreadPoolExecutor#execute}, and fewer than corePoolSize threads
   64    * are running, a new thread is created to handle the request, even if
   65    * other worker threads are idle.  If there are more than
   66    * corePoolSize but less than maximumPoolSize threads running, a new
   67    * thread will be created only if the queue is full.  By setting
   68    * corePoolSize and maximumPoolSize the same, you create a fixed-size
   69    * thread pool. By setting maximumPoolSize to an essentially unbounded
   70    * value such as <tt>Integer.MAX_VALUE</tt>, you allow the pool to
   71    * accommodate an arbitrary number of concurrent tasks. Most typically,
   72    * core and maximum pool sizes are set only upon construction, but they
   73    * may also be changed dynamically using {@link
   74    * ThreadPoolExecutor#setCorePoolSize} and {@link
   75    * ThreadPoolExecutor#setMaximumPoolSize}. <dd>
   76    *
   77    * <dt> On-demand construction
   78    *
   79    * <dd> By default, even core threads are initially created and
   80    * started only when needed by new tasks, but this can be overridden
   81    * dynamically using method {@link
   82    * ThreadPoolExecutor#prestartCoreThread} or
   83    * {@link ThreadPoolExecutor#prestartAllCoreThreads}.  </dd>
   84    *
   85    * <dt>Creating new threads</dt>
   86    *
   87    * <dd>New threads are created using a {@link
   88    * java.util.concurrent.ThreadFactory}.  If not otherwise specified, a
   89    * {@link Executors#defaultThreadFactory} is used, that creates threads to all
   90    * be in the same {@link ThreadGroup} and with the same
   91    * <tt>NORM_PRIORITY</tt> priority and non-daemon status. By supplying
   92    * a different ThreadFactory, you can alter the thread's name, thread
   93    * group, priority, daemon status, etc.  </dd>
   94    *
   95    * <dt>Keep-alive times</dt>
   96    *
   97    * <dd>If the pool currently has more than corePoolSize threads,
   98    * excess threads will be terminated if they have been idle for more
   99    * than the keepAliveTime (see {@link
  100    * ThreadPoolExecutor#getKeepAliveTime}). This provides a means of
  101    * reducing resource consumption when the pool is not being actively
  102    * used. If the pool becomes more active later, new threads will be
  103    * constructed. This parameter can also be changed dynamically
  104    * using method {@link ThreadPoolExecutor#setKeepAliveTime}. Using
  105    * a value of <tt>Long.MAX_VALUE</tt> {@link TimeUnit#NANOSECONDS}
  106    * effectively disables idle threads from ever terminating prior
  107    * to shut down.
  108    * </dd>
  109    *
  110    * <dt>Queuing</dt>
  111    *
  112    * <dd>Any {@link BlockingQueue} may be used to transfer and hold
  113    * submitted tasks.  The use of this queue interacts with pool sizing:
  114    *
  115    * <ul>
  116    *
  117    * <li> If fewer than corePoolSize threads are running, the Executor
  118    * always prefers adding a new thread
  119    * rather than queuing.</li>
  120    *
  121    * <li> If corePoolSize or more threads are running, the Executor
  122    * always prefers queuing a request rather than adding a new
  123    * thread.</li>
  124    * 
  125    * <li> If a request cannot be queued, a new thread is created unless
  126    * this would exceed maximumPoolSize, in which case, the task will be
  127    * rejected.</li>
  128    *
  129    * </ul>
  130    *
  131    * There are three general strategies for queuing:
  132    * <ol>
  133    *
  134    * <li> <em> Direct handoffs.</em> A good default choice for a work
  135    * queue is a {@link SynchronousQueue} that hands off tasks to threads
  136    * without otherwise holding them. Here, an attempt to queue a task
  137    * will fail if no threads are immediately available to run it, so a
  138    * new thread will be constructed. This policy avoids lockups when
  139    * handling sets of requests that might have internal dependencies.
  140    * Direct handoffs generally require unbounded maximumPoolSizes to
  141    * avoid rejection of new submitted tasks. This in turn admits the
  142    * possibility of unbounded thread growth when commands continue to
  143    * arrive on average faster than they can be processed.  </li>
  144    *
  145    * <li><em> Unbounded queues.</em> Using an unbounded queue (for
  146    * example a {@link LinkedBlockingQueue} without a predefined
  147    * capacity) will cause new tasks to be queued in cases where all
  148    * corePoolSize threads are busy. Thus, no more than corePoolSize
  149    * threads will ever be created. (And the value of the maximumPoolSize
  150    * therefore doesn't have any effect.)  This may be appropriate when
  151    * each task is completely independent of others, so tasks cannot
  152    * affect each others execution; for example, in a web page server.
  153    * While this style of queuing can be useful in smoothing out
  154    * transient bursts of requests, it admits the possibility of
  155    * unbounded work queue growth when commands continue to arrive on
  156    * average faster than they can be processed.  </li>
  157    *
  158    * <li><em>Bounded queues.</em> A bounded queue (for example, an
  159    * {@link ArrayBlockingQueue}) helps prevent resource exhaustion when
  160    * used with finite maximumPoolSizes, but can be more difficult to
  161    * tune and control.  Queue sizes and maximum pool sizes may be traded
  162    * off for each other: Using large queues and small pools minimizes
  163    * CPU usage, OS resources, and context-switching overhead, but can
  164    * lead to artificially low throughput.  If tasks frequently block (for
  165    * example if they are I/O bound), a system may be able to schedule
  166    * time for more threads than you otherwise allow. Use of small queues
  167    * generally requires larger pool sizes, which keeps CPUs busier but
  168    * may encounter unacceptable scheduling overhead, which also
  169    * decreases throughput.  </li>
  170    *
  171    * </ol>
  172    *
  173    * </dd>
  174    *
  175    * <dt>Rejected tasks</dt>
  176    *
  177    * <dd> New tasks submitted in method {@link
  178    * ThreadPoolExecutor#execute} will be <em>rejected</em> when the
  179    * Executor has been shut down, and also when the Executor uses finite
  180    * bounds for both maximum threads and work queue capacity, and is
  181    * saturated.  In either case, the <tt>execute</tt> method invokes the
  182    * {@link RejectedExecutionHandler#rejectedExecution} method of its
  183    * {@link RejectedExecutionHandler}.  Four predefined handler policies
  184    * are provided:
  185    *
  186    * <ol>
  187    *
  188    * <li> In the
  189    * default {@link ThreadPoolExecutor.AbortPolicy}, the handler throws a
  190    * runtime {@link RejectedExecutionException} upon rejection. </li>
  191    * 
  192    * <li> In {@link
  193    * ThreadPoolExecutor.CallerRunsPolicy}, the thread that invokes
  194    * <tt>execute</tt> itself runs the task. This provides a simple
  195    * feedback control mechanism that will slow down the rate that new
  196    * tasks are submitted. </li>
  197    *
  198    * <li> In {@link ThreadPoolExecutor.DiscardPolicy},
  199    * a task that cannot be executed is simply dropped.  </li>
  200    *
  201    * <li>In {@link
  202    * ThreadPoolExecutor.DiscardOldestPolicy}, if the executor is not
  203    * shut down, the task at the head of the work queue is dropped, and
  204    * then execution is retried (which can fail again, causing this to be
  205    * repeated.) </li>
  206    *
  207    * </ol>
  208    *
  209    * It is possible to define and use other kinds of {@link
  210    * RejectedExecutionHandler} classes. Doing so requires some care
  211    * especially when policies are designed to work only under particular
  212    * capacity or queuing policies. </dd>
  213    *
  214    * <dt>Hook methods</dt>
  215    *
  216    * <dd>This class provides <tt>protected</tt> overridable {@link
  217    * ThreadPoolExecutor#beforeExecute} and {@link
  218    * ThreadPoolExecutor#afterExecute} methods that are called before and
  219    * after execution of each task.  These can be used to manipulate the
  220    * execution environment, for example, reinitializing ThreadLocals,
  221    * gathering statistics, or adding log entries. Additionally, method
  222    * {@link ThreadPoolExecutor#terminated} can be overridden to perform
  223    * any special processing that needs to be done once the Executor has
  224    * fully terminated.</dd>
  225    *
  226    * <dt>Queue maintenance</dt>
  227    *
  228    * <dd> Method {@link ThreadPoolExecutor#getQueue} allows access to
  229    * the work queue for purposes of monitoring and debugging.  Use of
  230    * this method for any other purpose is strongly discouraged.  Two
  231    * supplied methods, {@link ThreadPoolExecutor#remove} and {@link
  232    * ThreadPoolExecutor#purge} are available to assist in storage
  233    * reclamation when large numbers of queued tasks become
  234    * cancelled.</dd> </dl>
  235    *
  236    * <p> <b>Extension example</b>. Most extensions of this class
  237    * override one or more of the protected hook methods. For example,
  238    * here is a subclass that adds a simple pause/resume feature:
  239    *
  240    * <pre>
  241    * class PausableThreadPoolExecutor extends ThreadPoolExecutor {
  242    *   private boolean isPaused;
  243    *   private ReentrantLock pauseLock = new ReentrantLock();
  244    *   private Condition unpaused = pauseLock.newCondition();
  245    *
  246    *   public PausableThreadPoolExecutor(...) { super(...); }
  247    * 
  248    *   protected void beforeExecute(Thread t, Runnable r) {
  249    *     super.beforeExecute(t, r);
  250    *     pauseLock.lock();
  251    *     try {
  252    *       while (isPaused) unpaused.await();
  253    *     } catch(InterruptedException ie) {
  254    *       t.interrupt();
  255    *     } finally {
  256    *       pauseLock.unlock();
  257    *     }
  258    *   }
  259    * 
  260    *   public void pause() {
  261    *     pauseLock.lock();
  262    *     try {
  263    *       isPaused = true;
  264    *     } finally {
  265    *       pauseLock.unlock();
  266    *     }
  267    *   }
  268    * 
  269    *   public void resume() {
  270    *     pauseLock.lock();
  271    *     try {
  272    *       isPaused = false;
  273    *       unpaused.signalAll();
  274    *     } finally {
  275    *       pauseLock.unlock();
  276    *     }
  277    *   }
  278    * }
  279    * </pre>
  280    * @since 1.5
  281    * @author Doug Lea
  282    */
  283   public class ThreadPoolExecutor extends AbstractExecutorService {
  284       /**
  285        * Only used to force toArray() to produce a Runnable[].
  286        */
  287       private static final Runnable[] EMPTY_RUNNABLE_ARRAY = new Runnable[0];
  288   
  289       /**
  290        * Permission for checking shutdown
  291        */
  292       private static final RuntimePermission shutdownPerm =
  293           new RuntimePermission("modifyThread");
  294   
  295       /**
  296        * Queue used for holding tasks and handing off to worker threads.
  297        */
  298       private final BlockingQueue<Runnable> workQueue;
  299   
  300       /**
  301        * Lock held on updates to poolSize, corePoolSize, maximumPoolSize, and
  302        * workers set.
  303        */
  304       private final ReentrantLock mainLock = new ReentrantLock();
  305   
  306       /**
  307        * Wait condition to support awaitTermination
  308        */
  309       private final Condition termination = mainLock.newCondition();
  310   
  311       /**
  312        * Set containing all worker threads in pool.
  313        */
  314       private final HashSet<Worker> workers = new HashSet<Worker>();
  315   
  316       /**
  317        * Timeout in nanoseconds for idle threads waiting for work.
  318        * Threads use this timeout only when there are more than
  319        * corePoolSize present. Otherwise they wait forever for new work.
  320        */
  321       private volatile long  keepAliveTime;
  322   
  323       /**
  324        * Core pool size, updated only while holding mainLock,
  325        * but volatile to allow concurrent readability even
  326        * during updates.
  327        */
  328       private volatile int   corePoolSize;
  329   
  330       /**
  331        * Maximum pool size, updated only while holding mainLock
  332        * but volatile to allow concurrent readability even
  333        * during updates.
  334        */
  335       private volatile int   maximumPoolSize;
  336   
  337       /**
  338        * Current pool size, updated only while holding mainLock
  339        * but volatile to allow concurrent readability even
  340        * during updates.
  341        */
  342       private volatile int   poolSize;
  343   
  344       /**
  345        * Lifecycle state
  346        */
  347       volatile int runState;
  348   
  349       // Special values for runState
  350       /** Normal, not-shutdown mode */
  351       static final int RUNNING    = 0;
  352       /** Controlled shutdown mode */
  353       static final int SHUTDOWN   = 1;
  354       /** Immediate shutdown mode */
  355       static final int STOP       = 2;
  356       /** Final state */
  357       static final int TERMINATED = 3;
  358   
  359       /**
  360        * Handler called when saturated or shutdown in execute.
  361        */
  362       private volatile RejectedExecutionHandler handler;
  363   
  364       /**
  365        * Factory for new threads.
  366        */
  367       private volatile ThreadFactory threadFactory;
  368   
  369       /**
  370        * Tracks largest attained pool size.
  371        */
  372       private int largestPoolSize;
  373   
  374       /**
  375        * Counter for completed tasks. Updated only on termination of
  376        * worker threads.
  377        */
  378       private long completedTaskCount;
  379       
  380       /**
  381        * The default rejected execution handler
  382        */
  383       private static final RejectedExecutionHandler defaultHandler =
  384           new AbortPolicy();
  385   
  386       /**
  387        * Invoke the rejected execution handler for the given command.
  388        */
  389       protected void reject(Runnable command) {
  390           handler.rejectedExecution(command, this);
  391       }
  392   
  393       /**
  394        * Create and return a new thread running firstTask as its first
  395        * task. Call only while holding mainLock
  396        * @param firstTask the task the new thread should run first (or
  397        * null if none)
  398        * @return the new thread
  399        */
  400       private Thread addThread(Runnable firstTask) {
  401           Worker w = new Worker(firstTask);
  402           Thread t = threadFactory.newThread(w);
  403           w.thread = t;
  404           workers.add(w);
  405           int nt = ++poolSize;
  406           if (nt > largestPoolSize)
  407               largestPoolSize = nt;
  408           return t;
  409       }
  410   
  411       /**
  412        * Create and start a new thread running firstTask as its first
  413        * task, only if fewer than corePoolSize threads are running.
  414        * @param firstTask the task the new thread should run first (or
  415        * null if none)
  416        * @return true if successful.
  417        */
  418       private boolean addIfUnderCorePoolSize(Runnable firstTask) {
  419           Thread t = null;
  420           final ReentrantLock mainLock = this.mainLock;
  421           mainLock.lock();
  422           try {
  423               if (poolSize < corePoolSize)
  424                   t = addThread(firstTask);
  425           } finally {
  426               mainLock.unlock();
  427           }
  428           if (t == null)
  429               return false;
  430           t.start();
  431           return true;
  432       }
  433   
  434       /**
  435        * Create and start a new thread only if fewer than maximumPoolSize
  436        * threads are running.  The new thread runs as its first task the
  437        * next task in queue, or if there is none, the given task.
  438        * @param firstTask the task the new thread should run first (or
  439        * null if none)
  440        * @return null on failure, else the first task to be run by new thread.
  441        */
  442       private Runnable addIfUnderMaximumPoolSize(Runnable firstTask) {
  443           Thread t = null;
  444           Runnable next = null;
  445           final ReentrantLock mainLock = this.mainLock;
  446           mainLock.lock();
  447           try {
  448               if (poolSize < maximumPoolSize) {
  449                   next = workQueue.poll();
  450                   if (next == null)
  451                       next = firstTask;
  452                   t = addThread(next);
  453               }
  454           } finally {
  455               mainLock.unlock();
  456           }
  457           if (t == null)
  458               return null;
  459           t.start();
  460           return next;
  461       }
  462   
  463   
  464       /**
  465        * Get the next task for a worker thread to run.
  466        * @return the task
  467        * @throws InterruptedException if interrupted while waiting for task
  468        */
  469       Runnable getTask() throws InterruptedException {
  470           for (;;) {
  471               switch(runState) {
  472               case RUNNING: {
  473                   if (poolSize <= corePoolSize)   // untimed wait if core
  474                       return workQueue.take();
  475                   
  476                   long timeout = keepAliveTime;
  477                   if (timeout <= 0) // die immediately for 0 timeout
  478                       return null;
  479                   Runnable r =  workQueue.poll(timeout, TimeUnit.NANOSECONDS);
  480                   if (r != null)
  481                       return r;
  482                   if (poolSize > corePoolSize) // timed out
  483                       return null;
  484                   // else, after timeout, pool shrank so shouldn't die, so retry
  485                   break;
  486               }
  487   
  488               case SHUTDOWN: {
  489                   // Help drain queue 
  490                   Runnable r = workQueue.poll();
  491                   if (r != null)
  492                       return r;
  493                       
  494                   // Check if can terminate
  495                   if (workQueue.isEmpty()) {
  496                       interruptIdleWorkers();
  497                       return null;
  498                   }
  499   
  500                   // There could still be delayed tasks in queue.
  501                   // Wait for one, re-checking state upon interruption
  502                   try {
  503                       return workQueue.take();
  504                   } catch(InterruptedException ignore) {}
  505                   break;
  506               }
  507   
  508               case STOP:
  509                   return null;
  510               default:
  511                   assert false; 
  512               }
  513           }
  514       }
  515   
  516       /**
  517        * Wake up all threads that might be waiting for tasks.
  518        */
  519       protected void interruptIdleWorkers() {
  520           final ReentrantLock mainLock = this.mainLock;
  521           mainLock.lock();
  522           try {
  523               for (Worker w : workers)
  524                   w.interruptIfIdle();
  525           } finally {
  526               mainLock.unlock();
  527           }
  528       }
  529   
  530       /**
  531        * Perform bookkeeping for a terminated worker thread.
  532        * @param w the worker
  533        */
  534       void workerDone(Worker w) {
  535           final ReentrantLock mainLock = this.mainLock;
  536           mainLock.lock();
  537           try {
  538               completedTaskCount += w.completedTasks;
  539               workers.remove(w);
  540               if (--poolSize > 0)
  541                   return;
  542   
  543               // Else, this is the last thread. Deal with potential shutdown.
  544   
  545               int state = runState;
  546               assert state != TERMINATED;
  547   
  548               if (state != STOP) {
  549                   // If there are queued tasks but no threads, create
  550                   // replacement.
  551                   Runnable r = workQueue.poll();
  552                   if (r != null) {
  553                       addThread(r).start();
  554                       return;
  555                   }
  556   
  557                   // If there are some (presumably delayed) tasks but
  558                   // none pollable, create an idle replacement to wait.
  559                   if (!workQueue.isEmpty()) { 
  560                       addThread(null).start();
  561                       return;
  562                   }
  563   
  564                   // Otherwise, we can exit without replacement
  565                   if (state == RUNNING)
  566                       return;
  567               }
  568   
  569               // Either state is STOP, or state is SHUTDOWN and there is
  570               // no work to do. So we can terminate.
  571               termination.signalAll();
  572               runState = TERMINATED;
  573               // fall through to call terminate() outside of lock.
  574           } finally {
  575               mainLock.unlock();
  576           }
  577   
  578           assert runState == TERMINATED;
  579           terminated(); 
  580       }
  581   
  582       /**
  583        *  Worker threads
  584        */
  585       private class Worker implements Runnable {
  586   
  587           /**
  588            * The runLock is acquired and released surrounding each task
  589            * execution. It mainly protects against interrupts that are
  590            * intended to cancel the worker thread from instead
  591            * interrupting the task being run.
  592            */
  593           private final ReentrantLock runLock = new ReentrantLock();
  594   
  595           /**
  596            * Initial task to run before entering run loop
  597            */
  598           private Runnable firstTask;
  599   
  600           /**
  601            * Per thread completed task counter; accumulated
  602            * into completedTaskCount upon termination.
  603            */
  604           volatile long completedTasks;
  605   
  606           /**
  607            * Thread this worker is running in.  Acts as a final field,
  608            * but cannot be set until thread is created.
  609            */
  610           Thread thread;
  611   
  612           Worker(Runnable firstTask) {
  613               this.firstTask = firstTask;
  614           }
  615   
  616           boolean isActive() {
  617               return runLock.isLocked();
  618           }
  619   
  620           /**
  621            * Interrupt thread if not running a task
  622            */
  623           void interruptIfIdle() {
  624               final ReentrantLock runLock = this.runLock;
  625               if (runLock.tryLock()) {
  626                   try {
  627                       thread.interrupt();
  628                   } finally {
  629                       runLock.unlock();
  630                   }
  631               }
  632           }
  633   
  634           /**
  635            * Cause thread to die even if running a task.
  636            */
  637           void interruptNow() {
  638               thread.interrupt();
  639           }
  640   
  641           /**
  642            * Run a single task between before/after methods.
  643            */
  644           private void runTask(Runnable task) {
  645               final ReentrantLock runLock = this.runLock;
  646               runLock.lock();
  647               try {
  648                   // Abort now if immediate cancel.  Otherwise, we have
  649                   // committed to run this task.
  650                   if (runState == STOP)
  651                       return;
  652   
  653                   Thread.interrupted(); // clear interrupt status on entry
  654                   boolean ran = false;
  655                   beforeExecute(thread, task);
  656                   try {
  657                       task.run();
  658                       ran = true;
  659                       afterExecute(task, null);
  660                       ++completedTasks;
  661                   } catch(RuntimeException ex) {
  662                       if (!ran)
  663                           afterExecute(task, ex);
  664                       // Else the exception occurred within
  665                       // afterExecute itself in which case we don't
  666                       // want to call it again.
  667                       throw ex;
  668                   }
  669               } finally {
  670                   runLock.unlock();
  671               }
  672           }
  673   
  674           /**
  675            * Main run loop
  676            */
  677           public void run() {
  678               try {
  679                   Runnable task = firstTask;
  680                   firstTask = null;
  681                   while (task != null || (task = getTask()) != null) {
  682                       runTask(task);
  683                       task = null; // unnecessary but can help GC
  684                   }
  685               } catch(InterruptedException ie) {
  686                   // fall through
  687               } finally {
  688                   workerDone(this);
  689               }
  690           }
  691       }
  692   
  693       // Public methods
  694   
  695       /**
  696        * Creates a new <tt>ThreadPoolExecutor</tt> with the given
  697        * initial parameters and default thread factory and handler.  It
  698        * may be more convenient to use one of the {@link Executors}
  699        * factory methods instead of this general purpose constructor.
  700        *
  701        * @param corePoolSize the number of threads to keep in the
  702        * pool, even if they are idle.
  703        * @param maximumPoolSize the maximum number of threads to allow in the
  704        * pool.
  705        * @param keepAliveTime when the number of threads is greater than
  706        * the core, this is the maximum time that excess idle threads
  707        * will wait for new tasks before terminating.
  708        * @param unit the time unit for the keepAliveTime
  709        * argument.
  710        * @param workQueue the queue to use for holding tasks before they
  711        * are executed. This queue will hold only the <tt>Runnable</tt>
  712        * tasks submitted by the <tt>execute</tt> method.
  713        * @throws IllegalArgumentException if corePoolSize, or
  714        * keepAliveTime less than zero, or if maximumPoolSize less than or
  715        * equal to zero, or if corePoolSize greater than maximumPoolSize.
  716        * @throws NullPointerException if <tt>workQueue</tt> is null
  717        */
  718       public ThreadPoolExecutor(int corePoolSize,
  719                                 int maximumPoolSize,
  720                                 long keepAliveTime,
  721                                 TimeUnit unit,
  722                                 BlockingQueue<Runnable> workQueue) {
  723           this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
  724                Executors.defaultThreadFactory(), defaultHandler);
  725       }
  726   
  727       /**
  728        * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
  729        * parameters.
  730        *
  731        * @param corePoolSize the number of threads to keep in the
  732        * pool, even if they are idle.
  733        * @param maximumPoolSize the maximum number of threads to allow in the
  734        * pool.
  735        * @param keepAliveTime when the number of threads is greater than
  736        * the core, this is the maximum time that excess idle threads
  737        * will wait for new tasks before terminating.
  738        * @param unit the time unit for the keepAliveTime
  739        * argument.
  740        * @param workQueue the queue to use for holding tasks before they
  741        * are executed. This queue will hold only the <tt>Runnable</tt>
  742        * tasks submitted by the <tt>execute</tt> method.
  743        * @param threadFactory the factory to use when the executor
  744        * creates a new thread.
  745        * @throws IllegalArgumentException if corePoolSize, or
  746        * keepAliveTime less than zero, or if maximumPoolSize less than or
  747        * equal to zero, or if corePoolSize greater than maximumPoolSize.
  748        * @throws NullPointerException if <tt>workQueue</tt>
  749        * or <tt>threadFactory</tt> are null.
  750        */
  751       public ThreadPoolExecutor(int corePoolSize,
  752                                 int maximumPoolSize,
  753                                 long keepAliveTime,
  754                                 TimeUnit unit,
  755                                 BlockingQueue<Runnable> workQueue,
  756                                 ThreadFactory threadFactory) {
  757           this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
  758                threadFactory, defaultHandler);
  759       }
  760   
  761       /**
  762        * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
  763        * parameters.
  764        *
  765        * @param corePoolSize the number of threads to keep in the
  766        * pool, even if they are idle.
  767        * @param maximumPoolSize the maximum number of threads to allow in the
  768        * pool.
  769        * @param keepAliveTime when the number of threads is greater than
  770        * the core, this is the maximum time that excess idle threads
  771        * will wait for new tasks before terminating.
  772        * @param unit the time unit for the keepAliveTime
  773        * argument.
  774        * @param workQueue the queue to use for holding tasks before they
  775        * are executed. This queue will hold only the <tt>Runnable</tt>
  776        * tasks submitted by the <tt>execute</tt> method.
  777        * @param handler the handler to use when execution is blocked
  778        * because the thread bounds and queue capacities are reached.
  779        * @throws IllegalArgumentException if corePoolSize, or
  780        * keepAliveTime less than zero, or if maximumPoolSize less than or
  781        * equal to zero, or if corePoolSize greater than maximumPoolSize.
  782        * @throws NullPointerException if <tt>workQueue</tt>
  783        * or  <tt>handler</tt> are null.
  784        */
  785       public ThreadPoolExecutor(int corePoolSize,
  786                                 int maximumPoolSize,
  787                                 long keepAliveTime,
  788                                 TimeUnit unit,
  789                                 BlockingQueue<Runnable> workQueue,
  790                                 RejectedExecutionHandler handler) {
  791           this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
  792                Executors.defaultThreadFactory(), handler);
  793       }
  794   
  795       /**
  796        * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
  797        * parameters.
  798        *
  799        * @param corePoolSize the number of threads to keep in the
  800        * pool, even if they are idle.
  801        * @param maximumPoolSize the maximum number of threads to allow in the
  802        * pool.
  803        * @param keepAliveTime when the number of threads is greater than
  804        * the core, this is the maximum time that excess idle threads
  805        * will wait for new tasks before terminating.
  806        * @param unit the time unit for the keepAliveTime
  807        * argument.
  808        * @param workQueue the queue to use for holding tasks before they
  809        * are executed. This queue will hold only the <tt>Runnable</tt>
  810        * tasks submitted by the <tt>execute</tt> method.
  811        * @param threadFactory the factory to use when the executor
  812        * creates a new thread.
  813        * @param handler the handler to use when execution is blocked
  814        * because the thread bounds and queue capacities are reached.
  815        * @throws IllegalArgumentException if corePoolSize, or
  816        * keepAliveTime less than zero, or if maximumPoolSize less than or
  817        * equal to zero, or if corePoolSize greater than maximumPoolSize.
  818        * @throws NullPointerException if <tt>workQueue</tt>
  819        * or <tt>threadFactory</tt> or <tt>handler</tt> are null.
  820        */
  821       public ThreadPoolExecutor(int corePoolSize,
  822                                 int maximumPoolSize,
  823                                 long keepAliveTime,
  824                                 TimeUnit unit,
  825                                 BlockingQueue<Runnable> workQueue,
  826                                 ThreadFactory threadFactory,
  827                                 RejectedExecutionHandler handler) {
  828           if (corePoolSize < 0 ||
  829               maximumPoolSize <= 0 ||
  830               maximumPoolSize < corePoolSize ||
  831               keepAliveTime < 0)
  832               throw new IllegalArgumentException();
  833           if (workQueue == null || threadFactory == null || handler == null)
  834               throw new NullPointerException();
  835           this.corePoolSize = corePoolSize;
  836           this.maximumPoolSize = maximumPoolSize;
  837           this.workQueue = workQueue;
  838           this.keepAliveTime = unit.toNanos(keepAliveTime);
  839           this.threadFactory = threadFactory;
  840           this.handler = handler;
  841       }
  842   
  843   
  844       /**
  845        * Executes the given task sometime in the future.  The task
  846        * may execute in a new thread or in an existing pooled thread.
  847        *
  848        * If the task cannot be submitted for execution, either because this
  849        * executor has been shutdown or because its capacity has been reached,
  850        * the task is handled by the current <tt>RejectedExecutionHandler</tt>.
  851        *
  852        * @param command the task to execute
  853        * @throws RejectedExecutionException at discretion of
  854        * <tt>RejectedExecutionHandler</tt>, if task cannot be accepted
  855        * for execution
  856        * @throws NullPointerException if command is null
  857        */
  858       public void execute(Runnable command) {
  859           if (command == null)
  860               throw new NullPointerException();
  861           for (;;) {
  862               if (runState != RUNNING) {
  863                   reject(command);
  864                   return;
  865               }
  866               if (poolSize < corePoolSize && addIfUnderCorePoolSize(command))
  867                   return;
  868               if (workQueue.offer(command))
  869                   return;
  870               Runnable r = addIfUnderMaximumPoolSize(command);
  871               if (r == command)
  872                   return;
  873               if (r == null) {
  874                   reject(command);
  875                   return;
  876               }
  877               // else retry
  878           }
  879       }
  880   
  881       /**
  882        * Initiates an orderly shutdown in which previously submitted
  883        * tasks are executed, but no new tasks will be
  884        * accepted. Invocation has no additional effect if already shut
  885        * down.
  886        * @throws SecurityException if a security manager exists and
  887        * shutting down this ExecutorService may manipulate threads that
  888        * the caller is not permitted to modify because it does not hold
  889        * {@link java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
  890        * or the security manager's <tt>checkAccess</tt>  method denies access.
  891        */
  892       public void shutdown() {
  893           // Fail if caller doesn't have modifyThread permission
  894   	SecurityManager security = System.getSecurityManager();
  895   	if (security != null) 
  896               java.security.AccessController.checkPermission(shutdownPerm);
  897   
  898           boolean fullyTerminated = false;
  899           final ReentrantLock mainLock = this.mainLock;
  900           mainLock.lock();
  901           try {
  902               if (workers.size() > 0) {
  903                   // Check if caller can modify worker threads.  This
  904                   // might not be true even if passed above check, if
  905                   // the SecurityManager treats some threads specially.
  906                   if (security != null) {
  907                       for (Worker w: workers)
  908                           security.checkAccess(w.thread);
  909                   }
  910   
  911                   int state = runState;
  912                   if (state == RUNNING) // don't override shutdownNow
  913                       runState = SHUTDOWN;
  914   
  915                   try {
  916                       for (Worker w: workers)
  917                           w.interruptIfIdle();
  918                   } catch(SecurityException se) {
  919                       // If SecurityManager allows above checks, but
  920                       // then unexpectedly throws exception when
  921                       // interrupting threads (which it ought not do),
  922                       // back out as cleanly as we can. Some threads may
  923                       // have been killed but we remain in non-shutdown
  924                       // state.
  925                       runState = state; 
  926                       throw se;
  927                   }
  928               }
  929               else { // If no workers, trigger full termination now
  930                   fullyTerminated = true;
  931                   runState = TERMINATED;
  932                   termination.signalAll();
  933               }
  934           } finally {
  935               mainLock.unlock();
  936           }
  937           if (fullyTerminated)
  938               terminated();
  939       }
  940   
  941   
  942       /**
  943        * Attempts to stop all actively executing tasks, halts the
  944        * processing of waiting tasks, and returns a list of the tasks that were
  945        * awaiting execution. 
  946        *  
  947        * <p>This implementation cancels tasks via {@link
  948        * Thread#interrupt}, so if any tasks mask or fail to respond to
  949        * interrupts, they may never terminate.
  950        *
  951        * @return list of tasks that never commenced execution
  952        * @throws SecurityException if a security manager exists and
  953        * shutting down this ExecutorService may manipulate threads that
  954        * the caller is not permitted to modify because it does not hold
  955        * {@link java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
  956        * or the security manager's <tt>checkAccess</tt> method denies access.
  957        */
  958       public List<Runnable> shutdownNow() {
  959           // Almost the same code as shutdown()
  960   	SecurityManager security = System.getSecurityManager();
  961   	if (security != null) 
  962               java.security.AccessController.checkPermission(shutdownPerm);
  963   
  964           boolean fullyTerminated = false;
  965           final ReentrantLock mainLock = this.mainLock;
  966           mainLock.lock();
  967           try {
  968               if (workers.size() > 0) {
  969                   if (security != null) {
  970                       for (Worker w: workers)
  971                           security.checkAccess(w.thread);
  972                   }
  973   
  974                   int state = runState;
  975                   if (state != TERMINATED)
  976                       runState = STOP;
  977                   try {
  978                       for (Worker w : workers)
  979                           w.interruptNow();
  980                   } catch(SecurityException se) {
  981                       runState = state; // back out;
  982                       throw se;
  983                   }
  984               }
  985               else { // If no workers, trigger full termination now
  986                   fullyTerminated = true;
  987                   runState = TERMINATED;
  988                   termination.signalAll();
  989               }
  990           } finally {
  991               mainLock.unlock();
  992           }
  993           if (fullyTerminated)
  994               terminated();
  995           return Arrays.asList(workQueue.toArray(EMPTY_RUNNABLE_ARRAY));
  996       }
  997   
  998       public boolean isShutdown() {
  999           return runState != RUNNING;
 1000       }
 1001   
 1002       /** 
 1003        * Returns true if this executor is in the process of terminating
 1004        * after <tt>shutdown</tt> or <tt>shutdownNow</tt> but has not
 1005        * completely terminated.  This method may be useful for
 1006        * debugging. A return of <tt>true</tt> reported a sufficient
 1007        * period after shutdown may indicate that submitted tasks have
 1008        * ignored or suppressed interruption, causing this executor not
 1009        * to properly terminate.
 1010        * @return true if terminating but not yet terminated.
 1011        */
 1012       public boolean isTerminating() {
 1013           return runState == STOP;
 1014       }
 1015   
 1016       public boolean isTerminated() {
 1017           return runState == TERMINATED;
 1018       }
 1019   
 1020       public boolean awaitTermination(long timeout, TimeUnit unit)
 1021           throws InterruptedException {
 1022           long nanos = unit.toNanos(timeout);
 1023           final ReentrantLock mainLock = this.mainLock;
 1024           mainLock.lock();
 1025           try {
 1026               for (;;) {
 1027                   if (runState == TERMINATED) 
 1028                       return true;
 1029                   if (nanos <= 0)
 1030                       return false;
 1031                   nanos = termination.awaitNanos(nanos);
 1032               }
 1033           } finally {
 1034               mainLock.unlock();
 1035           }
 1036       }
 1037   
 1038       /**
 1039        * Invokes <tt>shutdown</tt> when this executor is no longer
 1040        * referenced.
 1041        */ 
 1042       protected void finalize()  {
 1043           shutdown();
 1044       }
 1045   
 1046       /**
 1047        * Sets the thread factory used to create new threads.
 1048        *
 1049        * @param threadFactory the new thread factory
 1050        * @throws NullPointerException if threadFactory is null
 1051        * @see #getThreadFactory
 1052        */
 1053       public void setThreadFactory(ThreadFactory threadFactory) {
 1054           if (threadFactory == null)
 1055               throw new NullPointerException();
 1056           this.threadFactory = threadFactory;
 1057       }
 1058   
 1059       /**
 1060        * Returns the thread factory used to create new threads.
 1061        *
 1062        * @return the current thread factory
 1063        * @see #setThreadFactory
 1064        */
 1065       public ThreadFactory getThreadFactory() {
 1066           return threadFactory;
 1067       }
 1068   
 1069       /**
 1070        * Sets a new handler for unexecutable tasks.
 1071        *
 1072        * @param handler the new handler
 1073        * @throws NullPointerException if handler is null
 1074        * @see #getRejectedExecutionHandler
 1075        */
 1076       public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
 1077           if (handler == null)
 1078               throw new NullPointerException();
 1079           this.handler = handler;
 1080       }
 1081   
 1082       /**
 1083        * Returns the current handler for unexecutable tasks.
 1084        *
 1085        * @return the current handler
 1086        * @see #setRejectedExecutionHandler
 1087        */
 1088       public RejectedExecutionHandler getRejectedExecutionHandler() {
 1089           return handler;
 1090       }
 1091   
 1092       /**
 1093        * Returns the task queue used by this executor. Access to the
 1094        * task queue is intended primarily for debugging and monitoring.
 1095        * This queue may be in active use.  Retrieving the task queue
 1096        * does not prevent queued tasks from executing.
 1097        *
 1098        * @return the task queue
 1099        */
 1100       public BlockingQueue<Runnable> getQueue() {
 1101           return workQueue;
 1102       }
 1103   
 1104       /**
 1105        * Removes this task from the executor's internal queue if it is
 1106        * present, thus causing it not to be run if it has not already
 1107        * started.
 1108        * 
 1109        * <p> This method may be useful as one part of a cancellation
 1110        * scheme.  It may fail to remove tasks that have been converted
 1111        * into other forms before being placed on the internal queue. For
 1112        * example, a task entered using <tt>submit</tt> might be
 1113        * converted into a form that maintains <tt>Future</tt> status.
 1114        * However, in such cases, method {@link ThreadPoolExecutor#purge}
 1115        * may be used to remove those Futures that have been cancelled.
 1116        * 
 1117        *
 1118        * @param task the task to remove
 1119        * @return true if the task was removed
 1120        */
 1121       public boolean remove(Runnable task) {
 1122           return getQueue().remove(task);
 1123       }
 1124   
 1125   
 1126       /**
 1127        * Tries to remove from the work queue all {@link Future}
 1128        * tasks that have been cancelled. This method can be useful as a
 1129        * storage reclamation operation, that has no other impact on
 1130        * functionality. Cancelled tasks are never executed, but may
 1131        * accumulate in work queues until worker threads can actively
 1132        * remove them. Invoking this method instead tries to remove them now.
 1133        * However, this method may fail to remove tasks in
 1134        * the presence of interference by other threads.
 1135        */
 1136       public void purge() {
 1137           // Fail if we encounter interference during traversal
 1138           try {
 1139               Iterator<Runnable> it = getQueue().iterator();
 1140               while (it.hasNext()) {
 1141                   Runnable r = it.next();
 1142                   if (r instanceof Future<?>) {
 1143                       Future<?> c = (Future<?>)r;
 1144                       if (c.isCancelled())
 1145                           it.remove();
 1146                   }
 1147               }
 1148           }
 1149           catch(ConcurrentModificationException ex) {
 1150               return; 
 1151           }
 1152       }
 1153   
 1154       /**
 1155        * Sets the core number of threads.  This overrides any value set
 1156        * in the constructor.  If the new value is smaller than the
 1157        * current value, excess existing threads will be terminated when
 1158        * they next become idle. If larger, new threads will, if needed,
 1159        * be started to execute any queued tasks.
 1160        *
 1161        * @param corePoolSize the new core size
 1162        * @throws IllegalArgumentException if <tt>corePoolSize</tt>
 1163        * less than zero
 1164        * @see #getCorePoolSize
 1165        */
 1166       public void setCorePoolSize(int corePoolSize) {
 1167           if (corePoolSize < 0)
 1168               throw new IllegalArgumentException();
 1169           final ReentrantLock mainLock = this.mainLock;
 1170           mainLock.lock();
 1171           try {
 1172               int extra = this.corePoolSize - corePoolSize;
 1173               this.corePoolSize = corePoolSize;
 1174               if (extra < 0) {
 1175                   Runnable r;
 1176                   while (extra++ < 0 && poolSize < corePoolSize &&
 1177                          (r = workQueue.poll()) != null)
 1178                       addThread(r).start();
 1179               }
 1180               else if (extra > 0 && poolSize > corePoolSize) {
 1181                   Iterator<Worker> it = workers.iterator();
 1182                   while (it.hasNext() &&
 1183                          extra-- > 0 &&
 1184                          poolSize > corePoolSize &&
 1185                          workQueue.remainingCapacity() == 0) 
 1186                       it.next().interruptIfIdle();
 1187               }
 1188           } finally {
 1189               mainLock.unlock();
 1190           }
 1191       }
 1192   
 1193       /**
 1194        * Returns the core number of threads.
 1195        *
 1196        * @return the core number of threads
 1197        * @see #setCorePoolSize
 1198        */
 1199       public int getCorePoolSize() {
 1200           return corePoolSize;
 1201       }
 1202   
 1203       /**
 1204        * Starts a core thread, causing it to idly wait for work. This
 1205        * overrides the default policy of starting core threads only when
 1206        * new tasks are executed. This method will return <tt>false</tt>
 1207        * if all core threads have already been started.
 1208        * @return true if a thread was started
 1209        */ 
 1210       public boolean prestartCoreThread() {
 1211           return addIfUnderCorePoolSize(null);
 1212       }
 1213   
 1214       /**
 1215        * Starts all core threads, causing them to idly wait for work. This
 1216        * overrides the default policy of starting core threads only when
 1217        * new tasks are executed. 
 1218        * @return the number of threads started.
 1219        */ 
 1220       public int prestartAllCoreThreads() {
 1221           int n = 0;
 1222           while (addIfUnderCorePoolSize(null))
 1223               ++n;
 1224           return n;
 1225       }
 1226   
 1227       /**
 1228        * Sets the maximum allowed number of threads. This overrides any
 1229        * value set in the constructor. If the new value is smaller than
 1230        * the current value, excess existing threads will be
 1231        * terminated when they next become idle.
 1232        *
 1233        * @param maximumPoolSize the new maximum
 1234        * @throws IllegalArgumentException if maximumPoolSize less than zero or
 1235        * the {@link #getCorePoolSize core pool size}
 1236        * @see #getMaximumPoolSize
 1237        */
 1238       public void setMaximumPoolSize(int maximumPoolSize) {
 1239           if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
 1240               throw new IllegalArgumentException();
 1241           final ReentrantLock mainLock = this.mainLock;
 1242           mainLock.lock();
 1243           try {
 1244               int extra = this.maximumPoolSize - maximumPoolSize;
 1245               this.maximumPoolSize = maximumPoolSize;
 1246               if (extra > 0 && poolSize > maximumPoolSize) {
 1247                   Iterator<Worker> it = workers.iterator();
 1248                   while (it.hasNext() &&
 1249                          extra > 0 &&
 1250                          poolSize > maximumPoolSize) {
 1251                       it.next().interruptIfIdle();
 1252                       --extra;
 1253                   }
 1254               }
 1255           } finally {
 1256               mainLock.unlock();
 1257           }
 1258       }
 1259   
 1260       /**
 1261        * Returns the maximum allowed number of threads.
 1262        *
 1263        * @return the maximum allowed number of threads
 1264        * @see #setMaximumPoolSize
 1265        */
 1266       public int getMaximumPoolSize() {
 1267           return maximumPoolSize;
 1268       }
 1269   
 1270       /**
 1271        * Sets the time limit for which threads may remain idle before
 1272        * being terminated.  If there are more than the core number of
 1273        * threads currently in the pool, after waiting this amount of
 1274        * time without processing a task, excess threads will be
 1275        * terminated.  This overrides any value set in the constructor.
 1276        * @param time the time to wait.  A time value of zero will cause
 1277        * excess threads to terminate immediately after executing tasks.
 1278        * @param unit  the time unit of the time argument
 1279        * @throws IllegalArgumentException if time less than zero
 1280        * @see #getKeepAliveTime
 1281        */
 1282       public void setKeepAliveTime(long time, TimeUnit unit) {
 1283           if (time < 0)
 1284               throw new IllegalArgumentException();
 1285           this.keepAliveTime = unit.toNanos(time);
 1286       }
 1287   
 1288       /**
 1289        * Returns the thread keep-alive time, which is the amount of time
 1290        * which threads in excess of the core pool size may remain
 1291        * idle before being terminated.
 1292        *
 1293        * @param unit the desired time unit of the result
 1294        * @return the time limit
 1295        * @see #setKeepAliveTime
 1296        */
 1297       public long getKeepAliveTime(TimeUnit unit) {
 1298           return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
 1299       }
 1300   
 1301       /* Statistics */
 1302   
 1303       /**
 1304        * Returns the current number of threads in the pool.
 1305        *
 1306        * @return the number of threads
 1307        */
 1308       public int getPoolSize() {
 1309           return poolSize;
 1310       }
 1311   
 1312       /**
 1313        * Returns the approximate number of threads that are actively
 1314        * executing tasks.
 1315        *
 1316        * @return the number of threads
 1317        */
 1318       public int getActiveCount() {
 1319           final ReentrantLock mainLock = this.mainLock;
 1320           mainLock.lock();
 1321           try {
 1322               int n = 0;
 1323               for (Worker w : workers) {
 1324                   if (w.isActive())
 1325                       ++n;
 1326               }
 1327               return n;
 1328           } finally {
 1329               mainLock.unlock();
 1330           }
 1331       }
 1332   
 1333       /**
 1334        * Returns the largest number of threads that have ever
 1335        * simultaneously been in the pool.
 1336        *
 1337        * @return the number of threads
 1338        */
 1339       public int getLargestPoolSize() {
 1340           final ReentrantLock mainLock = this.mainLock;
 1341           mainLock.lock();
 1342           try {
 1343               return largestPoolSize;
 1344           } finally {
 1345               mainLock.unlock();
 1346           }
 1347       }
 1348   
 1349       /**
 1350        * Returns the approximate total number of tasks that have been
 1351        * scheduled for execution. Because the states of tasks and
 1352        * threads may change dynamically during computation, the returned
 1353        * value is only an approximation, but one that does not ever
 1354        * decrease across successive calls.
 1355        *
 1356        * @return the number of tasks
 1357        */
 1358       public long getTaskCount() {
 1359           final ReentrantLock mainLock = this.mainLock;
 1360           mainLock.lock();
 1361           try {
 1362               long n = completedTaskCount;
 1363               for (Worker w : workers) {
 1364                   n += w.completedTasks;
 1365                   if (w.isActive())
 1366                       ++n;
 1367               }
 1368               return n + workQueue.size();
 1369           } finally {
 1370               mainLock.unlock();
 1371           }
 1372       }
 1373   
 1374       /**
 1375        * Returns the approximate total number of tasks that have
 1376        * completed execution. Because the states of tasks and threads
 1377        * may change dynamically during computation, the returned value
 1378        * is only an approximation, but one that does not ever decrease
 1379        * across successive calls.
 1380        *
 1381        * @return the number of tasks
 1382        */
 1383       public long getCompletedTaskCount() {
 1384           final ReentrantLock mainLock = this.mainLock;
 1385           mainLock.lock();
 1386           try {
 1387               long n = completedTaskCount;
 1388               for (Worker w : workers)
 1389                   n += w.completedTasks;
 1390               return n;
 1391           } finally {
 1392               mainLock.unlock();
 1393           }
 1394       }
 1395   
 1396       /**
 1397        * Method invoked prior to executing the given Runnable in the
 1398        * given thread.  This method is invoked by thread <tt>t</tt> that
 1399        * will execute task <tt>r</tt>, and may be used to re-initialize
 1400        * ThreadLocals, or to perform logging. Note: To properly nest
 1401        * multiple overridings, subclasses should generally invoke
 1402        * <tt>super.beforeExecute</tt> at the end of this method.
 1403        *
 1404        * @param t the thread that will run task r.
 1405        * @param r the task that will be executed.
 1406        */
 1407       protected void beforeExecute(Thread t, Runnable r) { }
 1408   
 1409       /**
 1410        * Method invoked upon completion of execution of the given
 1411        * Runnable.  This method is invoked by the thread that executed
 1412        * the task. If non-null, the Throwable is the uncaught exception
 1413        * that caused execution to terminate abruptly. Note: To properly
 1414        * nest multiple overridings, subclasses should generally invoke
 1415        * <tt>super.afterExecute</tt> at the beginning of this method.
 1416        *
 1417        * @param r the runnable that has completed.
 1418        * @param t the exception that caused termination, or null if
 1419        * execution completed normally.
 1420        */
 1421       protected void afterExecute(Runnable r, Throwable t) { }
 1422   
 1423       /**
 1424        * Method invoked when the Executor has terminated.  Default
 1425        * implementation does nothing. Note: To properly nest multiple
 1426        * overridings, subclasses should generally invoke
 1427        * <tt>super.terminated</tt> within this method.
 1428        */
 1429       protected void terminated() { }
 1430   
 1431       /**
 1432        * A handler for rejected tasks that runs the rejected task
 1433        * directly in the calling thread of the <tt>execute</tt> method,
 1434        * unless the executor has been shut down, in which case the task
 1435        * is discarded.
 1436        */
 1437      public static class CallerRunsPolicy implements RejectedExecutionHandler {
 1438           /**
 1439            * Creates a <tt>CallerRunsPolicy</tt>.
 1440            */
 1441           public CallerRunsPolicy() { }
 1442   
 1443           /**
 1444            * Executes task r in the caller's thread, unless the executor
 1445            * has been shut down, in which case the task is discarded.
 1446            * @param r the runnable task requested to be executed
 1447            * @param e the executor attempting to execute this task
 1448            */
 1449           public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
 1450               if (!e.isShutdown()) {
 1451                   r.run();
 1452               }
 1453           }
 1454       }
 1455   
 1456       /**
 1457        * A handler for rejected tasks that throws a
 1458        * <tt>RejectedExecutionException</tt>.
 1459        */
 1460       public static class AbortPolicy implements RejectedExecutionHandler {
 1461           /**
 1462            * Creates an <tt>AbortPolicy</tt>.
 1463            */
 1464           public AbortPolicy() { }
 1465   
 1466           /**
 1467            * Always throws RejectedExecutionException.
 1468            * @param r the runnable task requested to be executed
 1469            * @param e the executor attempting to execute this task
 1470            * @throws RejectedExecutionException always.
 1471            */
 1472           public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
 1473               throw new RejectedExecutionException();
 1474           }
 1475       }
 1476   
 1477       /**
 1478        * A handler for rejected tasks that silently discards the
 1479        * rejected task.
 1480        */
 1481       public static class DiscardPolicy implements RejectedExecutionHandler {
 1482           /**
 1483            * Creates a <tt>DiscardPolicy</tt>.
 1484            */
 1485           public DiscardPolicy() { }
 1486   
 1487           /**
 1488            * Does nothing, which has the effect of discarding task r.
 1489            * @param r the runnable task requested to be executed
 1490            * @param e the executor attempting to execute this task
 1491            */
 1492           public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
 1493           }
 1494       }
 1495   
 1496       /**
 1497        * A handler for rejected tasks that discards the oldest unhandled
 1498        * request and then retries <tt>execute</tt>, unless the executor
 1499        * is shut down, in which case the task is discarded.
 1500        */
 1501       public static class DiscardOldestPolicy implements RejectedExecutionHandler {
 1502           /**
 1503            * Creates a <tt>DiscardOldestPolicy</tt> for the given executor.
 1504            */
 1505           public DiscardOldestPolicy() { }
 1506   
 1507           /**
 1508            * Obtains and ignores the next task that the executor
 1509            * would otherwise execute, if one is immediately available,
 1510            * and then retries execution of task r, unless the executor
 1511            * is shut down, in which case task r is instead discarded.
 1512            * @param r the runnable task requested to be executed
 1513            * @param e the executor attempting to execute this task
 1514            */
 1515           public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
 1516               if (!e.isShutdown()) {
 1517                   e.getQueue().poll();
 1518                   e.execute(r);
 1519               }
 1520           }
 1521       }
 1522   }

Home » geronimo-2.2-source-release » org.apache.geronimo.concurrent.harmony » [javadoc | source]