Home » geronimo-2.2-source-release » org.apache.geronimo.concurrent.harmony » [javadoc | source]

    1   /*
    2    * Written by Doug Lea with assistance from members of JCP JSR-166
    3    * Expert Group and released to the public domain, as explained at
    4    * http://creativecommons.org/licenses/publicdomain
    5    */
    6   
    7   /*
    8    * Contains the following modifications:
    9    * 1) support for skipped state (for periodic tasks)
   10    * 2) support for taskStart() and taskDone() callbacks 
   11    */
   12   
   13   package org.apache.geronimo.concurrent.harmony;
   14   
   15   import java.util.concurrent.Callable;
   16   import java.util.concurrent.CancellationException;
   17   import java.util.concurrent.ExecutionException;
   18   import java.util.concurrent.Executors;
   19   import java.util.concurrent.Future;
   20   import java.util.concurrent.TimeUnit;
   21   import java.util.concurrent.TimeoutException;
   22   import java.util.concurrent.locks.AbstractQueuedSynchronizer;
   23   
   24   import javax.util.concurrent.SkippedException;
   25   
   26   /**
   27    * A cancellable asynchronous computation.  This class provides a base
   28    * implementation of {@link Future}, with methods to start and cancel
   29    * a computation, query to see if the computation is complete, and
   30    * retrieve the result of the computation.  The result can only be
   31    * retrieved when the computation has completed; the <tt>get</tt>
   32    * method will block if the computation has not yet completed.  Once
   33    * the computation has completed, the computation cannot be restarted
   34    * or cancelled.
   35    *
   36    * <p>A <tt>FutureTask</tt> can be used to wrap a {@link Callable} or
   37    * {@link java.lang.Runnable} object.  Because <tt>FutureTask</tt>
   38    * implements <tt>Runnable</tt>, a <tt>FutureTask</tt> can be
   39    * submitted to an {@link Executor} for execution.
   40    *
   41    * <p>In addition to serving as a standalone class, this class provides
   42    * <tt>protected</tt> functionality that may be useful when creating
   43    * customized task classes.
   44    *
   45    * @since 1.5
   46    * @author Doug Lea
   47    * @param <V> The result type returned by this FutureTask's <tt>get</tt> method
   48    */
   49   public class FutureTask<V> implements Future<V>, Runnable {
   50       /** Synchronization control for FutureTask */
   51       private final Sync sync;
   52   
   53       /**
   54        * Creates a <tt>FutureTask</tt> that will upon running, execute the
   55        * given <tt>Callable</tt>.
   56        *
   57        * @param  callable the callable task
   58        * @throws NullPointerException if callable is null
   59        */
   60       public FutureTask(Callable<V> callable) {
   61           if (callable == null)
   62               throw new NullPointerException();
   63           sync = new Sync(callable);
   64       }
   65   
   66       /**
   67        * Creates a <tt>FutureTask</tt> that will upon running, execute the
   68        * given <tt>Runnable</tt>, and arrange that <tt>get</tt> will return the
   69        * given result on successful completion.
   70        *
   71        * @param  runnable the runnable task
   72        * @param result the result to return on successful completion. If
   73        * you don't need a particular result, consider using
   74        * constructions of the form:
   75        * <tt>Future&lt;?&gt; f = new FutureTask&lt;Object&gt;(runnable, null)</tt>
   76        * @throws NullPointerException if runnable is null
   77        */
   78       public FutureTask(Runnable runnable, V result) {
   79           sync = new Sync(Executors.callable(runnable, result));
   80       }
   81   
   82       public boolean isCancelled() {
   83           return sync.innerIsCancelled();
   84       }
   85       
   86       public boolean isDone() {
   87           return sync.innerIsDone();
   88       }
   89   
   90       public boolean cancel(boolean mayInterruptIfRunning) {
   91           return sync.innerCancel(mayInterruptIfRunning);
   92       }
   93       
   94       public V get() throws InterruptedException, ExecutionException {
   95           return sync.innerGet();
   96       }
   97   
   98       public V get(long timeout, TimeUnit unit)
   99           throws InterruptedException, ExecutionException, TimeoutException {
  100           return sync.innerGet(unit.toNanos(timeout));
  101       }
  102   
  103       /**
  104        * Protected method invoked when this task transitions to state
  105        * <tt>isDone</tt> (whether normally or via cancellation). The
  106        * default implementation does nothing.  Subclasses may override
  107        * this method to invoke completion callbacks or perform
  108        * bookkeeping. Note that you can query status inside the
  109        * implementation of this method to determine whether this task
  110        * has been cancelled.
  111        */
  112       protected void done() { }
  113   
  114       /**
  115        * Sets the result of this Future to the given value unless
  116        * this future has already been set or has been cancelled.
  117        * @param v the value
  118        */ 
  119       protected void set(V v) {
  120           sync.innerSet(v);
  121       }
  122   
  123       /**
  124        * Causes this future to report an <tt>ExecutionException</tt>
  125        * with the given throwable as its cause, unless this Future has
  126        * already been set or has been cancelled.
  127        * @param t the cause of failure.
  128        */ 
  129       protected void setException(Throwable t) {
  130           sync.innerSetException(t);
  131       }
  132       
  133       /**
  134        * Sets this Future to the result of computation unless
  135        * it has been cancelled.
  136        */
  137       public void run() {
  138           sync.innerRun();
  139       }
  140   
  141       /**
  142        * Executes the computation without setting its result, and then
  143        * resets this Future to initial state, failing to do so if the
  144        * computation encounters an exception or is cancelled.  This is
  145        * designed for use with tasks that intrinsically execute more
  146        * than once.
  147        * @return true if successfully run and reset
  148        */
  149       protected boolean runAndReset() {
  150           return sync.innerRunAndReset();
  151       }
  152   
  153       public boolean isSkipped() {
  154           return sync.innerIsSkipped();
  155       }
  156       
  157       protected boolean setSkipped() {
  158           return sync.innerSetSkipped();
  159       }
  160       
  161       protected void taskStart() {        
  162       }
  163       
  164       protected void taskDone(Throwable exception) {        
  165       }
  166       
  167       /**
  168        * Synchronization control for FutureTask. Note that this must be
  169        * a non-static inner class in order to invoke the protected
  170        * <tt>done</tt> method. For clarity, all inner class support
  171        * methods are same as outer, prefixed with "inner".
  172        *
  173        * Uses AQS sync state to represent run status
  174        */
  175       private final class Sync extends AbstractQueuedSynchronizer {
  176           private static final int READY     = 0;
  177           /** State value representing that task is running */
  178           private static final int RUNNING   = 1;
  179           /** State value representing that task ran */
  180           private static final int RAN       = 2;
  181           /** State value representing that task was cancelled */
  182           private static final int CANCELLED = 4;
  183           
  184           private static final int SKIPPED   = 8;
  185   
  186           /** The underlying callable */
  187           private final Callable<V> callable;
  188           /** The result to return from get() */
  189           private V result;
  190           /** The exception to throw from get() */
  191           private Throwable exception;
  192   
  193           /** 
  194            * The thread running task. When nulled after set/cancel, this
  195            * indicates that the results are accessible.  Must be
  196            * volatile, to serve as write barrier on completion.
  197            */
  198           private volatile Thread runner;
  199   
  200           Sync(Callable<V> callable) {
  201               this.callable = callable;
  202           }
  203   
  204           private boolean ranOrCancelled(int state) {
  205               return (state & (RAN | CANCELLED)) != 0;
  206           }
  207   
  208           /**
  209            * Implements AQS base acquire to succeed if ran or cancelled
  210            */
  211           protected int tryAcquireShared(int ignore) {
  212               return (innerIsDone() || innerIsSkipped()) ? 1 : -1;
  213           }
  214   
  215           /**
  216            * Implements AQS base release to always signal after setting
  217            * final done status by nulling runner thread.
  218            */
  219           protected boolean tryReleaseShared(int ignore) {
  220               runner = null;
  221               return true; 
  222           }
  223   
  224           boolean innerIsCancelled() {
  225               return getState() == CANCELLED;
  226           }
  227           
  228           boolean innerIsDone() {
  229               return ranOrCancelled(getState()) && runner == null;
  230           }
  231   
  232           V innerGet() throws InterruptedException, ExecutionException {
  233               acquireSharedInterruptibly(0);
  234               if (getState() == CANCELLED)
  235                   throw new CancellationException();
  236               if (getState() == SKIPPED) {
  237                   throw new SkippedException();
  238               }
  239               if (exception != null)
  240                   throw new ExecutionException(exception);
  241               return result;
  242           }
  243   
  244           V innerGet(long nanosTimeout) throws InterruptedException, ExecutionException, TimeoutException {
  245               if (!tryAcquireSharedNanos(0, nanosTimeout))
  246                   throw new TimeoutException();                
  247               if (getState() == CANCELLED)
  248                   throw new CancellationException();
  249               if (getState() == SKIPPED) {
  250                   throw new SkippedException();
  251               }
  252               if (exception != null)
  253                   throw new ExecutionException(exception);
  254               return result;
  255           }
  256   
  257           void innerSet(V v) {
  258               for (;;) {
  259                   int s = getState();
  260                   if (ranOrCancelled(s)) {
  261                       return;
  262                   }
  263                   if (compareAndSetState(s, RAN)) {
  264                       break;
  265                   }
  266               }
  267               result = v;
  268               releaseShared(0);
  269               done();
  270           }
  271   
  272           void innerSetException(Throwable t) {
  273               for (;;) {
  274                   int s = getState();
  275                   if (ranOrCancelled(s)) {
  276                       return;
  277                   } 
  278                   if (compareAndSetState(s, RAN)) {
  279                       break;
  280                   }
  281               }            
  282               exception = t;
  283               result = null;
  284               releaseShared(0);
  285               done();
  286           }
  287   
  288           boolean innerCancel(boolean mayInterruptIfRunning) {
  289               for (;;) {
  290                   int s = getState();
  291                   if (ranOrCancelled(s)) {
  292                       return false;
  293                   }
  294                   if (compareAndSetState(s, CANCELLED)) {
  295                       break;
  296                   }
  297               }
  298               if (mayInterruptIfRunning) {
  299                   Thread r = runner;
  300                   if (r != null)
  301                       r.interrupt();
  302               }
  303               releaseShared(0);
  304               done();
  305               return true;
  306           }
  307           
  308           boolean innerIsSkipped() {
  309               return getState() == SKIPPED;
  310           }
  311           
  312           boolean innerSetSkipped() {
  313               for (;;) {
  314                   int s = getState();
  315                   if (ranOrCancelled(s)) {
  316                       return false;
  317                   } 
  318                   if (compareAndSetState(s, SKIPPED)) {
  319                       break;
  320                   }              
  321               }
  322               releaseShared(0);
  323               return true;
  324           }
  325   
  326           void innerRun() {
  327               if (!compareAndSetState(READY, RUNNING)) 
  328                   return;
  329               try {
  330                   taskStart();
  331                   runner = Thread.currentThread();
  332                   V v = callable.call();                
  333                   innerSet(v);       
  334                   taskDone(null);
  335               } catch(Throwable ex) {                
  336                   innerSetException(ex);    
  337                   taskDone(ex);
  338               } 
  339           }
  340   
  341           boolean innerRunAndReset() {
  342               if (!(compareAndSetState(READY, RUNNING) || compareAndSetState(SKIPPED, RUNNING)))  
  343                   return false;
  344               try {
  345                   taskStart();
  346                   runner = Thread.currentThread();
  347                   callable.call(); // don't set result
  348                   runner = null;                
  349                   boolean rs = compareAndSetState(RUNNING, READY);
  350                   taskDone(null);
  351                   return rs;
  352               } catch(Throwable ex) {                
  353                   innerSetException(ex);     
  354                   taskDone(ex);
  355                   return false;
  356               } 
  357           }
  358       }
  359   }

Home » geronimo-2.2-source-release » org.apache.geronimo.concurrent.harmony » [javadoc | source]