Method from org.apache.activemq.store.jdbc.JDBCPersistenceAdapter Detail: |
/**
 * Begins a transaction on the transaction context associated with the
 * given connection context.
 *
 * @param context the connection context whose transaction context is used;
 *                when {@code null} a fresh transaction context is obtained
 * @throws IOException if the transaction context cannot be obtained or begun
 */
public void beginTransaction(ConnectionContext context) throws IOException {
    getTransactionContext(context).begin();
}
|
/**
 * Checkpoint hook; this implementation does nothing.
 *
 * @param sync ignored by this implementation
 * @throws IOException never thrown by this implementation
 */
public void checkpoint(boolean sync) throws IOException {
    // intentionally empty
}
|
/**
 * Asks the JDBC adapter to delete old messages from the store.
 * <p>
 * Failures are logged as warnings and never propagated. The transaction
 * context, if one was obtained, is always closed before returning.
 */
public void cleanup() {
    TransactionContext txContext = null;
    try {
        LOG.debug("Cleaning up old messages.");
        txContext = getTransactionContext();
        getAdapter().doDeleteOldMessages(txContext);
    } catch (IOException ioFailure) {
        LOG.warn("Old message cleanup failed due to: " + ioFailure, ioFailure);
    } catch (SQLException sqlFailure) {
        LOG.warn("Old message cleanup failed due to: " + sqlFailure);
        JDBCPersistenceAdapter.log("Failure Details: ", sqlFailure);
    } finally {
        if (txContext != null) {
            try {
                txContext.close();
            } catch (Throwable ignored) {
                // best effort: a failed close must not escape the cleanup run
            }
        }
        LOG.debug("Cleanup done.");
    }
}
|
/**
 * Commits the transaction held by the transaction context associated with
 * the given connection context.
 *
 * @param context the connection context whose transaction context is used;
 *                when {@code null} a fresh transaction context is obtained
 * @throws IOException if the transaction context cannot be obtained or committed
 */
public void commitTransaction(ConnectionContext context) throws IOException {
    getTransactionContext(context).commit();
}
|
/**
 * Loads the JDBC adapter through the adapter factory finder and stores it
 * in the {@code adapter} field, falling back to a
 * {@link DefaultJDBCAdapter} when none could be loaded.
 *
 * @return the adapter that was loaded or created; never {@code null}
 * @throws IOException if loading the adapter fails
 */
protected JDBCAdapter createAdapter() throws IOException {
    adapter = (JDBCAdapter) loadAdapter(adapterFactoryFinder, "adapter");
    if (adapter != null) {
        return adapter;
    }

    // Database type not recognized: fall back to the default JDBC adapter.
    adapter = new DefaultJDBCAdapter();
    LOG.debug("Using default JDBC Adapter: " + adapter);
    return adapter;
}
|
/**
 * Creates the message audit, when auditing is enabled and no audit exists
 * yet, and primes it with message ids scanned from the store.
 * <p>
 * A failure during the scan is logged as an error and not propagated.
 */
protected void createMessageAudit() {
    if (!enableAudit || audit != null) {
        // auditing disabled, or the audit has already been created
        return;
    }

    audit = new ActiveMQMessageAudit(maxAuditDepth, maxProducersToAudit);
    TransactionContext txContext = null;
    try {
        txContext = getTransactionContext();
        JDBCMessageIdScanListener feedAudit = new JDBCMessageIdScanListener() {
            public void messageId(MessageId id) {
                // registers the id with the audit; the result is not needed here
                audit.isDuplicate(id);
            }
        };
        getAdapter().doMessageIdScan(txContext, auditRecoveryDepth, feedAudit);
    } catch (Exception scanFailure) {
        LOG.error("Failed to reload store message audit for JDBC persistence adapter", scanFailure);
    } finally {
        if (txContext != null) {
            try {
                txContext.close();
            } catch (Throwable ignored) {
                // best effort: nothing useful can be done if the close fails
            }
        }
    }
}
|
/**
 * Creates a JDBC backed message store for the given queue.
 *
 * @param destination the queue the store is created for
 * @return the new store, wrapped by the transaction store's proxy when a
 *         transaction store has been created
 * @throws IOException if the JDBC adapter cannot be obtained
 */
public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
    MessageStore queueStore = new JDBCMessageStore(this, getAdapter(), wireFormat, destination, audit);
    if (transactionStore == null) {
        return queueStore;
    }
    return transactionStore.proxy(queueStore);
}
|
/**
 * Creates a JDBC backed message store for the given topic.
 *
 * @param destination the topic the store is created for
 * @return the new store, wrapped by the transaction store's proxy when a
 *         transaction store has been created
 * @throws IOException if the JDBC adapter cannot be obtained
 */
public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
    TopicMessageStore topicStore = new JDBCTopicMessageStore(this, getAdapter(), wireFormat, destination, audit);
    if (transactionStore == null) {
        return topicStore;
    }
    return transactionStore.proxy(topicStore);
}
|
/**
 * Returns the transaction store, creating a {@link MemoryTransactionStore}
 * bound to this adapter on first use.
 *
 * @return the transaction store
 * @throws IOException declared by the signature; not thrown by the code shown here
 */
public TransactionStore createTransactionStore() throws IOException {
    boolean notYetCreated = (transactionStore == null);
    if (notYetCreated) {
        transactionStore = new MemoryTransactionStore(this);
    }
    return this.transactionStore;
}
|
/**
 * Runs one keep-alive round against the database locker and stops the
 * broker when the locker reports that the lock could not be kept alive.
 * <p>
 * An {@link IOException} raised while doing so is logged as an error and
 * does not stop the broker.
 */
protected void databaseLockKeepAlive() {
    boolean lockLost = false;
    try {
        DatabaseLocker currentLocker = getDatabaseLocker();
        // no locker means there is nothing to keep alive
        lockLost = currentLocker != null && !currentLocker.keepAlive();
    } catch (IOException e) {
        LOG.error("Failed to get database when trying keepalive: " + e, e);
    }

    if (lockLost) {
        stopBroker();
    }
}
|
/**
 * Drops the store's tables and creates them again.
 *
 * @throws IOException if the transaction context or adapter cannot be
 *                     obtained, or wrapping the {@link SQLException} raised
 *                     while dropping or creating the tables
 */
public void deleteAllMessages() throws IOException {
    TransactionContext txContext = getTransactionContext();
    try {
        getAdapter().doDropTables(txContext);
        // re-apply the external-reference setting before the tables are recreated
        getAdapter().setUseExternalMessageReferences(isUseExternalMessageReferences());
        getAdapter().doCreateTables(txContext);
    } catch (SQLException sqlFailure) {
        JDBCPersistenceAdapter.log("JDBC Failure: ", sqlFailure);
        throw IOExceptionSupport.create(sqlFailure);
    } finally {
        txContext.close();
    }
}
|
/**
 * Returns the JDBC adapter, creating and registering one on first use.
 *
 * @return the JDBC adapter
 * @throws IOException if the adapter has to be created and creation fails
 */
public JDBCAdapter getAdapter() throws IOException {
    if (adapter != null) {
        return adapter;
    }
    setAdapter(createAdapter());
    return adapter;
}
|
/**
 * @return the audit recovery depth passed to the adapter's message id scan
 */
public int getAuditRecoveryDepth() {
    return this.auditRecoveryDepth;
}
|
/**
 * @return the broker service set on this adapter, possibly {@code null}
 */
public BrokerService getBrokerService() {
    return this.brokerService;
}
|
/**
 * @return the period, in milliseconds, used to schedule the periodic cleanup
 */
public int getCleanupPeriod() {
    return this.cleanupPeriod;
}
|
/**
 * Returns the database locker, loading and registering one on first use
 * when database locking is enabled.
 *
 * @return the database locker, or {@code null} when none is set and
 *         database locking is disabled
 * @throws IOException if a locker has to be loaded and loading fails
 */
public DatabaseLocker getDatabaseLocker() throws IOException {
    boolean mustLoad = databaseLocker == null && isUseDatabaseLock();
    if (mustLoad) {
        setDatabaseLocker(loadDataBaseLocker());
    }
    return databaseLocker;
}
|
/**
 * Returns the destinations known to the JDBC store.
 * <p>
 * Opens a transaction context, asks the adapter for the stored
 * destinations and closes the context again. Failures are logged and
 * answered with an empty set rather than propagated.
 *
 * @return the stored destinations, or the result of
 *         {@code emptyDestinationSet()} when they could not be read
 */
public Set<ActiveMQDestination> getDestinations() {
    TransactionContext c = null;
    try {
        c = getTransactionContext();
        return getAdapter().doGetDestinations(c);
    } catch (IOException e) {
        // Log instead of silently swallowing, so that an empty result caused
        // by a failure can be told apart from a genuinely empty store.
        LOG.warn("Failed to get destinations due to: " + e, e);
        return emptyDestinationSet();
    } catch (SQLException e) {
        JDBCPersistenceAdapter.log("JDBC Failure: ", e);
        return emptyDestinationSet();
    } finally {
        if (c != null) {
            try {
                c.close();
            } catch (Throwable ignored) {
                // best effort: the result (or failure) has already been determined
            }
        }
    }
}
|
/**
 * Reads the last message store sequence id from the database, seeds the
 * sequence generator with it and returns the broker sequence id of the
 * message stored under that id.
 *
 * @return the broker sequence id of the last stored message, or {@code 0}
 *         when the last store sequence id is {@code 0}
 * @throws IOException if the transaction context or adapter cannot be
 *                     obtained, or wrapping the {@link SQLException} raised
 *                     while querying the store
 */
public long getLastMessageBrokerSequenceId() throws IOException {
    TransactionContext txContext = getTransactionContext();
    try {
        long lastStoreSeq = getAdapter().doGetLastMessageStoreSequenceId(txContext);
        sequenceGenerator.setLastSequenceId(lastStoreSeq);
        if (lastStoreSeq == 0) {
            return 0;
        }
        // load the message stored under that id to recover its broker sequence id
        Message lastMessage = (Message) wireFormat.unmarshal(
                new ByteSequence(getAdapter().doGetMessageById(txContext, lastStoreSeq)));
        return lastMessage.getMessageId().getBrokerSequenceId();
    } catch (SQLException sqlFailure) {
        JDBCPersistenceAdapter.log("JDBC Failure: ", sqlFailure);
        throw IOExceptionSupport.create("Failed to get last broker message id: " + sqlFailure, sqlFailure);
    } finally {
        txContext.close();
    }
}
|
/**
 * @return the lock acquire sleep interval handed to a database locker when
 *         it is registered with this adapter
 */
public long getLockAcquireSleepInterval() {
    return this.lockAcquireSleepInterval;
}
|
/**
 * Returns the data source used for locking, falling back to (and caching)
 * the adapter's main data source when no dedicated one has been set.
 *
 * @return the data source to use for locking; never {@code null}
 * @throws IOException              if the main data source cannot be obtained
 * @throws IllegalArgumentException if neither a lock data source nor a main
 *                                  data source is available
 */
public DataSource getLockDataSource() throws IOException {
    if (lockDataSource != null) {
        // NOTE(review): the fallback below is cached in the same field, so later
        // calls also reach this branch and log "separate" - confirm that is intended.
        LOG.info("Using a separate dataSource for locking: "
                + lockDataSource);
        return lockDataSource;
    }

    lockDataSource = getDataSource();
    if (lockDataSource == null) {
        throw new IllegalArgumentException(
                "No dataSource property has been configured");
    }
    return lockDataSource;
}
|
/**
 * @return the period, in milliseconds, used to schedule the lock keep-alive task
 */
public long getLockKeepAlivePeriod() {
    return this.lockKeepAlivePeriod;
}
|
/**
 * @return the maximum audit depth used when the message audit is created
 */
public int getMaxAuditDepth() {
    return this.maxAuditDepth;
}
|
/**
 * @return the maximum number of producers to audit, used when the message
 *         audit is created
 */
public int getMaxProducersToAudit() {
    return this.maxProducersToAudit;
}
|
/**
 * Hands out the next sequence id from the sequence generator.
 *
 * @return the next sequence id
 */
public long getNextSequenceId() {
    // callers are serialized on the generator itself
    synchronized (sequenceGenerator) {
        long nextId = sequenceGenerator.getNextSequenceId();
        return nextId;
    }
}
|
/**
 * Returns the scheduler used by this adapter, creating one on first use.
 * <p>
 * The scheduler created here has a core pool size of 5 and runs its tasks
 * on daemon threads named "ActiveMQ Cleanup Timer".
 *
 * @return the scheduler
 */
public ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor() {
    if (clockDaemon != null) {
        return clockDaemon;
    }

    ThreadFactory daemonThreads = new ThreadFactory() {
        public Thread newThread(Runnable runnable) {
            Thread worker = new Thread(runnable, "ActiveMQ Cleanup Timer");
            worker.setDaemon(true);
            return worker;
        }
    };
    clockDaemon = new ScheduledThreadPoolExecutor(5, daemonThreads);
    return clockDaemon;
}
|
/**
 * Returns the statements object, creating a default one on first use.
 *
 * @return the statements; never {@code null}
 */
public Statements getStatements() {
    if (statements != null) {
        return statements;
    }
    statements = new Statements();
    return statements;
}
|
/**
 * Creates a new transaction context on this adapter's data source.
 * <p>
 * The configured transaction isolation is applied to the new context only
 * when it is greater than zero.
 *
 * @return a new transaction context
 * @throws IOException if the data source cannot be obtained
 */
public TransactionContext getTransactionContext() throws IOException {
    TransactionContext txContext = new TransactionContext(getDataSource());
    boolean isolationConfigured = transactionIsolation > 0;
    if (isolationConfigured) {
        txContext.setTransactionIsolation(transactionIsolation);
    }
    return txContext;
}
|
/**
 * Returns the transaction context bound to the given connection context.
 * <p>
 * When the connection context has no long-term store context yet, a new
 * transaction context is created and stored on it. A {@code null}
 * connection context always yields a new, unbound transaction context.
 *
 * @param context the connection context, may be {@code null}
 * @return the transaction context to use
 * @throws IOException if a new transaction context cannot be created
 */
public TransactionContext getTransactionContext(ConnectionContext context) throws IOException {
    if (context == null) {
        return getTransactionContext();
    }

    TransactionContext bound = (TransactionContext) context.getLongTermStoreContext();
    if (bound != null) {
        return bound;
    }

    bound = getTransactionContext();
    context.setLongTermStoreContext(bound);
    return bound;
}
|
/**
 * @return the wire format set on this adapter
 */
public WireFormat getWireFormat() {
    return this.wireFormat;
}
|
/**
 * Scans message ids from the store and feeds each one to the message audit.
 * <p>
 * A failure during the scan is logged as an error and not propagated.
 * <p>
 * NOTE(review): despite its name, the code shown here never touches the
 * sequence generator; it only replays ids into {@code audit}, and the
 * callback would fail if {@code audit} is {@code null} - confirm intent.
 */
public void initSequenceIdGenerator() {
    TransactionContext txContext = null;
    try {
        txContext = getTransactionContext();
        JDBCMessageIdScanListener feedAudit = new JDBCMessageIdScanListener() {
            public void messageId(MessageId id) {
                // registers the id with the audit; the result is not needed here
                audit.isDuplicate(id);
            }
        };
        getAdapter().doMessageIdScan(txContext, auditRecoveryDepth, feedAudit);
    } catch (Exception scanFailure) {
        LOG.error("Failed to reload store message audit for JDBC persistence adapter", scanFailure);
    } finally {
        if (txContext != null) {
            try {
                txContext.close();
            } catch (Throwable ignored) {
                // best effort: nothing useful can be done if the close fails
            }
        }
    }
}
|
/**
 * @return whether tables are created on startup
 */
public boolean isCreateTablesOnStartup() {
    return this.createTablesOnStartup;
}
|
/**
 * @return whether the message audit is enabled
 */
public boolean isEnableAudit() {
    return this.enableAudit;
}
|
/**
 * @return whether a database lock is used
 */
public boolean isUseDatabaseLock() {
    return this.useDatabaseLock;
}
|
/**
 * @return the external-message-references flag that is passed on to the
 *         JDBC adapter
 */
public boolean isUseExternalMessageReferences() {
    return this.useExternalMessageReferences;
}
|
/**
 * Loads the database locker through the lock factory finder, falling back
 * to a {@link DefaultDatabaseLocker} when none could be loaded.
 * <p>
 * The loaded object is only required to be a {@link DatabaseLocker}; it is
 * no longer cast to the concrete default implementation, so other locker
 * implementations do not fail with a {@link ClassCastException}.
 *
 * @return the locker that was loaded or created; never {@code null}
 * @throws IOException if loading the locker fails
 */
protected DatabaseLocker loadDataBaseLocker() throws IOException {
    DatabaseLocker locker = (DatabaseLocker) loadAdapter(lockFactoryFinder, "lock");
    if (locker == null) {
        locker = new DefaultDatabaseLocker();
        LOG.debug("Using default JDBC Locker: " + locker);
    }
    return locker;
}
|
/**
 * Logs, at warn level, the given message followed by the message of the
 * SQL exception and of every exception chained to it through
 * {@link SQLException#getNextException()}.
 * <p>
 * NOTE(review): the throwable handed to the logger is the last exception
 * of the chain, not the one passed in - confirm that is the stack trace
 * wanted in the log.
 *
 * @param msg prefix of the logged text
 * @param e   head of the SQL exception chain; must not be {@code null}
 */
public static void log(String msg,
        SQLException e) {
    StringBuilder text = new StringBuilder().append(msg).append(e.getMessage());
    SQLException last = e;
    for (SQLException next = e.getNextException(); next != null; next = next.getNextException()) {
        text.append(", due to: ").append(next.getMessage());
        last = next;
    }
    LOG.warn(text.toString(), last);
}
|
/**
 * Cleanup hook for a removed queue; this implementation does nothing.
 *
 * @param destination ignored by this implementation
 */
public void removeQueueMessageStore(ActiveMQQueue destination) {
    // intentionally empty
}
Cleanup method to remove any state associated with the given destination.
No state is retained, so there is nothing to do. |
/**
 * Cleanup hook for a removed topic; this implementation does nothing.
 *
 * @param destination ignored by this implementation
 */
public void removeTopicMessageStore(ActiveMQTopic destination) {
    // intentionally empty
}
Cleanup method to remove any state associated with the given destination.
No state is retained, so there is nothing to do. |
/**
 * Rolls back the transaction held by the transaction context associated
 * with the given connection context.
 *
 * @param context the connection context whose transaction context is used;
 *                when {@code null} a fresh transaction context is obtained
 * @throws IOException if the transaction context cannot be obtained or rolled back
 */
public void rollbackTransaction(ConnectionContext context) throws IOException {
    getTransactionContext(context).rollback();
}
|
/**
 * Sets the JDBC adapter and hands it this adapter's statements.
 *
 * @param adapter the JDBC adapter to use; must not be {@code null} because
 *                it is dereferenced immediately
 */
public void setAdapter(JDBCAdapter adapter) {
    this.adapter = adapter;
    // the adapter always works with the statements configured here
    this.adapter.setStatements(getStatements());
}
|
/**
 * @param auditRecoveryDepth the depth passed to the adapter's message id scan
 */
public void setAuditRecoveryDepth(int auditRecoveryDepth) {
    this.auditRecoveryDepth = auditRecoveryDepth;
}
|
/**
 * Accepts the broker name; this implementation does nothing with it.
 *
 * @param brokerName ignored by this implementation
 */
public void setBrokerName(String brokerName) {
    // intentionally empty
}
|
/**
 * @param brokerService the broker service this adapter notifies when the
 *                      database lock is started and stops when the lock is lost
 */
public void setBrokerService(BrokerService brokerService) {
    this.brokerService = brokerService;
}
|
/**
 * @param cleanupPeriod the period, in milliseconds, used to schedule the
 *                      periodic cleanup; no cleanup is scheduled at startup
 *                      unless it is greater than zero
 */
public void setCleanupPeriod(int cleanupPeriod) {
    this.cleanupPeriod = cleanupPeriod;
}
Sets the interval, in milliseconds, between attempts to clean up the
database for durable topics. |
/**
 * @param createTablesOnStartup whether tables are created on startup
 */
public void setCreateTablesOnStartup(boolean createTablesOnStartup) {
    this.createTablesOnStartup = createTablesOnStartup;
}
Sets whether or not tables are created on startup |
/**
 * Sets the database locker and wires it to this adapter.
 *
 * @param locker the locker to use; must not be {@code null} because it is
 *               dereferenced immediately
 * @throws IOException declared by the signature; presumably raised by the
 *                     locker while it is being configured
 */
public void setDatabaseLocker(DatabaseLocker locker) throws IOException {
    databaseLocker = locker;
    // give the locker access to this adapter and to the configured sleep interval
    databaseLocker.setPersistenceAdapter(this);
    databaseLocker.setLockAcquireSleepInterval(getLockAcquireSleepInterval());
}
Sets the database locker strategy to use to lock the database on startup |
/**
 * Accepts a directory; this implementation does nothing with it.
 *
 * @param dir ignored by this implementation
 */
public void setDirectory(File dir) {
    // intentionally empty
}
|
/**
 * @param enableAudit whether the message audit is created
 */
public void setEnableAudit(boolean enableAudit) {
    this.enableAudit = enableAudit;
}
|
/**
 * @param lockAcquireSleepInterval the interval, in milliseconds, between
 *                                 lock acquire attempts
 */
public void setLockAcquireSleepInterval(long lockAcquireSleepInterval) {
    this.lockAcquireSleepInterval = lockAcquireSleepInterval;
}
Millisecond interval between lock acquire attempts; applied to a newly created DefaultDatabaseLocker,
but not applied if a DatabaseLocker is injected. |
/**
 * @param dataSource the dedicated data source to use for locking
 */
public void setLockDataSource(DataSource dataSource) {
    this.lockDataSource = dataSource;
}
|
/**
 * @param lockKeepAlivePeriod the period, in milliseconds, used to schedule
 *                            the lock keep-alive task; no task is scheduled
 *                            at startup unless it is greater than zero
 */
public void setLockKeepAlivePeriod(long lockKeepAlivePeriod) {
    this.lockKeepAlivePeriod = lockKeepAlivePeriod;
}
|
/**
 * @param maxAuditDepth the maximum audit depth used when the message audit
 *                      is created
 */
public void setMaxAuditDepth(int maxAuditDepth) {
    this.maxAuditDepth = maxAuditDepth;
}
|
/**
 * @param maxProducersToAudit the maximum number of producers to audit, used
 *                            when the message audit is created
 */
public void setMaxProducersToAudit(int maxProducersToAudit) {
    this.maxProducersToAudit = maxProducersToAudit;
}
|
/**
 * @param clockDaemon the scheduler to use for the cleanup and keep-alive
 *                    tasks instead of one created on demand
 */
public void setScheduledThreadPoolExecutor(ScheduledThreadPoolExecutor clockDaemon) {
    this.clockDaemon = clockDaemon;
}
|
/**
 * @param statements the statements handed to the JDBC adapter when the
 *                   adapter is set
 */
public void setStatements(Statements statements) {
    this.statements = statements;
}
|
/**
 * @param transactionIsolation the isolation level applied to new transaction
 *                             contexts; only applied when greater than zero
 */
public void setTransactionIsolation(int transactionIsolation) {
    this.transactionIsolation = transactionIsolation;
}
Sets the transaction isolation level to something other than TRANSACTION_READ_UNCOMMITTED.
This allowable dirty isolation level may not be achievable in clustered DB environments,
so a more restrictive and expensive option may be needed, like TRANSACTION_REPEATABLE_READ.
See the isolation level constants in java.sql.Connection. |
/**
 * Accepts the usage manager; this implementation does nothing with it.
 *
 * @param usageManager ignored by this implementation
 */
public void setUsageManager(SystemUsage usageManager) {
    // intentionally empty
}
|
/**
 * @param useDatabaseLock whether a database lock is used
 */
public void setUseDatabaseLock(boolean useDatabaseLock) {
    this.useDatabaseLock = useDatabaseLock;
}
Sets whether or not an exclusive database lock should be used to enable
JDBC Master/Slave. Enabled by default. |
/**
 * @param useExternalMessageReferences the flag that is passed on to the
 *                                     JDBC adapter
 */
public void setUseExternalMessageReferences(boolean useExternalMessageReferences) {
    this.useExternalMessageReferences = useExternalMessageReferences;
}
|
/**
 * @param wireFormat the wire format used to unmarshal stored messages and
 *                   handed to the message stores created by this adapter
 */
public void setWireFormat(WireFormat wireFormat) {
    this.wireFormat = wireFormat;
}
|
/**
 * @return always {@code 0}; this implementation does not report a size
 */
public long size() {
    return 0L;
}
|
/**
 * Starts the persistence adapter.
 * <p>
 * In order: passes the external-message-references flag to the JDBC
 * adapter; creates the tables when enabled (a {@link SQLException} is
 * logged and start-up continues); starts the database locker and its
 * keep-alive task when database locking is enabled and notifies the broker
 * that it is now master; runs one cleanup and schedules the periodic
 * cleanup; finally creates the message audit.
 *
 * @throws Exception if the adapter, transaction context or locker cannot be
 *                   obtained or started
 */
public void start() throws Exception {
    getAdapter().setUseExternalMessageReferences(isUseExternalMessageReferences());

    if (isCreateTablesOnStartup()) {
        TransactionContext tableCreationTx = getTransactionContext();
        tableCreationTx.begin();
        try {
            getAdapter().doCreateTables(tableCreationTx);
        } catch (SQLException e) {
            LOG.warn("Cannot create tables due to: " + e);
            JDBCPersistenceAdapter.log("Failure Details: ", e);
        } finally {
            // committed whether or not table creation succeeded
            tableCreationTx.commit();
        }
    }

    if (isUseDatabaseLock()) {
        DatabaseLocker locker = getDatabaseLocker();
        if (locker != null) {
            locker.start();
            if (lockKeepAlivePeriod > 0) {
                Runnable keepAliveTask = new Runnable() {
                    public void run() {
                        databaseLockKeepAlive();
                    }
                };
                keepAliveTicket = getScheduledThreadPoolExecutor().scheduleAtFixedRate(
                        keepAliveTask, lockKeepAlivePeriod, lockKeepAlivePeriod, TimeUnit.MILLISECONDS);
            }
            if (brokerService != null) {
                brokerService.getBroker().nowMasterBroker();
            }
        } else {
            LOG.warn("No databaseLocker configured for the JDBC Persistence Adapter");
        }
    }

    cleanup();

    // Cleanup the db periodically.
    if (cleanupPeriod > 0) {
        Runnable cleanupTask = new Runnable() {
            public void run() {
                cleanup();
            }
        };
        cleanupTicket = getScheduledThreadPoolExecutor().scheduleWithFixedDelay(
                cleanupTask, cleanupPeriod, cleanupPeriod, TimeUnit.MILLISECONDS);
    }

    createMessageAudit();
}
|
/**
 * Stops the persistence adapter: cancels the scheduled cleanup and
 * keep-alive tasks and stops the database locker, if there is one.
 * <p>
 * The scheduler itself is deliberately left running.
 *
 * @throws Exception if the database locker cannot be obtained or stopped
 */
public synchronized void stop() throws Exception {
    if (cleanupTicket != null) {
        // a cleanup that is currently running may be interrupted
        cleanupTicket.cancel(true);
        cleanupTicket = null;
    }
    if (keepAliveTicket != null) {
        // a keep-alive that is currently running is allowed to finish
        keepAliveTicket.cancel(false);
        keepAliveTicket = null;
    }

    // do not shutdown clockDaemon as it may kill the thread initiating shutdown
    DatabaseLocker locker = getDatabaseLocker();
    if (locker == null) {
        return;
    }
    locker.stop();
}
|
/**
 * Stops the broker because the exclusive database lock can no longer be
 * kept.
 * <p>
 * A failure while stopping is logged as a warning, together with the
 * exception that caused it, and is not propagated.
 */
protected void stopBroker() {
    // we can no longer keep the lock so lets fail
    LOG.info("No longer able to keep the exclusive lock so giving up being a master");
    try {
        brokerService.stop();
    } catch (Exception e) {
        // Hand the exception to the logger so the reason for the failed stop
        // (including a missing broker service) is not lost.
        LOG.warn("Failure occured while stopping broker", e);
    }
}
|
/**
 * @return the inherited string representation wrapped in
 *         {@code JDBCPersistenceAdapter(...)}
 */
public String toString() {
    String inherited = super.toString();
    return "JDBCPersistenceAdapter(" + inherited + ")";
}
|