1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 package org.apache.activemq.store.journal; 18 19 import java.io.File; 20 import java.io.IOException; 21 22 import org.apache.activeio.journal.Journal; 23 import org.apache.activeio.journal.active.JournalImpl; 24 import org.apache.activeio.journal.active.JournalLockedException; 25 import org.apache.activemq.store.PersistenceAdapter; 26 import org.apache.activemq.store.PersistenceAdapterFactory; 27 import org.apache.activemq.store.jdbc.DataSourceSupport; 28 import org.apache.activemq.store.jdbc.JDBCAdapter; 29 import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; 30 import org.apache.activemq.store.jdbc.Statements; 31 import org.apache.activemq.thread.TaskRunnerFactory; 32 import org.apache.commons.logging.Log; 33 import org.apache.commons.logging.LogFactory; 34 35 /** 36 * Factory class that can create PersistenceAdapter objects. 37 * 38 * @org.apache.xbean.XBean 39 * @version $Revision: 1.4 $ 40 */ 41 public class JournalPersistenceAdapterFactory extends DataSourceSupport implements PersistenceAdapterFactory { 42 43 private static final int JOURNAL_LOCKED_WAIT_DELAY = 10 * 1000; 44 45 private static final Log LOG = LogFactory.getLog(JournalPersistenceAdapterFactory.class); 46 47 private int journalLogFileSize = 1024 * 1024 * 20; 48 private int journalLogFiles = 2; 49 private TaskRunnerFactory taskRunnerFactory; 50 private Journal journal; 51 private boolean useJournal = true; 52 private boolean useQuickJournal; 53 private File journalArchiveDirectory; 54 private boolean failIfJournalIsLocked; 55 private int journalThreadPriority = Thread.MAX_PRIORITY; 56 private JDBCPersistenceAdapter jdbcPersistenceAdapter = new JDBCPersistenceAdapter(); 57 private boolean useDedicatedTaskRunner; 58 59 public PersistenceAdapter createPersistenceAdapter() throws IOException { 60 jdbcPersistenceAdapter.setDataSource(getDataSource()); 61 62 if (!useJournal) { 63 return jdbcPersistenceAdapter; 64 } 65 return new JournalPersistenceAdapter(getJournal(), jdbcPersistenceAdapter, getTaskRunnerFactory()); 66 67 } 68 69 public int getJournalLogFiles() { 70 return journalLogFiles; 71 } 72 73 /** 74 * Sets the number of journal log files to use 75 */ 76 public void setJournalLogFiles(int journalLogFiles) { 77 this.journalLogFiles = journalLogFiles; 78 } 79 80 public int getJournalLogFileSize() { 81 return journalLogFileSize; 82 } 83 84 /** 85 * Sets the size of the journal log files 86 * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used 87 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor" 88 */ 89 public void setJournalLogFileSize(int journalLogFileSize) { 90 this.journalLogFileSize = journalLogFileSize; 91 } 92 93 public JDBCPersistenceAdapter getJdbcAdapter() { 94 return jdbcPersistenceAdapter; 95 } 96 97 public void setJdbcAdapter(JDBCPersistenceAdapter jdbcAdapter) { 98 this.jdbcPersistenceAdapter = jdbcAdapter; 99 } 100 101 public boolean isUseJournal() { 102 return useJournal; 103 } 104 105 /** 106 * Enables or disables the use of the journal. The default is to use the 107 * journal 108 * 109 * @param useJournal 110 */ 111 public void setUseJournal(boolean useJournal) { 112 this.useJournal = useJournal; 113 } 114 115 public boolean isUseDedicatedTaskRunner() { 116 return useDedicatedTaskRunner; 117 } 118 119 public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) { 120 this.useDedicatedTaskRunner = useDedicatedTaskRunner; 121 } 122 123 public TaskRunnerFactory getTaskRunnerFactory() { 124 if (taskRunnerFactory == null) { 125 taskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", journalThreadPriority, 126 true, 1000, isUseDedicatedTaskRunner()); 127 } 128 return taskRunnerFactory; 129 } 130 131 public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) { 132 this.taskRunnerFactory = taskRunnerFactory; 133 } 134 135 public Journal getJournal() throws IOException { 136 if (journal == null) { 137 createJournal(); 138 } 139 return journal; 140 } 141 142 public void setJournal(Journal journal) { 143 this.journal = journal; 144 } 145 146 public File getJournalArchiveDirectory() { 147 if (journalArchiveDirectory == null && useQuickJournal) { 148 journalArchiveDirectory = new File(getDataDirectoryFile(), "journal"); 149 } 150 return journalArchiveDirectory; 151 } 152 153 public void setJournalArchiveDirectory(File journalArchiveDirectory) { 154 this.journalArchiveDirectory = journalArchiveDirectory; 155 } 156 157 public boolean isUseQuickJournal() { 158 return useQuickJournal; 159 } 160 161 /** 162 * Enables or disables the use of quick journal, which keeps messages in the 163 * journal and just stores a reference to the messages in JDBC. Defaults to 164 * false so that messages actually reside long term in the JDBC database. 165 */ 166 public void setUseQuickJournal(boolean useQuickJournal) { 167 this.useQuickJournal = useQuickJournal; 168 } 169 170 public JDBCAdapter getAdapter() throws IOException { 171 return jdbcPersistenceAdapter.getAdapter(); 172 } 173 174 public void setAdapter(JDBCAdapter adapter) { 175 jdbcPersistenceAdapter.setAdapter(adapter); 176 } 177 178 public Statements getStatements() { 179 return jdbcPersistenceAdapter.getStatements(); 180 } 181 182 public void setStatements(Statements statements) { 183 jdbcPersistenceAdapter.setStatements(statements); 184 } 185 186 public boolean isUseDatabaseLock() { 187 return jdbcPersistenceAdapter.isUseDatabaseLock(); 188 } 189 190 /** 191 * Sets whether or not an exclusive database lock should be used to enable 192 * JDBC Master/Slave. Enabled by default. 193 */ 194 public void setUseDatabaseLock(boolean useDatabaseLock) { 195 jdbcPersistenceAdapter.setUseDatabaseLock(useDatabaseLock); 196 } 197 198 public boolean isCreateTablesOnStartup() { 199 return jdbcPersistenceAdapter.isCreateTablesOnStartup(); 200 } 201 202 /** 203 * Sets whether or not tables are created on startup 204 */ 205 public void setCreateTablesOnStartup(boolean createTablesOnStartup) { 206 jdbcPersistenceAdapter.setCreateTablesOnStartup(createTablesOnStartup); 207 } 208 209 public int getJournalThreadPriority() { 210 return journalThreadPriority; 211 } 212 213 /** 214 * Sets the thread priority of the journal thread 215 */ 216 public void setJournalThreadPriority(int journalThreadPriority) { 217 this.journalThreadPriority = journalThreadPriority; 218 } 219 220 /** 221 * @throws IOException 222 */ 223 protected void createJournal() throws IOException { 224 File journalDir = new File(getDataDirectoryFile(), "journal").getCanonicalFile(); 225 if (failIfJournalIsLocked) { 226 journal = new JournalImpl(journalDir, journalLogFiles, journalLogFileSize, 227 getJournalArchiveDirectory()); 228 } else { 229 while (true) { 230 try { 231 journal = new JournalImpl(journalDir, journalLogFiles, journalLogFileSize, 232 getJournalArchiveDirectory()); 233 break; 234 } catch (JournalLockedException e) { 235 LOG.info("Journal is locked... waiting " + (JOURNAL_LOCKED_WAIT_DELAY / 1000) 236 + " seconds for the journal to be unlocked."); 237 try { 238 Thread.sleep(JOURNAL_LOCKED_WAIT_DELAY); 239 } catch (InterruptedException e1) { 240 } 241 } 242 } 243 } 244 } 245 246 }