Method from org.apache.camel.component.jms.JmsConfiguration Detail: |
public AbstractMessageListenerContainer chooseMessageListenerContainerImplementation() {
// TODO we could allow a spring container to auto-inject these objects?
switch (consumerType) {
case Simple:
return isUseVersion102()
? new SimpleMessageListenerContainer102() : new SimpleMessageListenerContainer();
case ServerSessionPool:
return isUseVersion102()
? new ServerSessionMessageListenerContainer102()
: new ServerSessionMessageListenerContainer();
case Default:
return isUseVersion102()
? new DefaultMessageListenerContainer102() : new DefaultMessageListenerContainer();
default:
throw new IllegalArgumentException("Unknown consumer type: " + consumerType);
}
}
|
public void configure(EndpointMessageListener listener) {
if (isDisableReplyTo()) {
listener.setDisableReplyTo(true);
}
if (isEagerLoadingOfProperties()) {
listener.setEagerLoadingOfProperties(true);
}
// TODO: REVISIT: We really ought to change the model and let JmsProducer
// and JmsConsumer have their own JmsConfiguration instance
// This way producer's and consumer's QoS can differ and be
// independently configured
JmsOperations operations = listener.getTemplate();
if (operations instanceof JmsTemplate) {
JmsTemplate template = (JmsTemplate)operations;
template.setDeliveryPersistent(isReplyToDeliveryPersistent());
}
}
|
protected void configureMessageListenerContainer(AbstractMessageListenerContainer container,
JmsEndpoint endpoint) {
container.setConnectionFactory(getListenerConnectionFactory());
if (endpoint instanceof DestinationEndpoint) {
container.setDestinationResolver(createDestinationResolver((DestinationEndpoint) endpoint));
} else if (destinationResolver != null) {
container.setDestinationResolver(destinationResolver);
}
if (autoStartup) {
container.setAutoStartup(true);
}
if (clientId != null) {
container.setClientId(clientId);
}
container.setSubscriptionDurable(subscriptionDurable);
if (durableSubscriptionName != null) {
container.setDurableSubscriptionName(durableSubscriptionName);
}
// lets default to durable subscription if the subscriber name and
// client ID are specified (as there's
// no reason to specify them if not! :)
if (durableSubscriptionName != null && clientId != null) {
container.setSubscriptionDurable(true);
}
if (exceptionListener != null) {
container.setExceptionListener(exceptionListener);
}
container.setAcceptMessagesWhileStopping(acceptMessagesWhileStopping);
container.setExposeListenerSession(exposeListenerSession);
container.setSessionTransacted(transacted);
if (transacted) {
container.setSessionAcknowledgeMode(Session.SESSION_TRANSACTED);
} else {
if (acknowledgementMode >= 0) {
container.setSessionAcknowledgeMode(acknowledgementMode);
} else if (acknowledgementModeName != null) {
container.setSessionAcknowledgeModeName(acknowledgementModeName);
}
}
if (endpoint.getSelector() != null && endpoint.getSelector().length() != 0) {
container.setMessageSelector(endpoint.getSelector());
}
if (container instanceof DefaultMessageListenerContainer) {
// this includes DefaultMessageListenerContainer102
DefaultMessageListenerContainer listenerContainer = (DefaultMessageListenerContainer)container;
if (concurrentConsumers >= 0) {
listenerContainer.setConcurrentConsumers(concurrentConsumers);
}
if (cacheLevel >= 0) {
listenerContainer.setCacheLevel(cacheLevel);
} else if (cacheLevelName != null) {
listenerContainer.setCacheLevelName(cacheLevelName);
} else {
listenerContainer.setCacheLevel(defaultCacheLevel(endpoint));
}
if (idleTaskExecutionLimit >= 0) {
listenerContainer.setIdleTaskExecutionLimit(idleTaskExecutionLimit);
}
if (maxConcurrentConsumers >= 0) {
listenerContainer.setMaxConcurrentConsumers(maxConcurrentConsumers);
}
if (maxMessagesPerTask >= 0) {
listenerContainer.setMaxMessagesPerTask(maxMessagesPerTask);
}
listenerContainer.setPubSubNoLocal(pubSubNoLocal);
if (receiveTimeout >= 0) {
listenerContainer.setReceiveTimeout(receiveTimeout);
}
if (recoveryInterval >= 0) {
listenerContainer.setRecoveryInterval(recoveryInterval);
}
if (taskExecutor != null) {
listenerContainer.setTaskExecutor(taskExecutor);
}
PlatformTransactionManager tm = getTransactionManager();
if (tm != null) {
listenerContainer.setTransactionManager(tm);
} else if (transacted) {
throw new IllegalArgumentException("Property transacted is enabled but a transactionManager was not injected!");
}
if (transactionName != null) {
listenerContainer.setTransactionName(transactionName);
}
if (transactionTimeout >= 0) {
listenerContainer.setTransactionTimeout(transactionTimeout);
}
} else if (container instanceof ServerSessionMessageListenerContainer) {
// this includes ServerSessionMessageListenerContainer102
ServerSessionMessageListenerContainer listenerContainer = (ServerSessionMessageListenerContainer)container;
if (maxMessagesPerTask >= 0) {
listenerContainer.setMaxMessagesPerTask(maxMessagesPerTask);
}
if (serverSessionFactory != null) {
listenerContainer.setServerSessionFactory(serverSessionFactory);
}
} else if (container instanceof SimpleMessageListenerContainer) {
// this includes SimpleMessageListenerContainer102
SimpleMessageListenerContainer listenerContainer = (SimpleMessageListenerContainer)container;
if (concurrentConsumers >= 0) {
listenerContainer.setConcurrentConsumers(concurrentConsumers);
}
listenerContainer.setPubSubNoLocal(pubSubNoLocal);
if (taskExecutor != null) {
listenerContainer.setTaskExecutor(taskExecutor);
}
}
}
|
protected void configuredQoS() {
if (explicitQosEnabled == null) {
explicitQosEnabled = true;
}
}
|
public JmsConfiguration copy() {
try {
return (JmsConfiguration)clone();
} catch (CloneNotSupportedException e) {
throw new RuntimeCamelException(e);
}
}
Returns a copy of this configuration |
protected ConnectionFactory createConnectionFactory() {
ObjectHelper.notNull(connectionFactory, "connectionFactory");
return null;
}
Factory method which allows derived classes to customize the lazy
creation |
public static DestinationResolver createDestinationResolver(DestinationEndpoint destinationEndpoint) {
return new DestinationResolver() {
public Destination resolveDestinationName(Session session, String destinationName, boolean pubSubDomain) throws JMSException {
return destinationEndpoint.getJmsDestination(session);
}
};
}
|
public JmsOperations createInOnlyTemplate(JmsEndpoint endpoint,
boolean pubSubDomain,
String destination) {
if (jmsOperations != null) {
return jmsOperations;
}
ConnectionFactory factory = getTemplateConnectionFactory();
JmsTemplate template = useVersion102
? new CamelJmsTeemplate102(this, factory, pubSubDomain)
: new CamelJmsTemplate(this, factory);
template.setPubSubDomain(pubSubDomain);
if (destinationResolver != null) {
template.setDestinationResolver(destinationResolver);
if (endpoint instanceof DestinationEndpoint) {
LOG.debug("You are overloading the destinationResolver property on a DestinationEndpoint; are you sure you want to do that?");
}
} else if (endpoint instanceof DestinationEndpoint) {
DestinationEndpoint destinationEndpoint = (DestinationEndpoint) endpoint;
template.setDestinationResolver(createDestinationResolver(destinationEndpoint));
}
template.setDefaultDestinationName(destination);
template.setExplicitQosEnabled(isExplicitQosEnabled());
template.setDeliveryPersistent(deliveryPersistent);
if (messageConverter != null) {
template.setMessageConverter(messageConverter);
}
template.setMessageIdEnabled(messageIdEnabled);
template.setMessageTimestampEnabled(messageTimestampEnabled);
if (priority >= 0) {
template.setPriority(priority);
}
template.setPubSubNoLocal(pubSubNoLocal);
if (receiveTimeout >= 0) {
template.setReceiveTimeout(receiveTimeout);
}
if (timeToLive >= 0) {
template.setTimeToLive(timeToLive);
}
template.setSessionTransacted(transacted);
if (transacted) {
template.setSessionAcknowledgeMode(Session.SESSION_TRANSACTED);
} else {
// This is here for completeness, but the template should not get
// used
// for receiving messages.
if (acknowledgementMode >= 0) {
template.setSessionAcknowledgeMode(acknowledgementMode);
} else if (acknowledgementModeName != null) {
template.setSessionAcknowledgeModeName(acknowledgementModeName);
}
}
return template;
}
|
public JmsOperations createInOutTemplate(JmsEndpoint endpoint,
boolean pubSubDomain,
String destination,
long requestTimeout) {
JmsOperations answer = createInOnlyTemplate(endpoint, pubSubDomain, destination);
if (answer instanceof JmsTemplate && requestTimeout > 0) {
JmsTemplate jmsTemplate = (JmsTemplate)answer;
jmsTemplate.setExplicitQosEnabled(true);
jmsTemplate.setTimeToLive(requestTimeout);
jmsTemplate.setSessionTransacted(isTransactedInOut());
}
return answer;
}
Creates a JmsOperations object used for request/response using a request
timeout value |
protected ConnectionFactory createListenerConnectionFactory() {
return getConnectionFactory();
}
Factory method which allows derived classes to customize the lazy
creation |
public AbstractMessageListenerContainer createMessageListenerContainer(JmsEndpoint endpoint) {
AbstractMessageListenerContainer container = chooseMessageListenerContainerImplementation();
configureMessageListenerContainer(container, endpoint);
return container;
}
|
protected ConnectionFactory createTemplateConnectionFactory() {
return getConnectionFactory();
}
Factory method which allows derived classes to customize the lazy
creation |
protected int defaultCacheLevel(JmsEndpoint endpoint) {
// if we are on a new enough spring version we can assume CACHE_CONSUMER
if (PackageHelper.isValidVersion("org.springframework.jms", 2.51D)) {
return DefaultMessageListenerContainer.CACHE_CONSUMER;
} else {
if (endpoint.isPubSubDomain() && !isSubscriptionDurable()) {
// we must cache the consumer or we will miss messages
// see https://issues.apache.org/activemq/browse/CAMEL-253
return DefaultMessageListenerContainer.CACHE_CONSUMER;
} else {
// to enable consuming and sending with a single JMS session (to
// avoid XA) we can only use CACHE_CONNECTION
// due to this bug :
// http://opensource.atlassian.com/projects/spring/browse/SPR-3890
return DefaultMessageListenerContainer.CACHE_CONNECTION;
}
}
}
Defaults the JMS cache level if none is explicitly specified. Note that
due to this Spring
Bug we cannot use CACHE_CONSUMER by default (which we should do as
its most efficient) unless the spring version is 2.5.1 or later. Instead
we use CACHE_CONNECTION - part from for non-durable topics which must use
CACHE_CONSUMER to avoid missing messages (due to the consumer being
created and destroyed per message). |
public int getAcknowledgementMode() {
return acknowledgementMode;
}
|
public String getAcknowledgementModeName() {
return acknowledgementModeName;
}
|
public int getCacheLevel() {
return cacheLevel;
}
|
public String getCacheLevelName() {
return cacheLevelName;
}
|
public String getClientId() {
return clientId;
}
|
public int getConcurrentConsumers() {
return concurrentConsumers;
}
|
public ConnectionFactory getConnectionFactory() {
if (connectionFactory == null) {
connectionFactory = createConnectionFactory();
}
return connectionFactory;
}
|
public ConsumerType getConsumerType() {
return consumerType;
}
|
public DestinationResolver getDestinationResolver() {
return destinationResolver;
}
|
public String getDurableSubscriptionName() {
return durableSubscriptionName;
}
|
public ExceptionListener getExceptionListener() {
return exceptionListener;
}
|
public int getIdleTaskExecutionLimit() {
return idleTaskExecutionLimit;
}
|
public JmsOperations getJmsOperations() {
return jmsOperations;
}
|
public ConnectionFactory getListenerConnectionFactory() {
if (listenerConnectionFactory == null) {
listenerConnectionFactory = createListenerConnectionFactory();
}
return listenerConnectionFactory;
}
|
public int getMaxConcurrentConsumers() {
return maxConcurrentConsumers;
}
|
public int getMaxMessagesPerTask() {
return maxMessagesPerTask;
}
|
public MessageConverter getMessageConverter() {
return messageConverter;
}
|
public JmsOperations getMetadataJmsOperations(JmsEndpoint endpoint) {
if (metadataJmsOperations == null) {
metadataJmsOperations = getJmsOperations();
if (metadataJmsOperations == null) {
metadataJmsOperations = createInOnlyTemplate(endpoint, false, null);
}
}
return metadataJmsOperations;
}
|
public int getPriority() {
return priority;
}
|
public JmsProviderMetadata getProviderMetadata() {
return providerMetadata;
}
|
public long getReceiveTimeout() {
return receiveTimeout;
}
|
public long getRecoveryInterval() {
return recoveryInterval;
}
|
public String getReplyTo() {
return replyToDestination;
}
|
public String getReplyToDestinationSelectorName() {
return replyToDestinationSelectorName;
}
|
public String getReplyToTempDestinationAffinity() {
return replyToTempDestinationAffinity;
}
|
public long getRequestMapPurgePollTimeMillis() {
return requestMapPurgePollTimeMillis;
}
|
public long getRequestTimeout() {
return requestTimeout;
}
|
public ServerSessionFactory getServerSessionFactory() {
return serverSessionFactory;
}
|
public TaskExecutor getTaskExecutor() {
return taskExecutor;
}
|
public ConnectionFactory getTemplateConnectionFactory() {
if (templateConnectionFactory == null) {
templateConnectionFactory = createTemplateConnectionFactory();
}
return templateConnectionFactory;
}
|
public long getTimeToLive() {
return timeToLive;
}
|
public PlatformTransactionManager getTransactionManager() {
return transactionManager;
}
|
public String getTransactionName() {
return transactionName;
}
|
public int getTransactionTimeout() {
return transactionTimeout;
}
|
public boolean isAcceptMessagesWhileStopping() {
return acceptMessagesWhileStopping;
}
|
public boolean isAlwaysCopyMessage() {
return alwaysCopyMessage;
}
|
public boolean isAutoStartup() {
return autoStartup;
}
|
public boolean isDeliveryPersistent() {
return deliveryPersistent;
}
|
public boolean isDisableReplyTo() {
return disableReplyTo;
}
|
public boolean isEagerLoadingOfProperties() {
return eagerLoadingOfProperties;
}
|
public boolean isExplicitQosEnabled() {
return explicitQosEnabled != null ? explicitQosEnabled : false;
}
|
public boolean isExposeListenerSession() {
return exposeListenerSession;
}
|
public boolean isMessageIdEnabled() {
return messageIdEnabled;
}
|
public boolean isMessageTimestampEnabled() {
return messageTimestampEnabled;
}
|
public boolean isPreserveMessageQos() {
return preserveMessageQos;
}
|
public boolean isPubSubNoLocal() {
return pubSubNoLocal;
}
|
public boolean isReplyToDeliveryPersistent() {
return replyToDeliveryPersistent;
}
|
public boolean isSubscriptionDurable() {
return subscriptionDurable;
}
|
public boolean isTransacted() {
return transacted;
}
|
public boolean isTransactedInOut() {
return transactedInOut;
}
Should InOut operations (request reply) default to using transacted mode?
By default this is false as you need to commit the outgoing request before you can consume the input |
public boolean isUseMessageIDAsCorrelationID() {
return useMessageIDAsCorrelationID;
}
|
public boolean isUseVersion102() {
return useVersion102;
}
|
public void setAcceptMessagesWhileStopping(boolean acceptMessagesWhileStopping) {
this.acceptMessagesWhileStopping = acceptMessagesWhileStopping;
}
|
public void setAcknowledgementMode(int consumerAcknowledgementMode) {
this.acknowledgementMode = consumerAcknowledgementMode;
this.acknowledgementModeName = null;
}
|
public void setAcknowledgementModeName(String consumerAcknowledgementMode) {
this.acknowledgementModeName = consumerAcknowledgementMode;
this.acknowledgementMode = -1;
}
|
public void setAlwaysCopyMessage(boolean alwaysCopyMessage) {
this.alwaysCopyMessage = alwaysCopyMessage;
}
|
public void setAutoStartup(boolean autoStartup) {
this.autoStartup = autoStartup;
}
|
public void setCacheLevel(int cacheLevel) {
this.cacheLevel = cacheLevel;
}
|
public void setCacheLevelName(String cacheName) {
this.cacheLevelName = cacheName;
}
|
public void setClientId(String consumerClientId) {
this.clientId = consumerClientId;
}
|
public void setConcurrentConsumers(int concurrentConsumers) {
this.concurrentConsumers = concurrentConsumers;
}
|
public void setConnectionFactory(ConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
}
|
public void setConsumerType(ConsumerType consumerType) {
this.consumerType = consumerType;
}
|
public void setDeliveryPersistent(boolean deliveryPersistent) {
this.deliveryPersistent = deliveryPersistent;
configuredQoS();
}
|
public void setDestinationResolver(DestinationResolver destinationResolver) {
this.destinationResolver = destinationResolver;
}
|
public void setDisableReplyTo(boolean disableReplyTo) {
this.disableReplyTo = disableReplyTo;
}
Disables the use of the JMSReplyTo header for consumers so that inbound
messages are treated as InOnly rather than InOut requests. |
public void setDurableSubscriptionName(String durableSubscriptionName) {
this.durableSubscriptionName = durableSubscriptionName;
}
|
public void setEagerLoadingOfProperties(boolean eagerLoadingOfProperties) {
this.eagerLoadingOfProperties = eagerLoadingOfProperties;
}
Enables eager loading of JMS properties as soon as a message is loaded
which generally is inefficient as the JMS properties may not be required
but sometimes can catch early any issues with the underlying JMS provider
and the use of JMS properties |
public void setExceptionListener(ExceptionListener exceptionListener) {
this.exceptionListener = exceptionListener;
}
|
public void setExplicitQosEnabled(boolean explicitQosEnabled) {
this.explicitQosEnabled = explicitQosEnabled;
}
|
public void setExposeListenerSession(boolean exposeListenerSession) {
this.exposeListenerSession = exposeListenerSession;
}
|
public void setIdleTaskExecutionLimit(int idleTaskExecutionLimit) {
this.idleTaskExecutionLimit = idleTaskExecutionLimit;
}
|
public void setJmsOperations(JmsOperations jmsOperations) {
this.jmsOperations = jmsOperations;
}
|
public void setListenerConnectionFactory(ConnectionFactory listenerConnectionFactory) {
this.listenerConnectionFactory = listenerConnectionFactory;
}
|
public void setMaxConcurrentConsumers(int maxConcurrentConsumers) {
this.maxConcurrentConsumers = maxConcurrentConsumers;
}
|
public void setMaxMessagesPerTask(int maxMessagesPerTask) {
this.maxMessagesPerTask = maxMessagesPerTask;
}
|
public void setMessageConverter(MessageConverter messageConverter) {
this.messageConverter = messageConverter;
}
|
public void setMessageIdEnabled(boolean messageIdEnabled) {
this.messageIdEnabled = messageIdEnabled;
}
|
public void setMessageTimestampEnabled(boolean messageTimestampEnabled) {
this.messageTimestampEnabled = messageTimestampEnabled;
}
|
public void setMetadataJmsOperations(JmsOperations metadataJmsOperations) {
this.metadataJmsOperations = metadataJmsOperations;
}
|
public void setPreserveMessageQos(boolean preserveMessageQos) {
this.preserveMessageQos = preserveMessageQos;
}
Set to true if you want to send message using the QoS settings specified
on the message. Normally the QoS settings used are the one configured on
this Object. |
public void setPriority(int priority) {
this.priority = priority;
configuredQoS();
}
|
public void setProviderMetadata(JmsProviderMetadata providerMetadata) {
this.providerMetadata = providerMetadata;
}
Allows the provider metadata to be explicitly configured. Typically this is not required
and Camel will auto-detect the provider metadata from the underlying provider. |
public void setPubSubNoLocal(boolean pubSubNoLocal) {
this.pubSubNoLocal = pubSubNoLocal;
}
|
public void setReceiveTimeout(long receiveTimeout) {
this.receiveTimeout = receiveTimeout;
}
|
public void setRecoveryInterval(long recoveryInterval) {
this.recoveryInterval = recoveryInterval;
}
|
public void setReplyTo(String replyToDestination) {
if (!replyToDestination.startsWith(QUEUE_PREFIX)) {
throw new IllegalArgumentException("ReplyTo destination value has to be of type queue; "
+ "e.g: \"queue:replyQueue\"");
}
this.replyToDestination =
removeStartingCharacters(replyToDestination.substring(QUEUE_PREFIX.length()), '/');
}
|
public void setReplyToDeliveryPersistent(boolean replyToDeliveryPersistent) {
this.replyToDeliveryPersistent = replyToDeliveryPersistent;
}
|
public void setReplyToDestinationSelectorName(String replyToDestinationSelectorName) {
this.replyToDestinationSelectorName = replyToDestinationSelectorName;
// in case of consumer - > producer and a named replyTo correlation selector
// message passthough is impossible as we need to set the value of selector into
// outgoing message, which would be read-only if passthough were to remain enabled
if (replyToDestinationSelectorName != null) {
setAlwaysCopyMessage(true);
}
}
|
public void setReplyToTempDestinationAffinity(String replyToTempDestinationAffinity) {
this.replyToTempDestinationAffinity = replyToTempDestinationAffinity;
}
|
public void setRequestMapPurgePollTimeMillis(long requestMapPurgePollTimeMillis) {
this.requestMapPurgePollTimeMillis = requestMapPurgePollTimeMillis;
}
Sets the frequency that the requestMap for InOut exchanges is purged for
timed out message exchanges |
public void setRequestTimeout(long requestTimeout) {
this.requestTimeout = requestTimeout;
}
Sets the timeout in milliseconds which requests should timeout after |
public void setServerSessionFactory(ServerSessionFactory serverSessionFactory) {
this.serverSessionFactory = serverSessionFactory;
}
|
public void setSubscriptionDurable(boolean subscriptionDurable) {
this.subscriptionDurable = subscriptionDurable;
}
|
public void setTaskExecutor(TaskExecutor taskExecutor) {
this.taskExecutor = taskExecutor;
}
|
public void setTemplateConnectionFactory(ConnectionFactory templateConnectionFactory) {
this.templateConnectionFactory = templateConnectionFactory;
}
|
public void setTimeToLive(long timeToLive) {
this.timeToLive = timeToLive;
configuredQoS();
}
|
public void setTransacted(boolean consumerTransacted) {
this.transacted = consumerTransacted;
}
|
public void setTransactedInOut(boolean transactedInOut) {
this.transactedInOut = transactedInOut;
}
|
public void setTransactionManager(PlatformTransactionManager transactionManager) {
this.transactionManager = transactionManager;
}
|
public void setTransactionName(String transactionName) {
this.transactionName = transactionName;
}
|
public void setTransactionTimeout(int transactionTimeout) {
this.transactionTimeout = transactionTimeout;
}
|
public void setUseMessageIDAsCorrelationID(boolean useMessageIDAsCorrelationID) {
this.useMessageIDAsCorrelationID = useMessageIDAsCorrelationID;
}
|
public void setUseVersion102(boolean useVersion102) {
this.useVersion102 = useVersion102;
}
|