Home » activemq-parent-5.3.1-source-release » org.apache » activemq » kaha » impl » [javadoc | source]

    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq.kaha.impl;
   18   
   19   import java.io.File;
   20   import java.io.IOException;
   21   import java.io.RandomAccessFile;
   22   import java.nio.channels.FileLock;
   23   import java.util.Date;
   24   import java.util.HashSet;
   25   import java.util.Iterator;
   26   import java.util.Map;
   27   import java.util.Set;
   28   import java.util.concurrent.ConcurrentHashMap;
   29   import java.util.concurrent.atomic.AtomicLong;
   30   
   31   import org.apache.activemq.kaha.ContainerId;
   32   import org.apache.activemq.kaha.ListContainer;
   33   import org.apache.activemq.kaha.MapContainer;
   34   import org.apache.activemq.kaha.Store;
   35   import org.apache.activemq.kaha.StoreLocation;
   36   import org.apache.activemq.kaha.impl.async.AsyncDataManager;
   37   import org.apache.activemq.kaha.impl.async.DataManagerFacade;
   38   import org.apache.activemq.kaha.impl.container.ListContainerImpl;
   39   import org.apache.activemq.kaha.impl.container.MapContainerImpl;
   40   import org.apache.activemq.kaha.impl.data.DataManagerImpl;
   41   import org.apache.activemq.kaha.impl.data.Item;
   42   import org.apache.activemq.kaha.impl.data.RedoListener;
   43   import org.apache.activemq.kaha.impl.index.IndexItem;
   44   import org.apache.activemq.kaha.impl.index.IndexManager;
   45   import org.apache.activemq.kaha.impl.index.RedoStoreIndexItem;
   46   import org.apache.activemq.util.IOHelper;
   47   import org.apache.commons.logging.Log;
   48   import org.apache.commons.logging.LogFactory;
   49   
   50   /**
   51    * Store Implementation
   52    * 
   53    * @version $Revision: 1.1.1.1 $
   54    */
   55   public class KahaStore implements Store {
   56   
   57       private static final String PROPERTY_PREFIX = "org.apache.activemq.kaha.Store";
   58       private static final boolean BROKEN_FILE_LOCK = "true".equals(System.getProperty(PROPERTY_PREFIX
   59                                                                                        + ".FileLockBroken",
   60                                                                                        "false"));
   61       private static final boolean DISABLE_LOCKING = "true".equals(System.getProperty(PROPERTY_PREFIX
   62                                                                                       + ".DisableLocking",
   63                                                                                       "false"));
   64       //according to the String javadoc, all constant strings are interned so this will be the same object throughout the vm
   65       //and we can use it as a monitor for the lockset.
   66       private final static String LOCKSET_MONITOR = PROPERTY_PREFIX + ".Lock.Monitor";
   67       private static final Log LOG = LogFactory.getLog(KahaStore.class);
   68   
   69       private final File directory;
   70       private final String mode;
   71       private IndexRootContainer mapsContainer;
   72       private IndexRootContainer listsContainer;
   73       private final Map<ContainerId, ListContainerImpl> lists = new ConcurrentHashMap<ContainerId, ListContainerImpl>();
   74       private final Map<ContainerId, MapContainerImpl> maps = new ConcurrentHashMap<ContainerId, MapContainerImpl>();
   75       private final Map<String, DataManager> dataManagers = new ConcurrentHashMap<String, DataManager>();
   76       private final Map<String, IndexManager> indexManagers = new ConcurrentHashMap<String, IndexManager>();
   77       private boolean closed;
   78       private boolean initialized;
   79       private boolean logIndexChanges;
   80       private boolean useAsyncDataManager;
   81       private long maxDataFileLength = 1024 * 1024 * 32;
   82       private FileLock lock;
   83       private boolean persistentIndex = true;
   84       private RandomAccessFile lockFile;
   85       private final AtomicLong storeSize;
   86       private String defaultContainerName = DEFAULT_CONTAINER_NAME;
   87   
   88       
   89       public KahaStore(String name, String mode) throws IOException {
   90       	this(new File(IOHelper.toFileSystemDirectorySafeName(name)), mode, new AtomicLong());
   91       }
   92   
   93       public KahaStore(File directory, String mode) throws IOException {
   94       	this(directory, mode, new AtomicLong());
   95       }
   96   
   97       public KahaStore(String name, String mode,AtomicLong storeSize) throws IOException {
   98       	this(new File(IOHelper.toFileSystemDirectorySafeName(name)), mode, storeSize);
   99       }
  100       
  101       public KahaStore(File directory, String mode, AtomicLong storeSize) throws IOException {
  102           this.mode = mode;
  103           this.storeSize = storeSize;
  104           this.directory = directory;
  105           IOHelper.mkdirs(this.directory);
  106       }
  107   
  108       public synchronized void close() throws IOException {
  109           if (!closed) {
  110               closed = true;
  111               if (initialized) {
  112                   unlock();
  113                   for (ListContainerImpl container : lists.values()) {
  114                       container.close();
  115                   }
  116                   lists.clear();
  117                   for (MapContainerImpl container : maps.values()) {
  118                       container.close();
  119                   }
  120                   maps.clear();
  121                   for (Iterator<IndexManager> iter = indexManagers.values().iterator(); iter.hasNext();) {
  122                       IndexManager im = iter.next();
  123                       im.close();
  124                       iter.remove();
  125                   }
  126                   for (Iterator<DataManager> iter = dataManagers.values().iterator(); iter.hasNext();) {
  127                       DataManager dm = iter.next();
  128                       dm.close();
  129                       iter.remove();
  130                   }
  131               }
  132               if (lockFile!=null) {
  133                   lockFile.close();
  134                   lockFile=null;
  135               }
  136           }
  137       }
  138   
  139       public synchronized void force() throws IOException {
  140           if (initialized) {
  141               for (Iterator<IndexManager> iter = indexManagers.values().iterator(); iter.hasNext();) {
  142                   IndexManager im = iter.next();
  143                   im.force();
  144               }
  145               for (Iterator<DataManager> iter = dataManagers.values().iterator(); iter.hasNext();) {
  146                   DataManager dm = iter.next();
  147                   dm.force();
  148               }
  149           }
  150       }
  151   
  152       public synchronized void clear() throws IOException {
  153           initialize();
  154           for (Iterator i = mapsContainer.getKeys().iterator(); i.hasNext();) {
  155               ContainerId id = (ContainerId)i.next();
  156               MapContainer container = getMapContainer(id.getKey(), id.getDataContainerName());
  157               container.clear();
  158           }
  159           for (Iterator i = listsContainer.getKeys().iterator(); i.hasNext();) {
  160               ContainerId id = (ContainerId)i.next();
  161               ListContainer container = getListContainer(id.getKey(), id.getDataContainerName());
  162               container.clear();
  163           }
  164   
  165       }
  166   
  167       public synchronized boolean delete() throws IOException {
  168           boolean result = true;
  169           if (initialized) {
  170               clear();
  171               for (Iterator<IndexManager> iter = indexManagers.values().iterator(); iter.hasNext();) {
  172                   IndexManager im = iter.next();
  173                   result &= im.delete();
  174                   iter.remove();
  175               }
  176               for (Iterator<DataManager> iter = dataManagers.values().iterator(); iter.hasNext();) {
  177                   DataManager dm = iter.next();
  178                   result &= dm.delete();
  179                   iter.remove();
  180               }
  181           }
  182           if (directory != null && directory.isDirectory()) {
  183               result =IOHelper.deleteChildren(directory);
  184               String str = result ? "successfully deleted" : "failed to delete";
  185               LOG.info("Kaha Store " + str + " data directory " + directory);
  186           }
  187           return result;
  188       }
  189   
  190       public synchronized boolean isInitialized() {
  191           return initialized;
  192       }
  193   
  194       public boolean doesMapContainerExist(Object id) throws IOException {
  195           return doesMapContainerExist(id, defaultContainerName);
  196       }
  197   
  198       public synchronized boolean doesMapContainerExist(Object id, String containerName) throws IOException {
  199           initialize();
  200           ContainerId containerId = new ContainerId(id, containerName);
  201           return maps.containsKey(containerId) || mapsContainer.doesRootExist(containerId);
  202       }
  203   
  204       public MapContainer getMapContainer(Object id) throws IOException {
  205           return getMapContainer(id, defaultContainerName);
  206       }
  207   
  208       public MapContainer getMapContainer(Object id, String containerName) throws IOException {
  209           return getMapContainer(id, containerName, persistentIndex);
  210       }
  211   
  212       public synchronized MapContainer getMapContainer(Object id, String containerName, boolean persistentIndex)
  213           throws IOException {
  214           initialize();
  215           ContainerId containerId = new ContainerId(id, containerName);
  216           MapContainerImpl result = maps.get(containerId);
  217           if (result == null) {
  218               DataManager dm = getDataManager(containerName);
  219               IndexManager im = getIndexManager(dm, containerName);
  220   
  221               IndexItem root = mapsContainer.getRoot(im, containerId);
  222               if (root == null) {
  223                   root = mapsContainer.addRoot(im, containerId);
  224               }
  225               result = new MapContainerImpl(directory, containerId, root, im, dm, persistentIndex);
  226               maps.put(containerId, result);
  227           }
  228           return result;
  229       }
  230   
  231       public void deleteMapContainer(Object id) throws IOException {
  232           deleteMapContainer(id, defaultContainerName);
  233       }
  234   
  235       public void deleteMapContainer(Object id, String containerName) throws IOException {
  236           ContainerId containerId = new ContainerId(id, containerName);
  237           deleteMapContainer(containerId);
  238       }
  239   
  240       public synchronized void deleteMapContainer(ContainerId containerId) throws IOException {
  241           initialize();
  242           MapContainerImpl container = maps.remove(containerId);
  243           if (container != null) {
  244               container.clear();
  245               mapsContainer.removeRoot(container.getIndexManager(), containerId);
  246               container.close();
  247           }
  248       }
  249   
  250       public synchronized Set<ContainerId> getMapContainerIds() throws IOException {
  251           initialize();
  252           Set<ContainerId> set = new HashSet<ContainerId>();
  253           for (Iterator i = mapsContainer.getKeys().iterator(); i.hasNext();) {
  254               ContainerId id = (ContainerId)i.next();
  255               set.add(id);
  256           }
  257           return set;
  258       }
  259   
  260       public boolean doesListContainerExist(Object id) throws IOException {
  261           return doesListContainerExist(id, defaultContainerName);
  262       }
  263   
  264       public synchronized boolean doesListContainerExist(Object id, String containerName) throws IOException {
  265           initialize();
  266           ContainerId containerId = new ContainerId(id, containerName);
  267           return lists.containsKey(containerId) || listsContainer.doesRootExist(containerId);
  268       }
  269   
  270       public ListContainer getListContainer(Object id) throws IOException {
  271           return getListContainer(id, defaultContainerName);
  272       }
  273   
  274       public ListContainer getListContainer(Object id, String containerName) throws IOException {
  275           return getListContainer(id, containerName, persistentIndex);
  276       }
  277   
  278       public synchronized ListContainer getListContainer(Object id, String containerName,
  279                                                          boolean persistentIndex) throws IOException {
  280           initialize();
  281           ContainerId containerId = new ContainerId(id, containerName);
  282           ListContainerImpl result = lists.get(containerId);
  283           if (result == null) {
  284               DataManager dm = getDataManager(containerName);
  285               IndexManager im = getIndexManager(dm, containerName);
  286   
  287               IndexItem root = listsContainer.getRoot(im, containerId);
  288               if (root == null) {
  289                   root = listsContainer.addRoot(im, containerId);
  290               }
  291               result = new ListContainerImpl(containerId, root, im, dm, persistentIndex);
  292               lists.put(containerId, result);
  293           }
  294           return result;
  295       }
  296   
  297       public void deleteListContainer(Object id) throws IOException {
  298           deleteListContainer(id, defaultContainerName);
  299       }
  300   
  301       public synchronized void deleteListContainer(Object id, String containerName) throws IOException {
  302           ContainerId containerId = new ContainerId(id, containerName);
  303           deleteListContainer(containerId);
  304       }
  305   
  306       public synchronized void deleteListContainer(ContainerId containerId) throws IOException {
  307           initialize();
  308           ListContainerImpl container = lists.remove(containerId);
  309           if (container != null) {
  310               listsContainer.removeRoot(container.getIndexManager(), containerId);
  311               container.clear();
  312               container.close();
  313           }
  314       }
  315   
  316       public synchronized Set<ContainerId> getListContainerIds() throws IOException {
  317           initialize();
  318           Set<ContainerId> set = new HashSet<ContainerId>();
  319           for (Iterator i = listsContainer.getKeys().iterator(); i.hasNext();) {
  320               ContainerId id = (ContainerId)i.next();
  321               set.add(id);
  322           }
  323           return set;
  324       }
  325   
  326       /**
  327        * @return the listsContainer
  328        */
  329       public IndexRootContainer getListsContainer() {
  330           return this.listsContainer;
  331       }
  332   
  333       /**
  334        * @return the mapsContainer
  335        */
  336       public IndexRootContainer getMapsContainer() {
  337           return this.mapsContainer;
  338       }
  339   
  340       public synchronized DataManager getDataManager(String name) throws IOException {
  341           DataManager dm = dataManagers.get(name);
  342           if (dm == null) {
  343               if (isUseAsyncDataManager()) {
  344                   AsyncDataManager t = new AsyncDataManager(storeSize);
  345                   t.setDirectory(directory);
  346                   t.setFilePrefix("async-data-" + name + "-");
  347                   t.setMaxFileLength((int)maxDataFileLength);
  348                   t.start();
  349                   dm = new DataManagerFacade(t, name);
  350               } else {
  351                   DataManagerImpl t = new DataManagerImpl(directory, name,storeSize);
  352                   t.setMaxFileLength(maxDataFileLength);
  353                   dm = t;
  354               }
  355               if (logIndexChanges) {
  356                   recover(dm);
  357               }
  358               dataManagers.put(name, dm);
  359           }
  360           return dm;
  361       }
  362   
  363       public synchronized IndexManager getIndexManager(DataManager dm, String name) throws IOException {
  364           IndexManager im = indexManagers.get(name);
  365           if (im == null) {
  366               im = new IndexManager(directory, name, mode, logIndexChanges ? dm : null,storeSize);
  367               indexManagers.put(name, im);
  368           }
  369           return im;
  370       }
  371   
  372       private void recover(final DataManager dm) throws IOException {
  373           dm.recoverRedoItems(new RedoListener() {
  374               public void onRedoItem(StoreLocation item, Object o) throws Exception {
  375                   RedoStoreIndexItem redo = (RedoStoreIndexItem)o;
  376                   // IndexManager im = getIndexManager(dm, redo.getIndexName());
  377                   IndexManager im = getIndexManager(dm, dm.getName());
  378                   im.redo(redo);
  379               }
  380           });
  381       }
  382   
  383       public synchronized boolean isLogIndexChanges() {
  384           return logIndexChanges;
  385       }
  386   
  387       public synchronized void setLogIndexChanges(boolean logIndexChanges) {
  388           this.logIndexChanges = logIndexChanges;
  389       }
  390   
  391       /**
  392        * @return the maxDataFileLength
  393        */
  394       public synchronized long getMaxDataFileLength() {
  395           return maxDataFileLength;
  396       }
  397   
  398       /**
  399        * @param maxDataFileLength the maxDataFileLength to set
  400        */
  401       public synchronized void setMaxDataFileLength(long maxDataFileLength) {
  402           this.maxDataFileLength = maxDataFileLength;
  403       }
  404   
  405       /**
  406        * @return the default index type
  407        */
  408       public synchronized String getIndexTypeAsString() {
  409           return persistentIndex ? "PERSISTENT" : "VM";
  410       }
  411   
  412       /**
  413        * Set the default index type
  414        * 
  415        * @param type "PERSISTENT" or "VM"
  416        */
  417       public synchronized void setIndexTypeAsString(String type) {
  418           if (type.equalsIgnoreCase("VM")) {
  419               persistentIndex = false;
  420           } else {
  421               persistentIndex = true;
  422           }
  423       }
  424       
  425       public boolean isPersistentIndex() {
  426   		return persistentIndex;
  427   	}
  428   
  429   	public void setPersistentIndex(boolean persistentIndex) {
  430   		this.persistentIndex = persistentIndex;
  431   	}
  432   	
  433   
  434       public synchronized boolean isUseAsyncDataManager() {
  435           return useAsyncDataManager;
  436       }
  437   
  438       public synchronized void setUseAsyncDataManager(boolean useAsyncWriter) {
  439           this.useAsyncDataManager = useAsyncWriter;
  440       }
  441   
  442       /**
  443        * @return size of store
  444        * @see org.apache.activemq.kaha.Store#size()
  445        */
  446       public long size(){
  447           return storeSize.get();
  448       }
  449   
  450       public String getDefaultContainerName() {
  451           return defaultContainerName;
  452       }
  453   
  454       public void setDefaultContainerName(String defaultContainerName) {
  455           this.defaultContainerName = defaultContainerName;
  456       }
  457   
  458       public synchronized void initialize() throws IOException {
  459           if (closed) {
  460               throw new IOException("Store has been closed.");
  461           }
  462           if (!initialized) {       
  463               LOG.info("Kaha Store using data directory " + directory);
  464               lockFile = new RandomAccessFile(new File(directory, "lock"), "rw");
  465               lock();
  466               DataManager defaultDM = getDataManager(defaultContainerName);
  467               IndexManager rootIndexManager = getIndexManager(defaultDM, defaultContainerName);
  468               IndexItem mapRoot = new IndexItem();
  469               IndexItem listRoot = new IndexItem();
  470               if (rootIndexManager.isEmpty()) {
  471                   mapRoot.setOffset(0);
  472                   rootIndexManager.storeIndex(mapRoot);
  473                   listRoot.setOffset(IndexItem.INDEX_SIZE);
  474                   rootIndexManager.storeIndex(listRoot);
  475                   rootIndexManager.setLength(IndexItem.INDEX_SIZE * 2);
  476               } else {
  477                   mapRoot = rootIndexManager.getIndex(0);
  478                   listRoot = rootIndexManager.getIndex(IndexItem.INDEX_SIZE);
  479               }
  480               initialized = true;
  481               mapsContainer = new IndexRootContainer(mapRoot, rootIndexManager, defaultDM);
  482               listsContainer = new IndexRootContainer(listRoot, rootIndexManager, defaultDM);
  483               /**
  484                * Add interest in data files - then consolidate them
  485                */
  486               generateInterestInMapDataFiles();
  487               generateInterestInListDataFiles();
  488               for (Iterator<DataManager> i = dataManagers.values().iterator(); i.hasNext();) {
  489                   DataManager dm = i.next();
  490                   dm.consolidateDataFiles();
  491               }
  492           }
  493       }
  494   
  495       private void lock() throws IOException {
  496           synchronized (LOCKSET_MONITOR) {
  497               if (!DISABLE_LOCKING && directory != null && lock == null) {
  498                   String key = getPropertyKey();
  499                   String property = System.getProperty(key);
  500                   if (null == property) {
  501                       if (!BROKEN_FILE_LOCK) {
  502                           lock = lockFile.getChannel().tryLock();
  503                           if (lock == null) {
  504                               throw new StoreLockedExcpetion("Kaha Store " + directory.getName() + "  is already opened by another application");
  505                           } else
  506                               System.setProperty(key, new Date().toString());
  507                       }
  508                   } else { //already locked
  509                       throw new StoreLockedExcpetion("Kaha Store " + directory.getName() + " is already opened by this application.");
  510                   }
  511               }
  512           }
  513       }
  514   
  515       private void unlock() throws IOException {
  516           synchronized (LOCKSET_MONITOR) {
  517               if (!DISABLE_LOCKING && (null != directory) && (null != lock)) {
  518                   System.getProperties().remove(getPropertyKey());
  519                   if (lock.isValid()) {
  520                       lock.release();
  521                   }
  522                   lock = null;
  523               }
  524           }
  525       }
  526   
  527   
  528       private String getPropertyKey() throws IOException {
  529           return getClass().getName() + ".lock." + directory.getCanonicalPath();
  530       }
  531   
  532       /**
  533        * scans the directory and builds up the IndexManager and DataManager
  534        * 
  535        * @throws IOException if there is a problem accessing an index or data file
  536        */
  537       private void generateInterestInListDataFiles() throws IOException {
  538           for (Iterator i = listsContainer.getKeys().iterator(); i.hasNext();) {
  539               ContainerId id = (ContainerId)i.next();
  540               DataManager dm = getDataManager(id.getDataContainerName());
  541               IndexManager im = getIndexManager(dm, id.getDataContainerName());
  542               IndexItem theRoot = listsContainer.getRoot(im, id);
  543               long nextItem = theRoot.getNextItem();
  544               while (nextItem != Item.POSITION_NOT_SET) {
  545                   IndexItem item = im.getIndex(nextItem);
  546                   item.setOffset(nextItem);
  547                   dm.addInterestInFile(item.getKeyFile());
  548                   dm.addInterestInFile(item.getValueFile());
  549                   nextItem = item.getNextItem();
  550               }
  551           }
  552       }
  553   
  554       /**
  555        * scans the directory and builds up the IndexManager and DataManager
  556        * 
  557        * @throws IOException if there is a problem accessing an index or data file
  558        */
  559       private void generateInterestInMapDataFiles() throws IOException {
  560           for (Iterator i = mapsContainer.getKeys().iterator(); i.hasNext();) {
  561               ContainerId id = (ContainerId)i.next();
  562               DataManager dm = getDataManager(id.getDataContainerName());
  563               IndexManager im = getIndexManager(dm, id.getDataContainerName());
  564               IndexItem theRoot = mapsContainer.getRoot(im, id);
  565               long nextItem = theRoot.getNextItem();
  566               while (nextItem != Item.POSITION_NOT_SET) {
  567                   IndexItem item = im.getIndex(nextItem);
  568                   item.setOffset(nextItem);
  569                   dm.addInterestInFile(item.getKeyFile());
  570                   dm.addInterestInFile(item.getValueFile());
  571                   nextItem = item.getNextItem();
  572               }
  573   
  574           }
  575       }
  576   }

Home » activemq-parent-5.3.1-source-release » org.apache » activemq » kaha » impl » [javadoc | source]