Home » activemq-parent-5.3.1-source-release » org.apache » activemq » transport » nio » [javadoc | source]

    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq.transport.nio;
   18   
   19   import java.io.IOException;
   20   import java.nio.channels.SelectionKey;
   21   import java.nio.channels.Selector;
   22   import java.util.Iterator;
   23   import java.util.Set;
   24   import java.util.concurrent.ConcurrentLinkedQueue;
   25   import java.util.concurrent.atomic.AtomicInteger;
   26   
   27   public class SelectorWorker implements Runnable {
   28   
   29       private static final AtomicInteger NEXT_ID = new AtomicInteger();
   30   
   31       final SelectorManager manager;
   32       final Selector selector;
   33       final int id = NEXT_ID.getAndIncrement();
   34       private final int maxChannelsPerWorker;
   35   
   36       final AtomicInteger retainCounter = new AtomicInteger(1);
   37       private final ConcurrentLinkedQueue<Runnable> ioTasks = new ConcurrentLinkedQueue<Runnable>();
   38          
   39       public SelectorWorker(SelectorManager manager) throws IOException {
   40           this.manager = manager;
   41           selector = Selector.open();
   42           maxChannelsPerWorker = manager.getMaxChannelsPerWorker();
   43           manager.getSelectorExecutor().execute(this);
   44       }
   45   
   46       void retain() {
   47           if (retainCounter.incrementAndGet() == maxChannelsPerWorker) {
   48               manager.onWorkerFullEvent(this);
   49           }
   50       }
   51   
   52       void release() {
   53           int use = retainCounter.decrementAndGet();
   54           if (use == 0) {
   55               manager.onWorkerEmptyEvent(this);
   56           } else if (use == maxChannelsPerWorker - 1) {
   57               manager.onWorkerNotFullEvent(this);
   58           }
   59       }
   60       
   61       boolean isReleased() {
   62           return retainCounter.get()==0;
   63       }
   64   
   65   
   66       public void addIoTask(Runnable work) {
   67           ioTasks.add(work);
   68           selector.wakeup();
   69       }
   70       
   71       private void processIoTasks() {
   72           Runnable task; 
   73           while( (task= ioTasks.poll()) !=null ) {
   74               try {
   75                   task.run();
   76               } catch (Throwable e) {
   77                   e.printStackTrace();
   78               }
   79           }
   80       }
   81   
   82       
   83   
   84       public void run() {
   85   
   86           String origName = Thread.currentThread().getName();
   87           try {
   88               Thread.currentThread().setName("Selector Worker: " + id);
   89               while (!isReleased()) {
   90               	
   91               	processIoTasks();
   92               	
   93               	int count = selector.select(10);
   94               	
   95                   if (count == 0) {
   96                       continue;
   97                   }
   98   
   99                   // Get a java.util.Set containing the SelectionKey objects
  100                   // for all channels that are ready for I/O.
  101                   Set keys = selector.selectedKeys();
  102   
  103                   for (Iterator i = keys.iterator(); i.hasNext();) {
  104                       final SelectionKey key = (SelectionKey)i.next();
  105                       i.remove();
  106   
  107                       final SelectorSelection s = (SelectorSelection)key.attachment();
  108                       try {
  109                           if( key.isValid() ) {
  110                               key.interestOps(0);
  111                           }
  112   
  113                           // Kick off another thread to find newly selected keys
  114                           // while we process the
  115                           // currently selected keys
  116                           manager.getChannelExecutor().execute(new Runnable() {
  117                               public void run() {
  118                                   try {
  119                                       s.onSelect();
  120                                       s.enable();
  121                                   } catch (Throwable e) {
  122                                       s.onError(e);
  123                                   }
  124                               }
  125                           });
  126   
  127                       } catch (Throwable e) {
  128                           s.onError(e);
  129                       }
  130   
  131                   }
  132   
  133               }
  134           } catch (Throwable e) {         	
  135               e.printStackTrace();
  136               // Notify all the selections that the error occurred.
  137               Set keys = selector.keys();
  138               for (Iterator i = keys.iterator(); i.hasNext();) {
  139                   SelectionKey key = (SelectionKey)i.next();
  140                   SelectorSelection s = (SelectorSelection)key.attachment();
  141                   s.onError(e);
  142               }
  143           } finally {
  144               try {
  145                   manager.onWorkerEmptyEvent(this);
  146                   selector.close();
  147               } catch (IOException ignore) {
  148               	ignore.printStackTrace();
  149               }
  150               Thread.currentThread().setName(origName);
  151           }
  152       }
  153   
  154   }

Home » activemq-parent-5.3.1-source-release » org.apache » activemq » transport » nio » [javadoc | source]