Home » activemq-parent-5.3.1-source-release » org.apache.kahadb.index » [javadoc | source]

    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.kahadb.index;
   18   
   19   import java.io.DataInput;
   20   import java.io.DataOutput;
   21   import java.io.IOException;
   22   import java.util.Iterator;
   23   import java.util.Map;
   24   import java.util.Map.Entry;
   25   import java.util.concurrent.atomic.AtomicBoolean;
   26   
   27   import org.apache.commons.logging.Log;
   28   import org.apache.commons.logging.LogFactory;
   29   import org.apache.kahadb.page.Page;
   30   import org.apache.kahadb.page.PageFile;
   31   import org.apache.kahadb.page.Transaction;
   32   import org.apache.kahadb.util.Marshaller;
   33   import org.apache.kahadb.util.VariableMarshaller;
   34   
   35   /**
   36    * BTree implementation
   37    * 
   38    * @version $Revision: 777209 $
   39    */
   40   public class HashIndex<Key,Value> implements Index<Key,Value> {
   41   
   42       public static final int CLOSED_STATE = 1;
   43       public static final int OPEN_STATE = 2;
   44   
   45   
   46       private static final Log LOG = LogFactory.getLog(HashIndex.class);
   47   
   48       public static final int DEFAULT_BIN_CAPACITY;
   49       public static final int DEFAULT_MAXIMUM_BIN_CAPACITY;
   50       public static final int DEFAULT_MINIMUM_BIN_CAPACITY;
   51       public static final int DEFAULT_LOAD_FACTOR;
   52   
   53       static {
   54           DEFAULT_BIN_CAPACITY = Integer.parseInt(System.getProperty("defaultBinSize", "1024"));
   55           DEFAULT_MAXIMUM_BIN_CAPACITY = Integer.parseInt(System.getProperty("maximumCapacity", "16384"));
   56           DEFAULT_MINIMUM_BIN_CAPACITY = Integer.parseInt(System.getProperty("minimumCapacity", "16"));
   57           DEFAULT_LOAD_FACTOR = Integer.parseInt(System.getProperty("defaultLoadFactor", "75"));
   58       }
   59   
   60       private AtomicBoolean loaded = new AtomicBoolean();
   61   
   62   
   63       private int increaseThreshold;
   64       private int decreaseThreshold;
   65   
   66       // Where the bin page array starts at.
   67       private int maximumBinCapacity = DEFAULT_MAXIMUM_BIN_CAPACITY;
   68       private int minimumBinCapacity = DEFAULT_MINIMUM_BIN_CAPACITY;
   69   
   70   
   71   
   72       // Once binsActive/binCapacity reaches the loadFactor, then we need to
   73       // increase the capacity
   74       private int loadFactor = DEFAULT_LOAD_FACTOR;
   75   
   76       private PageFile pageFile;
   77       // This page holds the index metadata.
   78       private long pageId;
   79   
   80       static class Metadata {
   81           
   82           private Page<Metadata> page;
   83           
   84           // When the index is initializing or resizing.. state changes so that
   85           // on failure it can be properly recovered.
   86           private int state;
   87           private long binPageId;
   88           private int binCapacity = DEFAULT_BIN_CAPACITY;
   89           private int binsActive;
   90           private int size;
   91   
   92           
   93           public void read(DataInput is) throws IOException {
   94               state = is.readInt();
   95               binPageId = is.readLong();
   96               binCapacity = is.readInt();
   97               size = is.readInt();
   98               binsActive = is.readInt();
   99           }
  100           public void write(DataOutput os) throws IOException {
  101               os.writeInt(state);
  102               os.writeLong(binPageId);
  103               os.writeInt(binCapacity);
  104               os.writeInt(size);
  105               os.writeInt(binsActive);
  106           }
  107           
  108           static class Marshaller extends VariableMarshaller<Metadata> {
  109               public Metadata readPayload(DataInput dataIn) throws IOException {
  110                   Metadata rc = new Metadata();
  111                   rc.read(dataIn);
  112                   return rc;
  113               }
  114   
  115               public void writePayload(Metadata object, DataOutput dataOut) throws IOException {
  116                   object.write(dataOut);
  117               }
  118           }
  119       }
  120       
  121       private Metadata metadata = new Metadata();
  122       
  123       private Metadata.Marshaller metadataMarshaller = new Metadata.Marshaller();
  124       private HashBin.Marshaller<Key,Value> hashBinMarshaller = new HashBin.Marshaller<Key,Value>(this);
  125       private Marshaller<Key> keyMarshaller;
  126       private Marshaller<Value> valueMarshaller;
  127   
  128       
  129       /**
  130        * Constructor
  131        * 
  132        * @param directory
  133        * @param name
  134        * @param indexManager
  135        * @param numberOfBins
  136        * @throws IOException
  137        */
  138       public HashIndex(PageFile pageFile, long pageId) throws IOException {
  139           this.pageFile = pageFile;
  140           this.pageId = pageId;
  141       }
  142   
  143       public synchronized void load(Transaction tx) throws IOException {
  144           if (loaded.compareAndSet(false, true)) {
  145               final Page<Metadata> metadataPage = tx.load(pageId, metadataMarshaller);
  146               // Is this a brand new index?
  147               if (metadataPage.getType() == Page.PAGE_FREE_TYPE) {
  148                   // We need to create the pages for the bins
  149                   Page binPage = tx.allocate(metadata.binCapacity);
  150                   metadata.binPageId = binPage.getPageId();
  151                   metadata.page = metadataPage;
  152                   metadataPage.set(metadata);
  153                   clear(tx);
  154   
  155                   // If failure happens now we can continue initializing the
  156                   // the hash bins...
  157               } else {
  158   
  159                   metadata = metadataPage.get();
  160                   metadata.page = metadataPage;
  161                   
  162                   // If we did not have a clean shutdown...
  163                   if (metadata.state == OPEN_STATE ) {
  164                       // Figure out the size and the # of bins that are
  165                       // active. Yeah This loads the first page of every bin. :(
  166                       // We might want to put this in the metadata page, but
  167                       // then that page would be getting updated on every write.
  168                       metadata.size = 0;
  169                       for (int i = 0; i < metadata.binCapacity; i++) {
  170                           int t = sizeOfBin(tx, i);
  171                           if (t > 0) {
  172                               metadata.binsActive++;
  173                           }
  174                           metadata.size += t;
  175                       }
  176                   }
  177               }
  178   
  179               calcThresholds();
  180   
  181               metadata.state = OPEN_STATE;
  182               tx.store(metadataPage, metadataMarshaller, true);
  183               
  184               LOG.debug("HashIndex loaded. Using "+metadata.binCapacity+" bins starting at page "+metadata.binPageId);
  185           }
  186       }
  187   
  188       public synchronized void unload(Transaction tx) throws IOException {
  189           if (loaded.compareAndSet(true, false)) {
  190               metadata.state = CLOSED_STATE;
  191               tx.store(metadata.page, metadataMarshaller, true);
  192           }
  193       }
  194   
  195       private int sizeOfBin(Transaction tx, int index) throws IOException {
  196           return getBin(tx, index).size();
  197       }
  198   
  199       public synchronized Value get(Transaction tx, Key key) throws IOException {
  200           assertLoaded();
  201           return getBin(tx, key).get(key);
  202       }
  203       
  204       public synchronized boolean containsKey(Transaction tx, Key key) throws IOException {
  205           assertLoaded();
  206           return getBin(tx, key).containsKey(key);
  207       }
  208   
  209       synchronized public Value put(Transaction tx, Key key, Value value) throws IOException {
  210           assertLoaded();
  211           HashBin<Key,Value> bin = getBin(tx, key);
  212   
  213           int originalSize = bin.size();
  214           Value result = bin.put(key,value);
  215           store(tx, bin);
  216   
  217           int newSize = bin.size();
  218   
  219           if (newSize != originalSize) {
  220               metadata.size++;
  221               if (newSize == 1) {
  222                   metadata.binsActive++;
  223               }
  224           }
  225   
  226           if (metadata.binsActive >= this.increaseThreshold) {
  227               newSize = Math.min(maximumBinCapacity, metadata.binCapacity*2);
  228               if(metadata.binCapacity!=newSize) {
  229                   resize(tx, newSize);
  230               }
  231           }
  232           return result;
  233       }
  234       
  235       synchronized public Value remove(Transaction tx, Key key) throws IOException {
  236           assertLoaded();
  237   
  238           HashBin<Key,Value> bin = getBin(tx, key);
  239           int originalSize = bin.size();
  240           Value result = bin.remove(key);
  241           int newSize = bin.size();
  242           
  243           if (newSize != originalSize) {
  244               store(tx, bin);
  245   
  246               metadata.size--;
  247               if (newSize == 0) {
  248                   metadata.binsActive--;
  249               }
  250           }
  251   
  252           if (metadata.binsActive <= this.decreaseThreshold) {
  253               newSize = Math.max(minimumBinCapacity, metadata.binCapacity/2);
  254               if(metadata.binCapacity!=newSize) {
  255                   resize(tx, newSize);
  256               }
  257           }
  258           return result;
  259       }
  260       
  261   
  262       public synchronized void clear(Transaction tx) throws IOException {
  263           assertLoaded();
  264           for (int i = 0; i < metadata.binCapacity; i++) {
  265               long pageId = metadata.binPageId + i;
  266               clearBinAtPage(tx, pageId);
  267           }
  268           metadata.size = 0;
  269           metadata.binsActive = 0;
  270       }
  271       
  272       public Iterator<Entry<Key, Value>> iterator(Transaction tx) throws IOException, UnsupportedOperationException {
  273           throw new UnsupportedOperationException();
  274       }
  275   
  276   
  277       /**
  278        * @param tx
  279        * @param pageId
  280        * @throws IOException
  281        */
  282       private void clearBinAtPage(Transaction tx, long pageId) throws IOException {
  283           Page<HashBin<Key,Value>> page = tx.load(pageId, null);
  284           HashBin<Key, Value> bin = new HashBin<Key,Value>();
  285           bin.setPage(page);
  286           page.set(bin);
  287           store(tx, bin);
  288       }
  289   
  290       public String toString() {
  291           String str = "HashIndex" + System.identityHashCode(this) + ": " + pageFile;
  292           return str;
  293       }
  294   
  295       // /////////////////////////////////////////////////////////////////
  296       // Implementation Methods
  297       // /////////////////////////////////////////////////////////////////
  298   
  299       private void assertLoaded() throws IllegalStateException {
  300           if( !loaded.get() ) {
  301               throw new IllegalStateException("The HashIndex is not loaded");
  302           }
  303       }
  304   
  305       public synchronized void store(Transaction tx, HashBin<Key,Value> bin) throws IOException {
  306           tx.store(bin.getPage(), hashBinMarshaller, true);
  307       }
  308   
  309       // While resizing, the following contains the new resize data.
  310       
  311       private void resize(Transaction tx, final int newSize) throws IOException {
  312           LOG.debug("Resizing to: "+newSize);
  313           
  314           int resizeCapacity = newSize;
  315           long resizePageId = tx.allocate(resizeCapacity).getPageId();
  316   
  317           // In Phase 1 we copy the data to the new bins..
  318           // Initialize the bins..
  319           for (int i = 0; i < resizeCapacity; i++) {
  320               long pageId = resizePageId + i;
  321               clearBinAtPage(tx, pageId);
  322           }
  323   
  324           metadata.binsActive = 0;
  325           // Copy the data from the old bins to the new bins.
  326           for (int i = 0; i < metadata.binCapacity; i++) {
  327               
  328               HashBin<Key,Value> bin = getBin(tx, i);
  329               for (Map.Entry<Key, Value> entry : bin.getAll(tx).entrySet()) {
  330                   HashBin<Key,Value> resizeBin = getBin(tx, entry.getKey(), resizePageId, resizeCapacity);
  331                   resizeBin.put(entry.getKey(), entry.getValue());
  332                   store(tx, resizeBin);
  333                   if( resizeBin.size() == 1) {
  334                       metadata.binsActive++;
  335                   }
  336               }
  337           }
  338           
  339           // In phase 2 we free the old bins and switch the the new bins.
  340           tx.free(metadata.binPageId, metadata.binCapacity);
  341           
  342           metadata.binCapacity = resizeCapacity;
  343           metadata.binPageId = resizePageId;
  344           metadata.state = OPEN_STATE;
  345           tx.store(metadata.page, metadataMarshaller, true);
  346           calcThresholds();
  347   
  348           LOG.debug("Resizing done.  New bins start at: "+metadata.binPageId);        
  349           resizeCapacity=0;
  350           resizePageId=0;
  351       }
  352   
  353       private void calcThresholds() {
  354           increaseThreshold = (metadata.binCapacity * loadFactor)/100;
  355           decreaseThreshold = (metadata.binCapacity * loadFactor * loadFactor ) / 20000;
  356       }
  357       
  358       private HashBin<Key,Value> getBin(Transaction tx, Key key) throws IOException {
  359           return getBin(tx, key, metadata.binPageId, metadata.binCapacity);
  360       }
  361   
  362       private HashBin<Key,Value> getBin(Transaction tx, int i) throws IOException {
  363           return getBin(tx, i, metadata.binPageId);
  364       }
  365       
  366       private HashBin<Key,Value> getBin(Transaction tx, Key key, long basePage, int capacity) throws IOException {
  367           int i = indexFor(key, capacity);
  368           return getBin(tx, i, basePage);
  369       }
  370   
  371       private HashBin<Key,Value> getBin(Transaction tx, int i, long basePage) throws IOException {
  372           Page<HashBin<Key, Value>> page = tx.load(basePage + i, hashBinMarshaller);
  373           HashBin<Key, Value> rc = page.get();
  374           rc.setPage(page);
  375           return rc;
  376       }
  377   
  378       int indexFor(Key x, int length) {
  379           return Math.abs(x.hashCode()%length);
  380       }
  381   
  382       // /////////////////////////////////////////////////////////////////
  383       // Property Accessors
  384       // /////////////////////////////////////////////////////////////////
  385   
  386       public Marshaller<Key> getKeyMarshaller() {
  387           return keyMarshaller;
  388       }
  389   
  390       /**
  391        * Set the marshaller for key objects
  392        * 
  393        * @param marshaller
  394        */
  395       public synchronized void setKeyMarshaller(Marshaller<Key> marshaller) {
  396           this.keyMarshaller = marshaller;
  397       }
  398   
  399       public Marshaller<Value> getValueMarshaller() {
  400           return valueMarshaller;
  401       }
  402       /**
  403        * Set the marshaller for value objects
  404        * 
  405        * @param marshaller
  406        */
  407       public void setValueMarshaller(Marshaller<Value> valueMarshaller) {
  408           this.valueMarshaller = valueMarshaller;
  409       }
  410       
  411       /**
  412        * @return number of bins in the index
  413        */
  414       public int getBinCapacity() {
  415           return metadata.binCapacity;
  416       }
  417   
  418       /**
  419        * @param binCapacity
  420        */
  421       public void setBinCapacity(int binCapacity) {
  422           if (loaded.get() && binCapacity != metadata.binCapacity) {
  423               throw new RuntimeException("Pages already loaded - can't reset bin capacity");
  424           }
  425           metadata.binCapacity = binCapacity;
  426       }
  427   
  428       public boolean isTransient() {
  429           return false;
  430       }
  431   
  432       /**
  433        * @return the loadFactor
  434        */
  435       public int getLoadFactor() {
  436           return loadFactor;
  437       }
  438   
  439       /**
  440        * @param loadFactor the loadFactor to set
  441        */
  442       public void setLoadFactor(int loadFactor) {
  443           this.loadFactor = loadFactor;
  444       }
  445   
  446       /**
  447        * @return the maximumCapacity
  448        */
  449       public int setMaximumBinCapacity() {
  450           return maximumBinCapacity;
  451       }
  452   
  453       /**
  454        * @param maximumCapacity the maximumCapacity to set
  455        */
  456       public void setMaximumBinCapacity(int maximumCapacity) {
  457           this.maximumBinCapacity = maximumCapacity;
  458       }
  459   
  460       public synchronized int size(Transaction tx) {
  461           return metadata.size;
  462       }
  463   
  464       public synchronized int getActiveBins() {
  465           return metadata.binsActive;
  466       }
  467   
  468       public long getBinPageId() {
  469           return metadata.binPageId;
  470       }
  471   
  472       public PageFile getPageFile() {
  473           return pageFile;
  474       }
  475   
  476       public int getBinsActive() {
  477           return metadata.binsActive;
  478       }
  479   
  480   }

Home » activemq-parent-5.3.1-source-release » org.apache.kahadb.index » [javadoc | source]