1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, 13 * software distributed under the License is distributed on an 14 * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 15 * KIND, either express or implied. See the License for the 16 * specific language governing permissions and limitations 17 * under the License. 18 */ 19 20 package org.apache.synapse.transport.base.threads; 21 22 import org.apache.commons.logging.Log; 23 import org.apache.commons.logging.LogFactory; 24 25 import java.util.concurrent; 26 import java.util.concurrent.atomic.AtomicInteger; 27 28 /** 29 * Worker pool implementation based on java.util.concurrent in JDK 1.5 or later. 30 */ 31 public class NativeWorkerPool implements WorkerPool { 32 33 private static final Log log = LogFactory.getLog(NativeWorkerPool.class); 34 35 private final ThreadPoolExecutor executor; 36 private final LinkedBlockingQueue<Runnable> blockingQueue; 37 38 public NativeWorkerPool(int core, int max, int keepAlive, 39 int queueLength, String threadGroupName, String threadGroupId) { 40 41 if (log.isDebugEnabled()) { 42 log.debug("Using native util.concurrent package.."); 43 } 44 blockingQueue = 45 (queueLength == -1 ? new LinkedBlockingQueue<Runnable>() 46 : new LinkedBlockingQueue<Runnable>(queueLength)); 47 executor = new ThreadPoolExecutor( 48 core, max, keepAlive, 49 TimeUnit.SECONDS, 50 blockingQueue, 51 new NativeThreadFactory(new ThreadGroup(threadGroupName), threadGroupId)); 52 } 53 54 public void execute(Runnable task) { 55 executor.execute(task); 56 } 57 58 public int getActiveCount() { 59 return executor.getActiveCount(); 60 } 61 62 public int getQueueSize() { 63 return blockingQueue.size(); 64 } 65 66 public void shutdown(int timeout) throws InterruptedException { 67 executor.shutdown(); 68 executor.awaitTermination(timeout, TimeUnit.MILLISECONDS); 69 } 70 71 /** 72 * This is a simple ThreadFactory implementation using java.util.concurrent 73 * Creates threads with the given name prefix 74 */ 75 public class NativeThreadFactory implements 76 ThreadFactory { 77 78 final ThreadGroup group; 79 final AtomicInteger count; 80 final String namePrefix; 81 82 public NativeThreadFactory(final ThreadGroup group, final String namePrefix) { 83 super(); 84 this.count = new AtomicInteger(1); 85 this.group = group; 86 this.namePrefix = namePrefix; 87 } 88 89 public Thread newThread(final Runnable runnable) { 90 StringBuffer buffer = new StringBuffer(); 91 buffer.append(this.namePrefix); 92 buffer.append('-'); 93 buffer.append(this.count.getAndIncrement()); 94 Thread t = new Thread(group, runnable, buffer.toString(), 0); 95 t.setDaemon(false); 96 t.setPriority(Thread.NORM_PRIORITY); 97 return t; 98 } 99 100 } 101 }