1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 package org.apache.activemq.kaha.impl.data; 18 19 import java.io.File; 20 import java.io.FilenameFilter; 21 import java.io.IOException; 22 import java.util.ArrayList; 23 import java.util.HashMap; 24 import java.util.Iterator; 25 import java.util.List; 26 import java.util.Map; 27 import java.util.concurrent.atomic.AtomicLong; 28 29 import org.apache.activemq.kaha.Marshaller; 30 import org.apache.activemq.kaha.StoreLocation; 31 import org.apache.activemq.kaha.impl.DataManager; 32 import org.apache.activemq.kaha.impl.index.RedoStoreIndexItem; 33 import org.apache.activemq.util.IOExceptionSupport; 34 import org.apache.activemq.util.IOHelper; 35 import org.apache.commons.logging.Log; 36 import org.apache.commons.logging.LogFactory; 37 38 /** 39 * Manages DataFiles 40 * 41 * @version $Revision: 1.1.1.1 $ 42 */ 43 public final class DataManagerImpl implements DataManager { 44 45 public static final int ITEM_HEAD_SIZE = 5; // type + length 46 public static final byte DATA_ITEM_TYPE = 1; 47 public static final byte REDO_ITEM_TYPE = 2; 48 public static final long MAX_FILE_LENGTH = 1024 * 1024 * 32; 49 50 private static final Log LOG = LogFactory.getLog(DataManagerImpl.class); 51 private static final String NAME_PREFIX = "data-"; 52 53 private final File directory; 54 private final String name; 55 private SyncDataFileReader reader; 56 private SyncDataFileWriter writer; 57 private DataFile currentWriteFile; 58 private long maxFileLength = MAX_FILE_LENGTH; 59 private Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>(); 60 private Marshaller redoMarshaller = RedoStoreIndexItem.MARSHALLER; 61 private String dataFilePrefix; 62 private final AtomicLong storeSize; 63 64 public DataManagerImpl(File dir, final String name,AtomicLong storeSize) { 65 this.directory = dir; 66 this.name = name; 67 this.storeSize=storeSize; 68 69 dataFilePrefix = IOHelper.toFileSystemSafeName(NAME_PREFIX + name + "-"); 70 // build up list of current dataFiles 71 File[] files = dir.listFiles(new FilenameFilter() { 72 public boolean accept(File dir, String n) { 73 return dir.equals(directory) && n.startsWith(dataFilePrefix); 74 } 75 }); 76 if (files != null) { 77 for (int i = 0; i < files.length; i++) { 78 File file = files[i]; 79 String n = file.getName(); 80 String numStr = n.substring(dataFilePrefix.length(), n.length()); 81 int num = Integer.parseInt(numStr); 82 DataFile dataFile = new DataFile(file, num); 83 storeSize.addAndGet(dataFile.getLength()); 84 fileMap.put(dataFile.getNumber(), dataFile); 85 if (currentWriteFile == null || currentWriteFile.getNumber().intValue() < num) { 86 currentWriteFile = dataFile; 87 } 88 } 89 } 90 } 91 92 private DataFile createAndAddDataFile(int num) { 93 String fileName = dataFilePrefix + num; 94 File file = new File(directory, fileName); 95 DataFile result = new DataFile(file, num); 96 fileMap.put(result.getNumber(), result); 97 return result; 98 } 99 100 /* 101 * (non-Javadoc) 102 * 103 * @see org.apache.activemq.kaha.impl.data.IDataManager#getName() 104 */ 105 public String getName() { 106 return name; 107 } 108 109 synchronized DataFile findSpaceForData(DataItem item) throws IOException { 110 if (currentWriteFile == null || ((currentWriteFile.getLength() + item.getSize()) > maxFileLength)) { 111 int nextNum = currentWriteFile != null ? currentWriteFile.getNumber().intValue() + 1 : 1; 112 if (currentWriteFile != null && currentWriteFile.isUnused()) { 113 removeDataFile(currentWriteFile); 114 } 115 currentWriteFile = createAndAddDataFile(nextNum); 116 } 117 item.setOffset(currentWriteFile.getLength()); 118 item.setFile(currentWriteFile.getNumber().intValue()); 119 int len = item.getSize() + ITEM_HEAD_SIZE; 120 currentWriteFile.incrementLength(len); 121 storeSize.addAndGet(len); 122 return currentWriteFile; 123 } 124 125 DataFile getDataFile(StoreLocation item) throws IOException { 126 Integer key = Integer.valueOf(item.getFile()); 127 DataFile dataFile = fileMap.get(key); 128 if (dataFile == null) { 129 LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap); 130 throw new IOException("Could not locate data file " + NAME_PREFIX + name + "-" + item.getFile()); 131 } 132 return dataFile; 133 } 134 135 /* 136 * (non-Javadoc) 137 * 138 * @see org.apache.activemq.kaha.impl.data.IDataManager#readItem(org.apache.activemq.kaha.Marshaller, 139 * org.apache.activemq.kaha.StoreLocation) 140 */ 141 public synchronized Object readItem(Marshaller marshaller, StoreLocation item) throws IOException { 142 return getReader().readItem(marshaller, item); 143 } 144 145 /* 146 * (non-Javadoc) 147 * 148 * @see org.apache.activemq.kaha.impl.data.IDataManager#storeDataItem(org.apache.activemq.kaha.Marshaller, 149 * java.lang.Object) 150 */ 151 public synchronized StoreLocation storeDataItem(Marshaller marshaller, Object payload) throws IOException { 152 return getWriter().storeItem(marshaller, payload, DATA_ITEM_TYPE); 153 } 154 155 /* 156 * (non-Javadoc) 157 * 158 * @see org.apache.activemq.kaha.impl.data.IDataManager#storeRedoItem(java.lang.Object) 159 */ 160 public synchronized StoreLocation storeRedoItem(Object payload) throws IOException { 161 return getWriter().storeItem(redoMarshaller, payload, REDO_ITEM_TYPE); 162 } 163 164 /* 165 * (non-Javadoc) 166 * 167 * @see org.apache.activemq.kaha.impl.data.IDataManager#updateItem(org.apache.activemq.kaha.StoreLocation, 168 * org.apache.activemq.kaha.Marshaller, java.lang.Object) 169 */ 170 public synchronized void updateItem(StoreLocation location, Marshaller marshaller, Object payload) 171 throws IOException { 172 getWriter().updateItem((DataItem)location, marshaller, payload, DATA_ITEM_TYPE); 173 } 174 175 /* 176 * (non-Javadoc) 177 * 178 * @see org.apache.activemq.kaha.impl.data.IDataManager#recoverRedoItems(org.apache.activemq.kaha.impl.data.RedoListener) 179 */ 180 public synchronized void recoverRedoItems(RedoListener listener) throws IOException { 181 182 // Nothing to recover if there is no current file. 183 if (currentWriteFile == null) { 184 return; 185 } 186 187 DataItem item = new DataItem(); 188 item.setFile(currentWriteFile.getNumber().intValue()); 189 item.setOffset(0); 190 while (true) { 191 byte type; 192 try { 193 type = getReader().readDataItemSize(item); 194 } catch (IOException ignore) { 195 LOG.trace("End of data file reached at (header was invalid): " + item); 196 return; 197 } 198 if (type == REDO_ITEM_TYPE) { 199 // Un-marshal the redo item 200 Object object; 201 try { 202 object = readItem(redoMarshaller, item); 203 } catch (IOException e1) { 204 LOG.trace("End of data file reached at (payload was invalid): " + item); 205 return; 206 } 207 try { 208 209 listener.onRedoItem(item, object); 210 // in case the listener is holding on to item references, 211 // copy it 212 // so we don't change it behind the listener's back. 213 item = item.copy(); 214 215 } catch (Exception e) { 216 throw IOExceptionSupport.create("Recovery handler failed: " + e, e); 217 } 218 } 219 // Move to the next item. 220 item.setOffset(item.getOffset() + ITEM_HEAD_SIZE + item.getSize()); 221 } 222 } 223 224 /* 225 * (non-Javadoc) 226 * 227 * @see org.apache.activemq.kaha.impl.data.IDataManager#close() 228 */ 229 public synchronized void close() throws IOException { 230 getWriter().close(); 231 for (Iterator<DataFile> i = fileMap.values().iterator(); i.hasNext();) { 232 DataFile dataFile = i.next(); 233 getWriter().force(dataFile); 234 dataFile.close(); 235 } 236 fileMap.clear(); 237 } 238 239 /* 240 * (non-Javadoc) 241 * 242 * @see org.apache.activemq.kaha.impl.data.IDataManager#force() 243 */ 244 public synchronized void force() throws IOException { 245 for (Iterator<DataFile> i = fileMap.values().iterator(); i.hasNext();) { 246 DataFile dataFile = i.next(); 247 getWriter().force(dataFile); 248 } 249 } 250 251 /* 252 * (non-Javadoc) 253 * 254 * @see org.apache.activemq.kaha.impl.data.IDataManager#delete() 255 */ 256 public synchronized boolean delete() throws IOException { 257 boolean result = true; 258 for (Iterator<DataFile> i = fileMap.values().iterator(); i.hasNext();) { 259 DataFile dataFile = i.next(); 260 storeSize.addAndGet(-dataFile.getLength()); 261 result &= dataFile.delete(); 262 } 263 fileMap.clear(); 264 return result; 265 } 266 267 /* 268 * (non-Javadoc) 269 * 270 * @see org.apache.activemq.kaha.impl.data.IDataManager#addInterestInFile(int) 271 */ 272 public synchronized void addInterestInFile(int file) throws IOException { 273 if (file >= 0) { 274 Integer key = Integer.valueOf(file); 275 DataFile dataFile = fileMap.get(key); 276 if (dataFile == null) { 277 dataFile = createAndAddDataFile(file); 278 } 279 addInterestInFile(dataFile); 280 } 281 } 282 283 synchronized void addInterestInFile(DataFile dataFile) { 284 if (dataFile != null) { 285 dataFile.increment(); 286 } 287 } 288 289 /* 290 * (non-Javadoc) 291 * 292 * @see org.apache.activemq.kaha.impl.data.IDataManager#removeInterestInFile(int) 293 */ 294 public synchronized void removeInterestInFile(int file) throws IOException { 295 if (file >= 0) { 296 Integer key = Integer.valueOf(file); 297 DataFile dataFile = fileMap.get(key); 298 removeInterestInFile(dataFile); 299 } 300 } 301 302 synchronized void removeInterestInFile(DataFile dataFile) throws IOException { 303 if (dataFile != null) { 304 305 if (dataFile.decrement() <= 0) { 306 if (dataFile != currentWriteFile) { 307 removeDataFile(dataFile); 308 } 309 } 310 } 311 } 312 313 /* 314 * (non-Javadoc) 315 * 316 * @see org.apache.activemq.kaha.impl.data.IDataManager#consolidateDataFiles() 317 */ 318 public synchronized void consolidateDataFiles() throws IOException { 319 List<DataFile> purgeList = new ArrayList<DataFile>(); 320 for (Iterator<DataFile> i = fileMap.values().iterator(); i.hasNext();) { 321 DataFile dataFile = i.next(); 322 if (dataFile.isUnused() && dataFile != currentWriteFile) { 323 purgeList.add(dataFile); 324 } 325 } 326 for (int i = 0; i < purgeList.size(); i++) { 327 DataFile dataFile = purgeList.get(i); 328 removeDataFile(dataFile); 329 } 330 } 331 332 private void removeDataFile(DataFile dataFile) throws IOException { 333 fileMap.remove(dataFile.getNumber()); 334 if (writer != null) { 335 writer.force(dataFile); 336 } 337 storeSize.addAndGet(-dataFile.getLength()); 338 boolean result = dataFile.delete(); 339 LOG.debug("discarding data file " + dataFile + (result ? "successful " : "failed")); 340 } 341 342 /* 343 * (non-Javadoc) 344 * 345 * @see org.apache.activemq.kaha.impl.data.IDataManager#getRedoMarshaller() 346 */ 347 public Marshaller getRedoMarshaller() { 348 return redoMarshaller; 349 } 350 351 /* 352 * (non-Javadoc) 353 * 354 * @see org.apache.activemq.kaha.impl.data.IDataManager#setRedoMarshaller(org.apache.activemq.kaha.Marshaller) 355 */ 356 public void setRedoMarshaller(Marshaller redoMarshaller) { 357 this.redoMarshaller = redoMarshaller; 358 } 359 360 /** 361 * @return the maxFileLength 362 */ 363 public long getMaxFileLength() { 364 return maxFileLength; 365 } 366 367 /** 368 * @param maxFileLength the maxFileLength to set 369 */ 370 public void setMaxFileLength(long maxFileLength) { 371 this.maxFileLength = maxFileLength; 372 } 373 374 public String toString() { 375 return "DataManager:(" + NAME_PREFIX + name + ")"; 376 } 377 378 public synchronized SyncDataFileReader getReader() { 379 if (reader == null) { 380 reader = createReader(); 381 } 382 return reader; 383 } 384 385 protected synchronized SyncDataFileReader createReader() { 386 return new SyncDataFileReader(this); 387 } 388 389 public synchronized void setReader(SyncDataFileReader reader) { 390 this.reader = reader; 391 } 392 393 public synchronized SyncDataFileWriter getWriter() { 394 if (writer == null) { 395 writer = createWriter(); 396 } 397 return writer; 398 } 399 400 private SyncDataFileWriter createWriter() { 401 return new SyncDataFileWriter(this); 402 } 403 404 public synchronized void setWriter(SyncDataFileWriter writer) { 405 this.writer = writer; 406 } 407 408 }