Home » activemq-parent-5.3.1-source-release » org.apache » activemq » kaha » impl » data » [javadoc | source]

    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq.kaha.impl.data;
   18   
   19   import java.io.File;
   20   import java.io.FilenameFilter;
   21   import java.io.IOException;
   22   import java.util.ArrayList;
   23   import java.util.HashMap;
   24   import java.util.Iterator;
   25   import java.util.List;
   26   import java.util.Map;
   27   import java.util.concurrent.atomic.AtomicLong;
   28   
   29   import org.apache.activemq.kaha.Marshaller;
   30   import org.apache.activemq.kaha.StoreLocation;
   31   import org.apache.activemq.kaha.impl.DataManager;
   32   import org.apache.activemq.kaha.impl.index.RedoStoreIndexItem;
   33   import org.apache.activemq.util.IOExceptionSupport;
   34   import org.apache.activemq.util.IOHelper;
   35   import org.apache.commons.logging.Log;
   36   import org.apache.commons.logging.LogFactory;
   37   
   38   /**
   39    * Manages DataFiles
   40    * 
   41    * @version $Revision: 1.1.1.1 $
   42    */
   43   public final class DataManagerImpl implements DataManager {
   44   
   45       public static final int ITEM_HEAD_SIZE = 5; // type + length
   46       public static final byte DATA_ITEM_TYPE = 1;
   47       public static final byte REDO_ITEM_TYPE = 2;
   48       public static final long MAX_FILE_LENGTH = 1024 * 1024 * 32;
   49       
   50       private static final Log LOG = LogFactory.getLog(DataManagerImpl.class);
   51       private static final String NAME_PREFIX = "data-";
   52       
   53       private final File directory;
   54       private final String name;
   55       private SyncDataFileReader reader;
   56       private SyncDataFileWriter writer;
   57       private DataFile currentWriteFile;
   58       private long maxFileLength = MAX_FILE_LENGTH;
   59       private Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>();
   60       private Marshaller redoMarshaller = RedoStoreIndexItem.MARSHALLER;
   61       private String dataFilePrefix;
   62       private final AtomicLong storeSize;
   63   
   64       public DataManagerImpl(File dir, final String name,AtomicLong storeSize) {
   65           this.directory = dir;
   66           this.name = name;
   67           this.storeSize=storeSize;
   68   
   69           dataFilePrefix = IOHelper.toFileSystemSafeName(NAME_PREFIX + name + "-");
   70           // build up list of current dataFiles
   71           File[] files = dir.listFiles(new FilenameFilter() {
   72               public boolean accept(File dir, String n) {
   73                   return dir.equals(directory) && n.startsWith(dataFilePrefix);
   74               }
   75           });
   76           if (files != null) {
   77               for (int i = 0; i < files.length; i++) {
   78                   File file = files[i];
   79                   String n = file.getName();
   80                   String numStr = n.substring(dataFilePrefix.length(), n.length());
   81                   int num = Integer.parseInt(numStr);
   82                   DataFile dataFile = new DataFile(file, num);
   83                   storeSize.addAndGet(dataFile.getLength());
   84                   fileMap.put(dataFile.getNumber(), dataFile);
   85                   if (currentWriteFile == null || currentWriteFile.getNumber().intValue() < num) {
   86                       currentWriteFile = dataFile;
   87                   }
   88               }
   89           }
   90       }
   91   
   92       private DataFile createAndAddDataFile(int num) {
   93           String fileName = dataFilePrefix + num;
   94           File file = new File(directory, fileName);
   95           DataFile result = new DataFile(file, num);
   96           fileMap.put(result.getNumber(), result);
   97           return result;
   98       }
   99   
  100       /*
  101        * (non-Javadoc)
  102        * 
  103        * @see org.apache.activemq.kaha.impl.data.IDataManager#getName()
  104        */
  105       public String getName() {
  106           return name;
  107       }
  108   
  109       synchronized DataFile findSpaceForData(DataItem item) throws IOException {
  110           if (currentWriteFile == null || ((currentWriteFile.getLength() + item.getSize()) > maxFileLength)) {
  111               int nextNum = currentWriteFile != null ? currentWriteFile.getNumber().intValue() + 1 : 1;
  112               if (currentWriteFile != null && currentWriteFile.isUnused()) {
  113                   removeDataFile(currentWriteFile);
  114               }
  115               currentWriteFile = createAndAddDataFile(nextNum);
  116           }
  117           item.setOffset(currentWriteFile.getLength());
  118           item.setFile(currentWriteFile.getNumber().intValue());
  119           int len = item.getSize() + ITEM_HEAD_SIZE;
  120           currentWriteFile.incrementLength(len);
  121           storeSize.addAndGet(len);
  122           return currentWriteFile;
  123       }
  124   
  125       DataFile getDataFile(StoreLocation item) throws IOException {
  126           Integer key = Integer.valueOf(item.getFile());
  127           DataFile dataFile = fileMap.get(key);
  128           if (dataFile == null) {
  129               LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
  130               throw new IOException("Could not locate data file " + NAME_PREFIX + name + "-" + item.getFile());
  131           }
  132           return dataFile;
  133       }
  134   
  135       /*
  136        * (non-Javadoc)
  137        * 
  138        * @see org.apache.activemq.kaha.impl.data.IDataManager#readItem(org.apache.activemq.kaha.Marshaller,
  139        *      org.apache.activemq.kaha.StoreLocation)
  140        */
  141       public synchronized Object readItem(Marshaller marshaller, StoreLocation item) throws IOException {
  142           return getReader().readItem(marshaller, item);
  143       }
  144   
  145       /*
  146        * (non-Javadoc)
  147        * 
  148        * @see org.apache.activemq.kaha.impl.data.IDataManager#storeDataItem(org.apache.activemq.kaha.Marshaller,
  149        *      java.lang.Object)
  150        */
  151       public synchronized StoreLocation storeDataItem(Marshaller marshaller, Object payload) throws IOException {
  152           return getWriter().storeItem(marshaller, payload, DATA_ITEM_TYPE);
  153       }
  154   
  155       /*
  156        * (non-Javadoc)
  157        * 
  158        * @see org.apache.activemq.kaha.impl.data.IDataManager#storeRedoItem(java.lang.Object)
  159        */
  160       public synchronized StoreLocation storeRedoItem(Object payload) throws IOException {
  161           return getWriter().storeItem(redoMarshaller, payload, REDO_ITEM_TYPE);
  162       }
  163   
  164       /*
  165        * (non-Javadoc)
  166        * 
  167        * @see org.apache.activemq.kaha.impl.data.IDataManager#updateItem(org.apache.activemq.kaha.StoreLocation,
  168        *      org.apache.activemq.kaha.Marshaller, java.lang.Object)
  169        */
  170       public synchronized void updateItem(StoreLocation location, Marshaller marshaller, Object payload)
  171           throws IOException {
  172           getWriter().updateItem((DataItem)location, marshaller, payload, DATA_ITEM_TYPE);
  173       }
  174   
  175       /*
  176        * (non-Javadoc)
  177        * 
  178        * @see org.apache.activemq.kaha.impl.data.IDataManager#recoverRedoItems(org.apache.activemq.kaha.impl.data.RedoListener)
  179        */
  180       public synchronized void recoverRedoItems(RedoListener listener) throws IOException {
  181   
  182           // Nothing to recover if there is no current file.
  183           if (currentWriteFile == null) {
  184               return;
  185           }
  186   
  187           DataItem item = new DataItem();
  188           item.setFile(currentWriteFile.getNumber().intValue());
  189           item.setOffset(0);
  190           while (true) {
  191               byte type;
  192               try {
  193                   type = getReader().readDataItemSize(item);
  194               } catch (IOException ignore) {
  195                   LOG.trace("End of data file reached at (header was invalid): " + item);
  196                   return;
  197               }
  198               if (type == REDO_ITEM_TYPE) {
  199                   // Un-marshal the redo item
  200                   Object object;
  201                   try {
  202                       object = readItem(redoMarshaller, item);
  203                   } catch (IOException e1) {
  204                       LOG.trace("End of data file reached at (payload was invalid): " + item);
  205                       return;
  206                   }
  207                   try {
  208   
  209                       listener.onRedoItem(item, object);
  210                       // in case the listener is holding on to item references,
  211                       // copy it
  212                       // so we don't change it behind the listener's back.
  213                       item = item.copy();
  214   
  215                   } catch (Exception e) {
  216                       throw IOExceptionSupport.create("Recovery handler failed: " + e, e);
  217                   }
  218               }
  219               // Move to the next item.
  220               item.setOffset(item.getOffset() + ITEM_HEAD_SIZE + item.getSize());
  221           }
  222       }
  223   
  224       /*
  225        * (non-Javadoc)
  226        * 
  227        * @see org.apache.activemq.kaha.impl.data.IDataManager#close()
  228        */
  229       public synchronized void close() throws IOException {
  230           getWriter().close();
  231           for (Iterator<DataFile> i = fileMap.values().iterator(); i.hasNext();) {
  232               DataFile dataFile = i.next();
  233               getWriter().force(dataFile);
  234               dataFile.close();
  235           }
  236           fileMap.clear();
  237       }
  238   
  239       /*
  240        * (non-Javadoc)
  241        * 
  242        * @see org.apache.activemq.kaha.impl.data.IDataManager#force()
  243        */
  244       public synchronized void force() throws IOException {
  245           for (Iterator<DataFile> i = fileMap.values().iterator(); i.hasNext();) {
  246               DataFile dataFile = i.next();
  247               getWriter().force(dataFile);
  248           }
  249       }
  250   
  251       /*
  252        * (non-Javadoc)
  253        * 
  254        * @see org.apache.activemq.kaha.impl.data.IDataManager#delete()
  255        */
  256       public synchronized boolean delete() throws IOException {
  257           boolean result = true;
  258           for (Iterator<DataFile> i = fileMap.values().iterator(); i.hasNext();) {
  259               DataFile dataFile = i.next();
  260               storeSize.addAndGet(-dataFile.getLength());
  261               result &= dataFile.delete();
  262           }
  263           fileMap.clear();
  264           return result;
  265       }
  266   
  267       /*
  268        * (non-Javadoc)
  269        * 
  270        * @see org.apache.activemq.kaha.impl.data.IDataManager#addInterestInFile(int)
  271        */
  272       public synchronized void addInterestInFile(int file) throws IOException {
  273           if (file >= 0) {
  274               Integer key = Integer.valueOf(file);
  275               DataFile dataFile = fileMap.get(key);
  276               if (dataFile == null) {
  277                   dataFile = createAndAddDataFile(file);
  278               }
  279               addInterestInFile(dataFile);
  280           }
  281       }
  282   
  283       synchronized void addInterestInFile(DataFile dataFile) {
  284           if (dataFile != null) {
  285               dataFile.increment();
  286           }
  287       }
  288   
  289       /*
  290        * (non-Javadoc)
  291        * 
  292        * @see org.apache.activemq.kaha.impl.data.IDataManager#removeInterestInFile(int)
  293        */
  294       public synchronized void removeInterestInFile(int file) throws IOException {
  295           if (file >= 0) {
  296               Integer key = Integer.valueOf(file);
  297               DataFile dataFile = fileMap.get(key);
  298               removeInterestInFile(dataFile);
  299           }
  300       }
  301   
  302       synchronized void removeInterestInFile(DataFile dataFile) throws IOException {
  303           if (dataFile != null) {
  304              
  305               if (dataFile.decrement() <= 0) {
  306                   if (dataFile != currentWriteFile) {
  307                       removeDataFile(dataFile);
  308                   }
  309               }
  310           }
  311       }
  312   
  313       /*
  314        * (non-Javadoc)
  315        * 
  316        * @see org.apache.activemq.kaha.impl.data.IDataManager#consolidateDataFiles()
  317        */
  318       public synchronized void consolidateDataFiles() throws IOException {
  319           List<DataFile> purgeList = new ArrayList<DataFile>();
  320           for (Iterator<DataFile> i = fileMap.values().iterator(); i.hasNext();) {
  321               DataFile dataFile = i.next();
  322               if (dataFile.isUnused() && dataFile != currentWriteFile) {
  323                   purgeList.add(dataFile);
  324               }
  325           }
  326           for (int i = 0; i < purgeList.size(); i++) {
  327               DataFile dataFile = purgeList.get(i);
  328               removeDataFile(dataFile);
  329           }
  330       }
  331   
  332       private void removeDataFile(DataFile dataFile) throws IOException {
  333           fileMap.remove(dataFile.getNumber());
  334           if (writer != null) {
  335               writer.force(dataFile);
  336           }
  337           storeSize.addAndGet(-dataFile.getLength());
  338           boolean result = dataFile.delete();
  339           LOG.debug("discarding data file " + dataFile + (result ? "successful " : "failed"));
  340       }
  341   
  342       /*
  343        * (non-Javadoc)
  344        * 
  345        * @see org.apache.activemq.kaha.impl.data.IDataManager#getRedoMarshaller()
  346        */
  347       public Marshaller getRedoMarshaller() {
  348           return redoMarshaller;
  349       }
  350   
  351       /*
  352        * (non-Javadoc)
  353        * 
  354        * @see org.apache.activemq.kaha.impl.data.IDataManager#setRedoMarshaller(org.apache.activemq.kaha.Marshaller)
  355        */
  356       public void setRedoMarshaller(Marshaller redoMarshaller) {
  357           this.redoMarshaller = redoMarshaller;
  358       }
  359   
  360       /**
  361        * @return the maxFileLength
  362        */
  363       public long getMaxFileLength() {
  364           return maxFileLength;
  365       }
  366   
  367       /**
  368        * @param maxFileLength the maxFileLength to set
  369        */
  370       public void setMaxFileLength(long maxFileLength) {
  371           this.maxFileLength = maxFileLength;
  372       }
  373   
  374       public String toString() {
  375           return "DataManager:(" + NAME_PREFIX + name + ")";
  376       }
  377   
  378       public synchronized SyncDataFileReader getReader() {
  379           if (reader == null) {
  380               reader = createReader();
  381           }
  382           return reader;
  383       }
  384   
  385       protected synchronized SyncDataFileReader createReader() {
  386           return new SyncDataFileReader(this);
  387       }
  388   
  389       public synchronized void setReader(SyncDataFileReader reader) {
  390           this.reader = reader;
  391       }
  392   
  393       public synchronized SyncDataFileWriter getWriter() {
  394           if (writer == null) {
  395               writer = createWriter();
  396           }
  397           return writer;
  398       }
  399   
  400       private SyncDataFileWriter createWriter() {
  401           return new SyncDataFileWriter(this);
  402       }
  403   
  404       public synchronized void setWriter(SyncDataFileWriter writer) {
  405           this.writer = writer;
  406       }
  407   
  408   }

Home » activemq-parent-5.3.1-source-release » org.apache » activemq » kaha » impl » data » [javadoc | source]