|
Spring Framework example source code file (DefaultMessageListenerContainer.java)
The Spring Framework DefaultMessageListenerContainer.java source code
/*
* Copyright 2002-2008 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.jms.listener;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.springframework.core.Constants;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.jms.JmsException;
import org.springframework.jms.support.JmsUtils;
import org.springframework.jms.support.destination.CachingDestinationResolver;
import org.springframework.jms.support.destination.DestinationResolver;
import org.springframework.scheduling.SchedulingAwareRunnable;
import org.springframework.scheduling.SchedulingTaskExecutor;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
/**
* Message listener container variant that uses plain JMS client API, specifically
* a loop of <code>MessageConsumer.receive()</code> calls that also allow for
* transactional reception of messages (registering them with XA transactions).
* Designed to work in a native JMS environment as well as in a J2EE environment,
* with only minimal differences in configuration.
*
* <p>NOTE: This class requires a JMS 1.1+ provider, because it builds on
* the domain-independent API. <b>Use the {@link DefaultMessageListenerContainer102}
* subclass for a JMS 1.0.2 provider, e.g. when running on a J2EE 1.3 server.</b>
*
* <p>This is a simple but nevertheless powerful form of message listener container.
* On startup, it obtains a fixed number of JMS Sessions to invoke the listener,
* and optionally allows for dynamic adaptation at runtime (up until a maximum number).
* Like {@link SimpleMessageListenerContainer}, its main advantage is its low level
* of runtime complexity, in particular the minimal requirements on the JMS provider:
* Not even the JMS ServerSessionPool facility is required. Beyond that, it is
* fully self-recovering in case of the broker being temporarily unavailable,
* and allows for stops/restarts as well as runtime changes to its configuration.
*
* <p>Actual MessageListener execution happens in asynchronous work units which are
* created through Spring's {@link org.springframework.core.task.TaskExecutor}
* abstraction. By default, the specified number of invoker tasks will be created
* on startup, according to the {@link #setConcurrentConsumers "concurrentConsumers"}
* setting. Specify an alternative TaskExecutor to integrate with an existing
* thread pool facility (such as a J2EE server's), for example using a
* {@link org.springframework.scheduling.commonj.WorkManagerTaskExecutor CommonJ WorkManager}.
* With a native JMS setup, each of those listener threads is going to use a
* cached JMS Session and MessageConsumer (only refreshed in case of failure),
* using the JMS provider's resources as efficiently as possible.
*
* <p>Message reception and listener execution can automatically be wrapped
* in transactions through passing a Spring
* {@link org.springframework.transaction.PlatformTransactionManager} into the
* {@link #setTransactionManager "transactionManager"} property. This will usually
* be a {@link org.springframework.transaction.jta.JtaTransactionManager} in a
* J2EE environment, in combination with a JTA-aware JMS ConnectionFactory obtained
* from JNDI (check your J2EE server's documentation). Note that this listener
* container will automatically reobtain all JMS handles for each transaction
* in case of an external transaction manager specified, for compatibility with
* all J2EE servers (in particular JBoss). This non-caching behavior can be
* overridden through the {@link #setCacheLevel "cacheLevel"} /
* {@link #setCacheLevelName "cacheLevelName"} property, enforcing caching
* of the Connection (or also Session and MessageConsumer) even in case of
* an external transaction manager being involved.
*
* <p>Dynamic scaling of the number of concurrent invokers can be activated
* through specifying a {@link #setMaxConcurrentConsumers "maxConcurrentConsumers"}
* value that is higher than the {@link #setConcurrentConsumers "concurrentConsumers"}
* value. Since the latter's default is 1, you can also simply specify a
* "maxConcurrentConsumers" of e.g. 5, which will lead to dynamic scaling up to
* 5 concurrent consumers in case of increasing message load, as well as dynamic
* shrinking back to the standard number of consumers once the load decreases.
* Consider adapting the {@link #setIdleTaskExecutionLimit "idleTaskExecutionLimit"}
* setting to control the lifespan of each new task, to avoid frequent scaling up
* and down, in particular if the ConnectionFactory does not pool JMS Sessions
* and/or the TaskExecutor does not pool threads (check your configuration!).
* Note that dynamic scaling only really makes sense for a queue in the first
* place; for a topic, you will typically stick with the default number of 1
* consumer, else you'd receive the same message multiple times on the same node.
*
* <p><b>It is strongly recommended to either set {@link #setSessionTransacted
* "sessionTransacted"} to "true" or specify an external {@link #setTransactionManager
* "transactionManager"}.</b> See the {@link AbstractMessageListenerContainer}
* javadoc for details on acknowledge modes and native transaction options,
* as well as the {@link AbstractPollingMessageListenerContainer} javadoc
* for details on configuring an external transaction manager.
*
* @author Juergen Hoeller
* @since 2.0
* @see #setTransactionManager
* @see #setCacheLevel
* @see javax.jms.MessageConsumer#receive(long)
* @see SimpleMessageListenerContainer
* @see org.springframework.jms.listener.endpoint.JmsMessageEndpointManager
*/
public class DefaultMessageListenerContainer extends AbstractPollingMessageListenerContainer {
/**
* Default thread name prefix: "DefaultMessageListenerContainer-".
*/
public static final String DEFAULT_THREAD_NAME_PREFIX =
ClassUtils.getShortName(DefaultMessageListenerContainer.class) + "-";
/**
* The default recovery interval: 5000 ms = 5 seconds.
* @see #setRecoveryInterval
*/
public static final long DEFAULT_RECOVERY_INTERVAL = 5000;
/**
* Constant that indicates to cache no JMS resources at all.
* @see #setCacheLevel
*/
public static final int CACHE_NONE = 0;
/**
* Constant that indicates to cache a shared JMS Connection.
* @see #setCacheLevel
*/
public static final int CACHE_CONNECTION = 1;
/**
* Constant that indicates to cache a shared JMS Connection
* and a JMS Session for each listener thread.
* @see #setCacheLevel
*/
public static final int CACHE_SESSION = 2;
/**
* Constant that indicates to cache a shared JMS Connection
* and a JMS Session for each listener thread, as well as
* a JMS MessageConsumer for each listener thread.
* @see #setCacheLevel
*/
public static final int CACHE_CONSUMER = 3;
/**
* Constant that indicates automatic choice of an appropriate
* caching level (depending on the transaction management strategy).
* <p>This is the initial cache level of a container. {@link #initialize()}
* replaces it with {@link #CACHE_NONE} when a transaction manager has been
* set, and with {@link #CACHE_CONSUMER} otherwise.
* @see #setCacheLevel
*/
public static final int CACHE_AUTO = 4;
// Resolves CACHE_* constant names of this class to their numeric values (see setCacheLevelName).
private static final Constants constants = new Constants(DefaultMessageListenerContainer.class);
// Executor for the listener tasks; when still null at initialize(), createDefaultTaskExecutor() supplies one.
private TaskExecutor taskExecutor;
// Interval between recovery attempts, in milliseconds.
private long recoveryInterval = DEFAULT_RECOVERY_INTERVAL;
// CACHE_AUTO is a placeholder that initialize() replaces with CACHE_NONE or CACHE_CONSUMER.
private int cacheLevel = CACHE_AUTO;
// Number of invokers scheduled by doInitialize(); accessed under activeInvokerMonitor in the visible code.
private int concurrentConsumers = 1;
// The setters keep this value >= concurrentConsumers; accessed under activeInvokerMonitor.
private int maxConcurrentConsumers = 1;
// Integer.MIN_VALUE marks "not explicitly configured": initialize() may then default it to 10.
private int maxMessagesPerTask = Integer.MIN_VALUE;
// Limit for idle executions of a receive task; accessed under activeInvokerMonitor.
private int idleTaskExecutionLimit = 1;
// NOTE(review): populated outside this excerpt; presumably the invokers currently scheduled - confirm.
private final Set scheduledInvokers = new HashSet();
// doShutdown() blocks until this counter is no longer positive; read under activeInvokerMonitor there.
private int activeInvokerCount = 0;
// Reset to null by start(). NOTE(review): assigned outside this excerpt - confirm where.
private Runnable stopCallback;
// Lock guarding the concurrency settings above; doShutdown() also waits on it.
private final Object activeInvokerMonitor = new Object();
// NOTE(review): the two recovery-related fields below are not used in this excerpt - confirm their roles.
private Object currentRecoveryMarker = new Object();
private final Object recoveryMonitor = new Object();
/**
* Set the Spring TaskExecutor to use for running the listener threads.
* <p>Default is a {@link org.springframework.core.task.SimpleAsyncTaskExecutor},
* starting up a number of new threads, according to the specified number
* of concurrent consumers.
* <p>Specify an alternative TaskExecutor for integration with an existing
* thread pool. Note that this really only adds value if the threads are
* managed in a specific fashion, for example within a J2EE environment.
* A plain thread pool does not add much value, as this listener container
* will occupy a number of threads for its entire lifetime.
* @see #setConcurrentConsumers
* @see org.springframework.core.task.SimpleAsyncTaskExecutor
* @see org.springframework.scheduling.commonj.WorkManagerTaskExecutor
*/
public void setTaskExecutor(TaskExecutor taskExecutor) {
// Stored as given; initialize() falls back to createDefaultTaskExecutor() if this is still null.
this.taskExecutor = taskExecutor;
}
/**
* Specify the interval between recovery attempts, in <b>milliseconds</b>.
* The default is 5000 ms, that is, 5 seconds.
* @see #handleListenerSetupFailure
*/
public void setRecoveryInterval(long recoveryInterval) {
// No range check is applied here; the value is stored unchanged.
this.recoveryInterval = recoveryInterval;
}
/**
* Specify the level of caching that this listener container is allowed to apply,
* in the form of the name of the corresponding constant: e.g. "CACHE_CONNECTION".
* @see #setCacheLevel
*/
public void setCacheLevelName(String constantName) throws IllegalArgumentException {
    // Only names from the CACHE_* family are acceptable; null and anything else is rejected up front.
    boolean isCacheConstant = (constantName != null && constantName.startsWith("CACHE_"));
    if (!isCacheConstant) {
        throw new IllegalArgumentException("Only cache constants allowed");
    }
    // Translate the constant name into its numeric value and hand it to the int-based setter.
    Number level = constants.asNumber(constantName);
    setCacheLevel(level.intValue());
}
/**
* Specify the level of caching that this listener container is allowed to apply.
* <p>Default is CACHE_NONE if an external transaction manager has been specified
* (to reobtain all resources freshly within the scope of the external transaction),
* and CACHE_CONSUMER else (operating with local JMS resources).
* <p>Some J2EE servers only register their JMS resources with an ongoing XA
* transaction in case of a freshly obtained JMS Connection and Session,
* which is why this listener container does by default not cache any of those.
* However, if you want to optimize for a specific server, consider switching
* this setting to at least CACHE_CONNECTION or CACHE_SESSION even in
* conjunction with an external transaction manager.
* <p>Currently known servers that absolutely require CACHE_NONE for XA
* transaction processing: JBoss 4. For any others, consider raising the
* cache level.
* @see #CACHE_NONE
* @see #CACHE_CONNECTION
* @see #CACHE_SESSION
* @see #CACHE_CONSUMER
* @see #setCacheLevelName
* @see #setTransactionManager
*/
public void setCacheLevel(int cacheLevel) {
// Stored without validation; a CACHE_AUTO value is resolved to a concrete level later, in initialize().
this.cacheLevel = cacheLevel;
}
/**
* Return the level of caching that this listener container is allowed to apply.
*/
public int getCacheLevel() {
    // May still be CACHE_AUTO as long as initialize() has not resolved it.
    return cacheLevel;
}
/**
* Specify the number of concurrent consumers to create. Default is 1.
* <p>Specifying a higher value for this setting will increase the standard
* level of scheduled concurrent consumers at runtime: This is effectively
* the minimum number of concurrent consumers which will be scheduled
* at any given time. This is a static setting; for dynamic scaling,
* consider specifying the "maxConcurrentConsumers" setting instead.
* <p>Raising the number of concurrent consumers is recommendable in order
* to scale the consumption of messages coming in from a queue. However,
* note that any ordering guarantees are lost once multiple consumers are
* registered. In general, stick with 1 consumer for low-volume queues.
* <p>Do not raise the number of concurrent consumers for a topic.
* This would lead to concurrent consumption of the same message,
* which is hardly ever desirable.
* <p>This setting can be modified at runtime, for example through JMX.
* @see #setMaxConcurrentConsumers
*/
public void setConcurrentConsumers(int concurrentConsumers) {
    Assert.isTrue(concurrentConsumers > 0, "'concurrentConsumers' value must be at least 1 (one)");
    synchronized (this.activeInvokerMonitor) {
        // Keep the upper bound at or above the new minimum: it is raised along with it when necessary.
        this.maxConcurrentConsumers = Math.max(this.maxConcurrentConsumers, concurrentConsumers);
        this.concurrentConsumers = concurrentConsumers;
    }
}
/**
* Return the "concurrentConsumers" setting.
* <p>This returns the currently configured "concurrentConsumers" value;
* the number of currently scheduled/active consumers might differ.
* @see #getScheduledConsumerCount()
* @see #getActiveConsumerCount()
*/
public final int getConcurrentConsumers() {
    int configured;
    // Read under the same monitor that the setter writes under.
    synchronized (this.activeInvokerMonitor) {
        configured = this.concurrentConsumers;
    }
    return configured;
}
/**
* Specify the maximum number of concurrent consumers to create. Default is 1.
* <p>If this setting is higher than "concurrentConsumers", the listener container
* will dynamically schedule new consumers at runtime, provided that enough
* incoming messages are encountered. Once the load goes down again, the number of
* consumers will be reduced to the standard level ("concurrentConsumers") again.
* <p>Raising the number of concurrent consumers is recommendable in order
* to scale the consumption of messages coming in from a queue. However,
* note that any ordering guarantees are lost once multiple consumers are
* registered. In general, stick with 1 consumer for low-volume queues.
* <p>Do not raise the number of concurrent consumers for a topic.
* This would lead to concurrent consumption of the same message,
* which is hardly ever desirable.
* <p>This setting can be modified at runtime, for example through JMX.
* @see #setConcurrentConsumers
*/
public void setMaxConcurrentConsumers(int maxConcurrentConsumers) {
    Assert.isTrue(maxConcurrentConsumers > 0, "'maxConcurrentConsumers' value must be at least 1 (one)");
    synchronized (this.activeInvokerMonitor) {
        // The upper bound never drops below the configured "concurrentConsumers" value.
        this.maxConcurrentConsumers = Math.max(maxConcurrentConsumers, this.concurrentConsumers);
    }
}
/**
* Return the "maxConcurrentConsumers" setting.
* <p>This returns the currently configured "maxConcurrentConsumers" value;
* the number of currently scheduled/active consumers might differ.
* @see #getScheduledConsumerCount()
* @see #getActiveConsumerCount()
*/
public final int getMaxConcurrentConsumers() {
    int configuredMax;
    // Read under the same monitor that the setters write under.
    synchronized (this.activeInvokerMonitor) {
        configuredMax = this.maxConcurrentConsumers;
    }
    return configuredMax;
}
/**
* Specify the maximum number of messages to process in one task.
* More concretely, this limits the number of message reception attempts
* per task, which includes receive iterations that did not actually
* pick up a message until they hit their timeout (see the
* {@link #setReceiveTimeout "receiveTimeout"} property).
* <p>Default is unlimited (-1) in case of a standard TaskExecutor,
* reusing the original invoker threads until shutdown (at the
* expense of limited dynamic scheduling).
* <p>In case of a SchedulingTaskExecutor indicating a preference for
* short-lived tasks, the default is 10 instead. Specify a number
* of 10 to 100 messages to balance between rather long-lived and
* rather short-lived tasks here.
* <p>Long-lived tasks avoid frequent thread context switches through
* sticking with the same thread all the way through, while short-lived
* tasks allow thread pools to control the scheduling. Hence, thread
* pools will usually prefer short-lived tasks.
* <p>This setting can be modified at runtime, for example through JMX.
* @see #setTaskExecutor
* @see #setReceiveTimeout
* @see org.springframework.scheduling.SchedulingTaskExecutor#prefersShortLivedTasks()
*/
public void setMaxMessagesPerTask(int maxMessagesPerTask) {
    // Zero is the only rejected value; negative values pass this check.
    boolean nonZero = (maxMessagesPerTask != 0);
    Assert.isTrue(nonZero, "'maxMessagesPerTask' must not be 0");
    synchronized (this.activeInvokerMonitor) {
        this.maxMessagesPerTask = maxMessagesPerTask;
    }
}
/**
* Return the maximum number of messages to process in one task.
*/
public int getMaxMessagesPerTask() {
    int limit;
    // Read under the monitor that guards writes to this setting.
    synchronized (this.activeInvokerMonitor) {
        limit = this.maxMessagesPerTask;
    }
    return limit;
}
/**
* Specify the limit for idle executions of a receive task, not having
* received any message within its execution. If this limit is reached,
* the task will shut down and leave receiving to other executing tasks.
* <p>Default is 1, closing idle resources early once a task didn't
* receive a message. This applies to dynamic scheduling only; see the
* {@link #setMaxConcurrentConsumers "maxConcurrentConsumers"} setting.
* The minimum number of consumers
* (see {@link #setConcurrentConsumers "concurrentConsumers"})
* will be kept around until shutdown in any case.
* <p>Within each task execution, a number of message reception attempts
* (according to the "maxMessagesPerTask" setting) will each wait for an incoming
* message (according to the "receiveTimeout" setting). If all of those receive
* attempts in a given task return without a message, the task is considered
* idle with respect to received messages. Such a task may still be rescheduled;
* however, once it reached the specified "idleTaskExecutionLimit", it will
* shut down (in case of dynamic scaling).
* <p>Raise this limit if you encounter too frequent scaling up and down.
* With this limit being higher, an idle consumer will be kept around longer,
* avoiding the restart of a consumer once a new load of messages comes in.
* Alternatively, specify a higher "maxMessagesPerTask" and/or "receiveTimeout" value,
* which will also lead to idle consumers being kept around for a longer time
* (while also increasing the average execution time of each scheduled task).
* <p>This setting can be modified at runtime, for example through JMX.
* @see #setMaxMessagesPerTask
* @see #setReceiveTimeout
*/
public void setIdleTaskExecutionLimit(int idleTaskExecutionLimit) {
    // Values below 1 are rejected before the monitor is taken.
    boolean atLeastOne = (idleTaskExecutionLimit >= 1);
    Assert.isTrue(atLeastOne, "'idleTaskExecutionLimit' must be 1 or higher");
    synchronized (this.activeInvokerMonitor) {
        this.idleTaskExecutionLimit = idleTaskExecutionLimit;
    }
}
/**
* Return the limit for idle executions of a receive task.
*/
public int getIdleTaskExecutionLimit() {
    int limit;
    // Read under the monitor that guards writes to this setting.
    synchronized (this.activeInvokerMonitor) {
        limit = this.idleTaskExecutionLimit;
    }
    return limit;
}
/**
 * Run the superclass validation, then reject a durable subscription
 * that is combined with a "concurrentConsumers" value other than 1.
 * @throws IllegalArgumentException if a durable subscription is configured
 * together with more than one concurrent consumer
 */
protected void validateConfiguration() {
    super.validateConfiguration();
    synchronized (this.activeInvokerMonitor) {
        // The durable flag is queried first, exactly once, before the consumer count is looked at.
        if (isSubscriptionDurable()) {
            if (this.concurrentConsumers != 1) {
                throw new IllegalArgumentException("Only 1 concurrent consumer supported for durable subscription");
            }
        }
    }
}
//-------------------------------------------------------------------------
// Implementation of AbstractMessageListenerContainer's template methods
//-------------------------------------------------------------------------
/**
 * Resolve the settings that were left at their defaults, then delegate
 * to the superclass initialization.
 * <p>A cache level of CACHE_AUTO becomes CACHE_NONE when a transaction manager
 * is set and CACHE_CONSUMER otherwise. A missing TaskExecutor is replaced by
 * the one from <code>createDefaultTaskExecutor()</code>. For a user-supplied
 * SchedulingTaskExecutor that prefers short-lived tasks, "maxMessagesPerTask"
 * defaults to 10 unless a value was set explicitly.
 */
public void initialize() {
    // Turn the CACHE_AUTO placeholder into a concrete cache level.
    if (this.cacheLevel == CACHE_AUTO) {
        if (getTransactionManager() != null) {
            this.cacheLevel = CACHE_NONE;
        }
        else {
            this.cacheLevel = CACHE_CONSUMER;
        }
    }

    // Settle taskExecutor and maxMessagesPerTask under the invoker monitor.
    synchronized (this.activeInvokerMonitor) {
        if (this.taskExecutor == null) {
            this.taskExecutor = createDefaultTaskExecutor();
        }
        else if (this.taskExecutor instanceof SchedulingTaskExecutor) {
            SchedulingTaskExecutor scheduler = (SchedulingTaskExecutor) this.taskExecutor;
            // Integer.MIN_VALUE means that setMaxMessagesPerTask was never given a custom value,
            // so the executor's preference for short-lived tasks selects 10 messages per task.
            if (scheduler.prefersShortLivedTasks() && this.maxMessagesPerTask == Integer.MIN_VALUE) {
                this.maxMessagesPerTask = 10;
            }
        }
    }

    // Hand over to the actual listener initialization.
    super.initialize();
}
/**
* Creates the specified number of concurrent consumers,
* in the form of a JMS Session plus associated MessageConsumer
* running in a separate thread.
* @see #scheduleNewInvoker
* @see #setTaskExecutor
*/
protected void doInitialize() throws JMSException {
    synchronized (this.activeInvokerMonitor) {
        // One invoker per configured concurrent consumer; the setting is re-read on every pass.
        int scheduled = 0;
        while (scheduled < this.concurrentConsumers) {
            scheduleNewInvoker();
            scheduled++;
        }
    }
}
/**
* Destroy the registered JMS Sessions and associated MessageConsumers.
*/
protected void doShutdown() throws JMSException {
    logger.debug("Waiting for shutdown of message listener invokers");
    try {
        synchronized (this.activeInvokerMonitor) {
            // Re-check the counter after every wake-up instead of waiting just once.
            for (;;) {
                if (this.activeInvokerCount <= 0) {
                    break;
                }
                if (logger.isDebugEnabled()) {
                    String progress = "Still waiting for shutdown of " + this.activeInvokerCount +
                            " message listener invokers";
                    logger.debug(progress);
                }
                // NOTE(review): presumably finishing invokers notify this monitor - not visible here.
                this.activeInvokerMonitor.wait();
            }
        }
    }
    catch (InterruptedException interrupted) {
        // Give up waiting, but restore the interrupt flag so that other code can react to it.
        Thread.currentThread().interrupt();
    }
}
/**
* Overridden to reset the stop callback, if any.
*/
public void start() throws JmsException {
// Clear the stop callback under the invoker monitor before the container is started.
synchronized (this.activeInvokerMonitor) {
this.stopCallback = null;
}
// The superclass start logic runs outside the monitor.
super.start();
}
/**
* Stop this listener container, invoking the specific callback
* once all listener processing has actually stopped.
* <p>Note: Further
Other Spring Framework examples (source code examples)Here is a short list of links related to this Spring Framework DefaultMessageListenerContainer.java source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.