1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 package org.apache.activemq.kaha.impl.data; 18 19 import java.io.IOException; 20 import java.io.RandomAccessFile; 21 22 import org.apache.activemq.kaha.Marshaller; 23 import org.apache.activemq.util.DataByteArrayOutputStream; 24 25 /** 26 * Optimized Store writer. Synchronously marshalls and writes to the data file. 27 * Simple but may introduce a bit of contention when put under load. 28 * 29 * @version $Revision: 1.1.1.1 $ 30 */ 31 public final class SyncDataFileWriter { 32 33 private DataByteArrayOutputStream buffer; 34 private DataManagerImpl dataManager; 35 36 /** 37 * Construct a Store writer 38 * 39 * @param fileId 40 */ 41 SyncDataFileWriter(DataManagerImpl fileManager) { 42 this.dataManager = fileManager; 43 this.buffer = new DataByteArrayOutputStream(); 44 } 45 46 /* 47 * (non-Javadoc) 48 * 49 * @see org.apache.activemq.kaha.impl.data.DataFileWriter#storeItem(org.apache.activemq.kaha.Marshaller, 50 * java.lang.Object, byte) 51 */ 52 public synchronized DataItem storeItem(Marshaller marshaller, Object payload, byte type) 53 throws IOException { 54 55 // Write the packet our internal buffer. 56 buffer.reset(); 57 buffer.position(DataManagerImpl.ITEM_HEAD_SIZE); 58 marshaller.writePayload(payload, buffer); 59 int size = buffer.size(); 60 int payloadSize = size - DataManagerImpl.ITEM_HEAD_SIZE; 61 buffer.reset(); 62 buffer.writeByte(type); 63 buffer.writeInt(payloadSize); 64 65 // Find the position where this item will land at. 66 DataItem item = new DataItem(); 67 item.setSize(payloadSize); 68 DataFile dataFile = dataManager.findSpaceForData(item); 69 70 // Now splat the buffer to the file. 71 dataFile.getRandomAccessFile().seek(item.getOffset()); 72 dataFile.getRandomAccessFile().write(buffer.getData(), 0, size); 73 dataFile.setWriterData(Boolean.TRUE); // Use as dirty marker.. 74 75 dataManager.addInterestInFile(dataFile); 76 return item; 77 } 78 79 /* 80 * (non-Javadoc) 81 * 82 * @see org.apache.activemq.kaha.impl.data.DataFileWriter#updateItem(org.apache.activemq.kaha.StoreLocation, 83 * org.apache.activemq.kaha.Marshaller, java.lang.Object, byte) 84 */ 85 public synchronized void updateItem(DataItem item, Marshaller marshaller, Object payload, byte type) 86 throws IOException { 87 // Write the packet our internal buffer. 88 buffer.reset(); 89 buffer.position(DataManagerImpl.ITEM_HEAD_SIZE); 90 marshaller.writePayload(payload, buffer); 91 int size = buffer.size(); 92 int payloadSize = size - DataManagerImpl.ITEM_HEAD_SIZE; 93 buffer.reset(); 94 buffer.writeByte(type); 95 buffer.writeInt(payloadSize); 96 item.setSize(payloadSize); 97 DataFile dataFile = dataManager.getDataFile(item); 98 RandomAccessFile file = dataFile.getRandomAccessFile(); 99 file.seek(item.getOffset()); 100 file.write(buffer.getData(), 0, size); 101 dataFile.setWriterData(Boolean.TRUE); // Use as dirty marker.. 102 } 103 104 public synchronized void force(DataFile dataFile) throws IOException { 105 // If our dirty marker was set.. then we need to sync 106 if (dataFile.getWriterData() != null && dataFile.isDirty()) { 107 dataFile.getRandomAccessFile().getFD().sync(); 108 dataFile.setWriterData(null); 109 dataFile.setDirty(false); 110 } 111 } 112 113 public void close() throws IOException { 114 } 115 }