Home » openejb-3.1.2-src » org.apache.openejb.server.discovery » [javadoc | source]

    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *     http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    *  Unless required by applicable law or agreed to in writing, software
   12    *  distributed under the License is distributed on an "AS IS" BASIS,
   13    *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    *  See the License for the specific language governing permissions and
   15    *  limitations under the License.
   16    */
   17   package org.apache.openejb.server.discovery;
   18   
   19   import org.apache.openejb.server.SelfManaging;
   20   import org.apache.openejb.server.ServerService;
   21   import org.apache.openejb.server.ServiceException;
   22   import org.apache.openejb.server.DiscoveryAgent;
   23   import org.apache.openejb.server.DiscoveryListener;
   24   import org.apache.openejb.util.LogCategory;
   25   import org.apache.openejb.util.Logger;
   26   import org.apache.openejb.loader.Options;
   27   
   28   import java.io.IOException;
   29   import java.io.InputStream;
   30   import java.io.OutputStream;
   31   import java.net.DatagramPacket;
   32   import java.net.InetAddress;
   33   import java.net.InetSocketAddress;
   34   import java.net.MulticastSocket;
   35   import java.net.Socket;
   36   import java.net.SocketAddress;
   37   import java.net.SocketTimeoutException;
   38   import java.net.URI;
   39   import java.net.URISyntaxException;
   40   import java.util.Map;
   41   import java.util.Properties;
   42   import java.util.Timer;
   43   import java.util.TimerTask;
   44   import java.util.concurrent.ConcurrentHashMap;
   45   import java.util.concurrent.Executor;
   46   import java.util.concurrent.LinkedBlockingQueue;
   47   import java.util.concurrent.ThreadFactory;
   48   import java.util.concurrent.ThreadPoolExecutor;
   49   import java.util.concurrent.TimeUnit;
   50   import java.util.concurrent.atomic.AtomicBoolean;
   51   
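/**
 * Discovery agent that advertises registered service URIs to a multicast group
 * and reports services advertised by other members of the same group to a
 * {@link DiscoveryListener}.
 *
 * @version $Rev$ $Date$
 */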
   55   public class MulticastDiscoveryAgent implements DiscoveryAgent, ServerService, SelfManaging {
   56   
   57       private static final Logger log = Logger.getInstance(LogCategory.OPENEJB_SERVER.createChild("discovery"), MulticastDiscoveryAgent.class);
   58   
   59       private static final int BUFF_SIZE = 8192;
   60   
   61   
   62       private AtomicBoolean started = new AtomicBoolean(false);
   63       private MulticastSocket multicast;
   64   
   65       private String host = "239.255.3.2";
   66       private int port = 6142;
   67   
   68       private int timeToLive = 1;
   69       private boolean loopbackMode = false;
   70       private SocketAddress address;
   71   
   72       private Map<String, Service> registeredServices = new ConcurrentHashMap<String, Service>();
   73   
   74       private String group = "default";
   75       private String groupPrefix = group + ":";
   76   
   77       private int maxMissedHeartbeats = 10;
   78       private long heartRate = 500;
   79   
   80       private Listener listener;
   81   
   82       public MulticastDiscoveryAgent() {
   83           listener = new Listener();
   84       }
   85   
   86       // ---------------------------------
   87       // Listenting specific settings
   88       private long reconnectDelay = 1000 * 5;
   89       private long maxReconnectDelay = 1000 * 30;
   90       private long exponentialBackoff = 0;
   91       private boolean useExponentialBackOff;
   92       private int maxReconnectAttempts = 10; // todo: check this out
   93       // ---------------------------------
   94   
   95   
   96       public void init(Properties props) throws Exception {
   97   
   98           host = props.getProperty("bind", host);
   99           group = props.getProperty("group", group);
  100           groupPrefix = group + ":";
  101   
  102           Options options = new Options(props);
  103   
  104           port = options.get("port", port);
  105   
  106           heartRate = options.get("heart_rate", heartRate);
  107           maxMissedHeartbeats = options.get("max_missed_heartbeats", maxMissedHeartbeats);
  108           loopbackMode = options.get("loopback_mode", loopbackMode);
  109   
  110           reconnectDelay = options.get("reconnect_delay", reconnectDelay);
  111           maxReconnectDelay = options.get("max_reconnect_delay", reconnectDelay);
  112           maxReconnectAttempts = options.get("max_reconnect_attempts", maxReconnectAttempts);
  113           exponentialBackoff = options.get("exponential_backoff", exponentialBackoff);
  114   
  115           useExponentialBackOff = (exponentialBackoff > 1);
  116       }
  117   
  118       public String getIP() {
  119           return host;
  120       }
  121   
    /**
     * @return the fixed name of this service, "multicast"
     */
    public String getName() {
        return "multicast";
    }
  125   
  126       public int getPort() {
  127           return port;
  128       }
  129   
  130       public void setDiscoveryListener(DiscoveryListener listener) {
  131           this.listener.setDiscoveryListener(listener);
  132       }
  133   
  134       public void registerService(URI serviceUri) throws IOException {
  135           Service service = new Service(serviceUri);
  136           this.registeredServices.put(service.broadcastString, service);
  137           this.listener.fireServiceAddedEvent(serviceUri);
  138       }
  139   
  140       public void unregisterService(URI serviceUri) throws IOException {
  141           Service service = new Service(serviceUri);
  142           this.registeredServices.remove(service.broadcastString);
  143           this.listener.fireServiceRemovedEvent(serviceUri);
  144       }
  145   
  146       public void reportFailed(URI serviceUri) throws IOException {
  147           listener.reportFailed(serviceUri);
  148       }
  149   
  150   
  151       private boolean isSelf(Service service) {
  152           return isSelf(service.broadcastString);
  153       }
  154   
  155       private boolean isSelf(String service) {
  156           return registeredServices.keySet().contains(service);
  157       }
  158   
    /**
     * Command-line entry point. The body is empty, so invoking it does nothing.
     *
     * @param args ignored
     */
    public static void main(String[] args) throws Exception {
    }
  161   
  162       /**
  163        * start the discovery agent
  164        *
  165        * @throws Exception
  166        */
  167       public void start() throws ServiceException {
  168           try {
  169               if (started.compareAndSet(false, true)) {
  170   
  171                   InetAddress inetAddress = InetAddress.getByName(host);
  172   
  173                   this.address = new InetSocketAddress(inetAddress, port);
  174   
  175                   multicast = new MulticastSocket(port);
  176                   multicast.setLoopbackMode(loopbackMode);
  177                   multicast.setTimeToLive(timeToLive);
  178                   multicast.joinGroup(inetAddress);
  179                   multicast.setSoTimeout((int) heartRate);
  180   
  181                   Thread listenerThread = new Thread(listener);
  182                   listenerThread.setName("MulticastDiscovery: Listener");
  183                   listenerThread.setDaemon(true);
  184                   listenerThread.start();
  185   
  186                   Broadcaster broadcaster = new Broadcaster();
  187   
  188                   Timer timer = new Timer("MulticastDiscovery: Broadcaster", true);
  189                   timer.scheduleAtFixedRate(broadcaster, 0, heartRate);
  190               }
  191           } catch (Exception e) {
  192               throw new ServiceException(e);
  193           }
  194       }
  195   
  196       /**
  197        * stop the channel
  198        *
  199        * @throws Exception
  200        */
  201       public void stop() throws ServiceException {
  202           if (started.compareAndSet(true, false)) {
  203               multicast.close();
  204           }
  205       }
  206   
    /**
     * Does nothing; the body is empty.
     * NOTE(review): presumably present only to satisfy the ServerService interface — confirm.
     */
    public void service(InputStream in, OutputStream out) throws ServiceException, IOException {
    }
  209   
    /**
     * Does nothing; the body is empty.
     * NOTE(review): presumably present only to satisfy the ServerService interface — confirm.
     */
    public void service(Socket socket) throws ServiceException, IOException {
    }
  212   
  213       class Service {
  214           private final URI uri;
  215           private final String broadcastString;
  216   
  217           public Service(URI uri) {
  218               this.uri = uri;
  219               this.broadcastString = groupPrefix + uri.toString();
  220           }
  221   
  222           public Service(String uriString) throws URISyntaxException {
  223               URI uri = new URI(uriString);
  224               uri = new URI(uri.getSchemeSpecificPart());
  225               this.uri = uri;
  226               this.broadcastString = uriString;
  227           }
  228       }
  229   
  230       private class ServiceVitals {
  231   
  232           private final Service service;
  233   
  234           private long lastHeartBeat;
  235           private long recoveryTime;
  236           private int failureCount;
  237           private boolean dead;
  238   
  239           public ServiceVitals(Service service) {
  240               this.service = service;
  241               this.lastHeartBeat = System.currentTimeMillis();
  242           }
  243   
  244           public synchronized void heartbeat() {
  245               lastHeartBeat = System.currentTimeMillis();
  246   
  247               // Consider that the service recovery has succeeded if it has not
  248               // failed in 60 seconds.
  249               if (!dead && failureCount > 0 && (lastHeartBeat - recoveryTime) > 1000 * 60) {
  250                   if (log.isDebugEnabled()) {
  251                       log.debug("I now think that the " + service + " service has recovered.");
  252                   }
  253                   failureCount = 0;
  254                   recoveryTime = 0;
  255               }
  256           }
  257   
  258           public synchronized long getLastHeartbeat() {
  259               return lastHeartBeat;
  260           }
  261   
  262           public synchronized boolean pronounceDead() {
  263               if (!dead) {
  264                   dead = true;
  265                   failureCount++;
  266   
  267                   long delay;
  268                   if (useExponentialBackOff) {
  269                       delay = (long) Math.pow(exponentialBackoff, failureCount);
  270                       if (delay > maxReconnectDelay) {
  271                           delay = maxReconnectDelay;
  272                       }
  273                   } else {
  274                       delay = reconnectDelay;
  275                   }
  276   
  277                   if (log.isDebugEnabled()) {
  278                       log.debug("Remote failure of " + service + " while still receiving multicast advertisements.  " +
  279                               "Advertising events will be suppressed for " + delay
  280                               + " ms, the current failure count is: " + failureCount);
  281                   }
  282   
  283                   recoveryTime = System.currentTimeMillis() + delay;
  284                   return true;
  285               }
  286               return false;
  287           }
  288   
  289           /**
  290            * @return true if this broker is marked failed and it is now the right
  291            *         time to start recovery.
  292            */
  293           public synchronized boolean doRecovery() {
  294               if (!dead) {
  295                   return false;
  296               }
  297   
  298               // Are we done trying to recover this guy?
  299               if (maxReconnectAttempts > 0 && failureCount > maxReconnectAttempts) {
  300                   if (log.isDebugEnabled()) {
  301                       log.debug("Max reconnect attempts of the " + service + " service has been reached.");
  302                   }
  303                   return false;
  304               }
  305   
  306               // Is it not yet time?
  307               if (System.currentTimeMillis() < recoveryTime) {
  308                   return false;
  309               }
  310   
  311               if (log.isDebugEnabled()) {
  312                   log.debug("Resuming event advertisement of the " + service + " service.");
  313               }
  314               dead = false;
  315               return true;
  316           }
  317   
  318           public boolean isDead() {
  319               return dead;
  320           }
  321       }
  322   
  323   
  324       class Listener implements Runnable {
  325           private Map<String, ServiceVitals> discoveredServices = new ConcurrentHashMap<String, ServiceVitals>();
  326           private DiscoveryListener discoveryListener;
  327   
  328           public void setDiscoveryListener(DiscoveryListener discoveryListener) {
  329               this.discoveryListener = discoveryListener;
  330           }
  331   
  332           public void run() {
  333               byte[] buf = new byte[BUFF_SIZE];
  334               DatagramPacket packet = new DatagramPacket(buf, 0, buf.length);
  335               while (started.get()) {
  336                   checkServices();
  337                   try {
  338                       multicast.receive(packet);
  339                       if (packet.getLength() > 0) {
  340                           String str = new String(packet.getData(), packet.getOffset(), packet.getLength());
  341   //                        System.out.println("read = " + str);
  342                           processData(str);
  343                       }
  344                   } catch (SocketTimeoutException se) {
  345                       // ignore
  346                   } catch (IOException e) {
  347                       if (started.get()) {
  348                           log.error("failed to process packet: " + e);
  349                       }
  350                   }
  351               }
  352           }
  353   
  354           private void processData(String uriString) {
  355               if (discoveryListener == null) {
  356                   return;
  357               }
  358   
  359               if (!uriString.startsWith(groupPrefix)){
  360                   return;
  361               }
  362   
  363               if (isSelf(uriString)) {
  364                   return;
  365               }
  366   
  367               ServiceVitals vitals = discoveredServices.get(uriString);
  368   
  369               if (vitals == null) {
  370                   try {
  371                       vitals = new ServiceVitals(new Service(uriString));
  372   
  373                       discoveredServices.put(uriString, vitals);
  374   
  375                       fireServiceAddedEvent(vitals.service.uri);
  376                   } catch (URISyntaxException e) {
  377                       // don't continuously log this
  378                   }
  379   
  380               } else {
  381                   vitals.heartbeat();
  382   
  383                   if (vitals.doRecovery()) {
  384                       fireServiceAddedEvent(vitals.service.uri);
  385                   }
  386               }
  387           }
  388   
  389           private void checkServices() {
  390               long expireTime = System.currentTimeMillis() - (heartRate * maxMissedHeartbeats);
  391               for (ServiceVitals serviceVitals : discoveredServices.values()) {
  392                   if (serviceVitals.getLastHeartbeat() < expireTime && !isSelf(serviceVitals.service)) {
  393   
  394                       ServiceVitals vitals = discoveredServices.remove(serviceVitals.service.broadcastString);
  395                       if (vitals != null && !vitals.isDead()) {
  396                           fireServiceRemovedEvent(vitals.service.uri);
  397                       }
  398                   }
  399               }
  400           }
  401   
  402           private final Executor executor = new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
  403               public Thread newThread(Runnable runable) {
  404                   Thread t = new Thread(runable, "Multicast Discovery Agent Notifier");
  405                   t.setDaemon(true);
  406                   return t;
  407               }
  408           });
  409   
  410           private void fireServiceRemovedEvent(final URI uri) {
  411               if (discoveryListener != null) {
  412                   final DiscoveryListener discoveryListener = this.discoveryListener;
  413   
  414                   // Have the listener process the event async so that
  415                   // he does not block this thread since we are doing time sensitive
  416                   // processing of events.
  417                   executor.execute(new Runnable() {
  418                       public void run() {
  419                           if (discoveryListener != null) {
  420                               discoveryListener.serviceRemoved(uri);
  421                           }
  422                       }
  423                   });
  424               }
  425           }
  426   
  427           private void fireServiceAddedEvent(final URI uri) {
  428               if (discoveryListener != null) {
  429                   final DiscoveryListener discoveryListener = this.discoveryListener;
  430   
  431                   // Have the listener process the event async so that
  432                   // he does not block this thread since we are doing time sensitive
  433                   // processing of events.
  434                   executor.execute(new Runnable() {
  435                       public void run() {
  436                           if (discoveryListener != null) {
  437                               discoveryListener.serviceAdded(uri);
  438                           }
  439                       }
  440                   });
  441               }
  442           }
  443   
  444           public void reportFailed(URI serviceUri) {
  445               final Service service = new Service(serviceUri);
  446               ServiceVitals serviceVitals = discoveredServices.get(service.broadcastString);
  447               if (serviceVitals != null && serviceVitals.pronounceDead()) {
  448                   fireServiceRemovedEvent(service.uri);
  449               }
  450           }
  451       }
  452   
  453       class Broadcaster extends TimerTask {
  454           private IOException failed;
  455   
  456           public void run() {
  457               if (started.get()) {
  458                   heartbeat();
  459               }
  460           }
  461   
  462           private void heartbeat() {
  463               for (String uri : registeredServices.keySet()) {
  464                   try {
  465                       byte[] data = uri.getBytes();
  466                       DatagramPacket packet = new DatagramPacket(data, 0, data.length, address);
  467   //                    System.out.println("ann = " + uri);
  468                       multicast.send(packet);
  469                   } catch (IOException e) {
  470                       // If a send fails, chances are all subsequent sends will fail
  471                       // too.. No need to keep reporting the
  472                       // same error over and over.
  473                       if (failed == null) {
  474                           failed = e;
  475   
  476                           log.error("Failed to advertise our service: " + uri, e);
  477                           if ("Operation not permitted".equals(e.getMessage())) {
  478                               log.error("The 'Operation not permitted' error has been know to be caused by improper firewall/network setup.  "
  479                                       + "Please make sure that the OS is properly configured to allow multicast traffic over: " + multicast.getLocalAddress());
  480                           }
  481                       }
  482                   }
  483               }
  484           }
  485       }
  486   
  487   
  488       //
  489       //  Ordinary getters/setters
  490       //
  491   
  492       public long getExponentialBackoff() {
  493           return exponentialBackoff;
  494       }
  495   
  496       public void setExponentialBackoff(long exponentialBackoff) {
  497           this.exponentialBackoff = exponentialBackoff;
  498           this.useExponentialBackOff = (exponentialBackoff > 1);
  499       }
  500   
  501       public String getGroup() {
  502           return group;
  503       }
  504   
  505       public void setGroup(String group) {
  506           this.group = group;
  507           groupPrefix = group + ":";
  508       }
  509   
  510       public long getHeartRate() {
  511           return heartRate;
  512       }
  513   
  514       public void setHeartRate(long heartRate) {
  515           this.heartRate = heartRate;
  516       }
  517   
  518       public String getHost() {
  519           return host;
  520       }
  521   
  522       public void setHost(String host) {
  523           this.host = host;
  524       }
  525   
  526       public long getReconnectDelay() {
  527           return reconnectDelay;
  528       }
  529   
  530       public void setReconnectDelay(long reconnectDelay) {
  531           this.reconnectDelay = reconnectDelay;
  532       }
  533   
  534       public boolean isLoopbackMode() {
  535           return loopbackMode;
  536       }
  537   
  538       public void setLoopbackMode(boolean loopbackMode) {
  539           this.loopbackMode = loopbackMode;
  540       }
  541   
  542       public int getMaxMissedHeartbeats() {
  543           return maxMissedHeartbeats;
  544       }
  545   
  546       public void setMaxMissedHeartbeats(int maxMissedHeartbeats) {
  547           this.maxMissedHeartbeats = maxMissedHeartbeats;
  548       }
  549   
  550       public int getMaxReconnectAttempts() {
  551           return maxReconnectAttempts;
  552       }
  553   
  554       public void setMaxReconnectAttempts(int maxReconnectAttempts) {
  555           this.maxReconnectAttempts = maxReconnectAttempts;
  556       }
  557   
  558       public long getMaxReconnectDelay() {
  559           return maxReconnectDelay;
  560       }
  561   
  562       public void setMaxReconnectDelay(long maxReconnectDelay) {
  563           this.maxReconnectDelay = maxReconnectDelay;
  564       }
  565   
  566       public int getTimeToLive() {
  567           return timeToLive;
  568       }
  569   
  570       public void setTimeToLive(int timeToLive) {
  571           this.timeToLive = timeToLive;
  572       }
  573   
  574   }

Home » openejb-3.1.2-src » org.apache.openejb.server.discovery » [javadoc | source]