1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 package org.apache.activemq.transport.nio; 18 19 import java.io.IOException; 20 import java.nio.channels.SocketChannel; 21 import java.util.LinkedList; 22 import java.util.concurrent.Executor; 23 import java.util.concurrent.ExecutorService; 24 import java.util.concurrent.SynchronousQueue; 25 import java.util.concurrent.ThreadFactory; 26 import java.util.concurrent.ThreadPoolExecutor; 27 import java.util.concurrent.TimeUnit; 28 29 /** 30 * The SelectorManager will manage one Selector and the thread that checks the 31 * selector. 32 * 33 * We may need to consider running more than one thread to check the selector if 34 * servicing the selector takes too long. 35 * 36 * @version $Rev: 46019 $ $Date: 2004-09-14 05:56:06 -0400 (Tue, 14 Sep 2004) $ 37 */ 38 public final class SelectorManager { 39 40 public static final SelectorManager SINGLETON = new SelectorManager(); 41 42 private Executor selectorExecutor = createDefaultExecutor(); 43 private Executor channelExecutor = selectorExecutor; 44 private LinkedList<SelectorWorker> freeWorkers = new LinkedList<SelectorWorker>(); 45 private int maxChannelsPerWorker = 1024; 46 47 protected ExecutorService createDefaultExecutor() { 48 ThreadPoolExecutor rc = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() { 49 public Thread newThread(Runnable runnable) { 50 return new Thread(runnable, "ActiveMQ NIO Worker"); 51 } 52 }); 53 // rc.allowCoreThreadTimeOut(true); 54 return rc; 55 } 56 57 public static SelectorManager getInstance() { 58 return SINGLETON; 59 } 60 61 public interface Listener { 62 void onSelect(SelectorSelection selector); 63 64 void onError(SelectorSelection selection, Throwable error); 65 } 66 67 68 public synchronized SelectorSelection register(SocketChannel socketChannel, Listener listener) 69 throws IOException { 70 71 SelectorSelection selection = null; 72 while( selection == null ) { 73 if (freeWorkers.size() > 0) { 74 SelectorWorker worker = freeWorkers.getFirst(); 75 if( worker.isReleased() ) { 76 freeWorkers.remove(worker); 77 } else { 78 worker.retain(); 79 selection = new SelectorSelection(worker, socketChannel, listener); 80 } 81 82 } else { 83 // Worker starts /w retain count of 1 84 SelectorWorker worker = new SelectorWorker(this); 85 freeWorkers.addFirst(worker); 86 selection = new SelectorSelection(worker, socketChannel, listener); 87 } 88 } 89 90 return selection; 91 } 92 93 synchronized void onWorkerFullEvent(SelectorWorker worker) { 94 freeWorkers.remove(worker); 95 } 96 97 public synchronized void onWorkerEmptyEvent(SelectorWorker worker) { 98 freeWorkers.remove(worker); 99 } 100 101 public synchronized void onWorkerNotFullEvent(SelectorWorker worker) { 102 freeWorkers.addFirst(worker); 103 } 104 105 public Executor getChannelExecutor() { 106 return channelExecutor; 107 } 108 109 public void setChannelExecutor(Executor channelExecutor) { 110 this.channelExecutor = channelExecutor; 111 } 112 113 public int getMaxChannelsPerWorker() { 114 return maxChannelsPerWorker; 115 } 116 117 public void setMaxChannelsPerWorker(int maxChannelsPerWorker) { 118 this.maxChannelsPerWorker = maxChannelsPerWorker; 119 } 120 121 public Executor getSelectorExecutor() { 122 return selectorExecutor; 123 } 124 125 public void setSelectorExecutor(Executor selectorExecutor) { 126 this.selectorExecutor = selectorExecutor; 127 } 128 129 }