alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

ActiveMQ example source code file (Queue.java)

This example ActiveMQ source code file (Queue.java) is included in the DevDaily.com "Java Source Code Warehouse" project. The intent of this project is to help you "Learn Java by Example" TM.

Java - ActiveMQ tags/keywords

activemqdestination, arraylist, exception, exception, io, list, list, message, messagereference, messagereference, override, queuemessagereference, queuemessagereference, string, subscription, threading, threads, util

The ActiveMQ Queue.java source code

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.broker.region;

import java.io.IOException;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import javax.jms.ResourceAllocationException;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.cursors.StoreQueueCursor;
import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
import org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory;
import org.apache.activemq.broker.region.group.MessageGroupMap;
import org.apache.activemq.broker.region.group.MessageGroupMapFactory;
import org.apache.activemq.broker.region.policy.DispatchPolicy;
import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
import org.apache.activemq.command.*;
import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.security.SecurityContext;
import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.usage.Usage;
import org.apache.activemq.usage.UsageListener;
import org.apache.activemq.util.BrokerSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

/**
 * The Queue is a List of MessageEntry objects that are dispatched to matching
 * subscriptions.
 * 
 * 
 */
public class Queue extends BaseDestination implements Task, UsageListener {
    protected static final Logger LOG = LoggerFactory.getLogger(Queue.class);
    protected final TaskRunnerFactory taskFactory;
    protected TaskRunner taskRunner;
    private final ReentrantReadWriteLock consumersLock = new ReentrantReadWriteLock();
    protected final List<Subscription> consumers = new ArrayList(50);
    private final ReentrantReadWriteLock messagesLock = new ReentrantReadWriteLock();
    protected PendingMessageCursor messages;
    private final ReentrantReadWriteLock pagedInMessagesLock = new ReentrantReadWriteLock();
    private final LinkedHashMap<MessageId, QueueMessageReference> pagedInMessages = new LinkedHashMap();
    // Messages that are paged in but have not yet been targeted at a
    // subscription
    private final ReentrantReadWriteLock pagedInPendingDispatchLock = new ReentrantReadWriteLock();
    private List<QueueMessageReference> pagedInPendingDispatch = new ArrayList(100);
    private List<QueueMessageReference> redeliveredWaitingDispatch = new ArrayList();
    private MessageGroupMap messageGroupOwners;
    private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy();
    private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory();
    final Lock sendLock = new ReentrantLock();
    private ExecutorService executor;
    protected final Map<MessageId, Runnable> messagesWaitingForSpace = Collections
            .synchronizedMap(new LinkedHashMap<MessageId, Runnable>());
    private boolean useConsumerPriority = true;
    private boolean strictOrderDispatch = false;
    private final QueueDispatchSelector dispatchSelector;
    private boolean optimizedDispatch = false;
    private boolean firstConsumer = false;
    private int timeBeforeDispatchStarts = 0;
    private int consumersBeforeDispatchStarts = 0;
    private CountDownLatch consumersBeforeStartsLatch;
    private final AtomicLong pendingWakeups = new AtomicLong();
    private boolean allConsumersExclusiveByDefault = false;
    
    private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
        public void run() {
            asyncWakeup();
        }
    };
    private final Runnable expireMessagesTask = new Runnable() {
        public void run() {
            expireMessages();
        }
    };

    private final Object iteratingMutex = new Object() {
    };
    private final Scheduler scheduler;

    class TimeoutMessage implements Delayed {

        Message message;
        ConnectionContext context;
        long trigger;

        public TimeoutMessage(Message message, ConnectionContext context, long delay) {
            this.message = message;
            this.context = context;
            this.trigger = System.currentTimeMillis() + delay;
        }

        public long getDelay(TimeUnit unit) {
            long n = trigger - System.currentTimeMillis();
            return unit.convert(n, TimeUnit.MILLISECONDS);
        }

        public int compareTo(Delayed delayed) {
            long other = ((TimeoutMessage) delayed).trigger;
            int returnValue;
            if (this.trigger < other) {
                returnValue = -1;
            } else if (this.trigger > other) {
                returnValue = 1;
            } else {
                returnValue = 0;
            }
            return returnValue;
        }

    }

    DelayQueue<TimeoutMessage> flowControlTimeoutMessages = new DelayQueue();

    class FlowControlTimeoutTask extends Thread {

        @Override
        public void run() {
            TimeoutMessage timeout;
            try {
                while (true) {
                    timeout = flowControlTimeoutMessages.take();
                    if (timeout != null) {
                        synchronized (messagesWaitingForSpace) {
                            if (messagesWaitingForSpace.remove(timeout.message.getMessageId()) != null) {
                                ExceptionResponse response = new ExceptionResponse(
                                        new ResourceAllocationException(
                                                "Usage Manager Memory Limit reached. Stopping producer ("
                                                        + timeout.message.getProducerId()
                                                        + ") to prevent flooding "
                                                        + getActiveMQDestination().getQualifiedName()
                                                        + "."
                                                        + " See http://activemq.apache.org/producer-flow-control.html for more info"));
                                response.setCorrelationId(timeout.message.getCommandId());
                                timeout.context.getConnection().dispatchAsync(response);
                            }
                        }
                    }
                }
            } catch (InterruptedException e) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug(getName() + "Producer Flow Control Timeout Task is stopping");
                }
            }
        }
    };

    private final FlowControlTimeoutTask flowControlTimeoutTask = new FlowControlTimeoutTask();

    private static final Comparator<Subscription> orderedCompare = new Comparator() {

        public int compare(Subscription s1, Subscription s2) {
            // We want the list sorted in descending order
            return s2.getConsumerInfo().getPriority() - s1.getConsumerInfo().getPriority();
        }
    };

    public Queue(BrokerService brokerService, final ActiveMQDestination destination, MessageStore store,
            DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception {
        super(brokerService, store, destination, parentStats);
        this.taskFactory = taskFactory;
        this.dispatchSelector = new QueueDispatchSelector(destination);
        this.scheduler = brokerService.getBroker().getScheduler();
    }

    public List<Subscription> getConsumers() {
        consumersLock.readLock().lock();
        try {
            return new ArrayList<Subscription>(consumers);
        }finally {
            consumersLock.readLock().unlock();
        }
    }

    // make the queue easily visible in the debugger from its task runner
    // threads
    final class QueueThread extends Thread {
        final Queue queue;

        public QueueThread(Runnable runnable, String name, Queue queue) {
            super(runnable, name);
            this.queue = queue;
        }
    }

    @Override
    public void initialize() throws Exception {
        if (this.messages == null) {
            if (destination.isTemporary() || broker == null || store == null) {
                this.messages = new VMPendingMessageCursor(isPrioritizedMessages());
            } else {
                this.messages = new StoreQueueCursor(broker, this);
            }
        }
        // If a VMPendingMessageCursor don't use the default Producer System
        // Usage
        // since it turns into a shared blocking queue which can lead to a
        // network deadlock.
        // If we are cursoring to disk..it's not and issue because it does not
        // block due
        // to large disk sizes.
        if (messages instanceof VMPendingMessageCursor) {
            this.systemUsage = brokerService.getSystemUsage();
            memoryUsage.setParent(systemUsage.getMemoryUsage());
        }

        this.taskRunner = taskFactory.createTaskRunner(this, "Queue:" + destination.getPhysicalName());

        super.initialize();
        if (store != null) {
            // Restore the persistent messages.
            messages.setSystemUsage(systemUsage);
            messages.setEnableAudit(isEnableAudit());
            messages.setMaxAuditDepth(getMaxAuditDepth());
            messages.setMaxProducersToAudit(getMaxProducersToAudit());
            messages.setUseCache(isUseCache());
            messages.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
            if (messages.isRecoveryRequired()) {
                store.recover(new MessageRecoveryListener() {
                    double totalMessageCount = store.getMessageCount();
                    int recoveredMessageCount = 0;

                    public boolean recoverMessage(Message message) {
                        // Message could have expired while it was being
                        // loaded..
                        if ((++recoveredMessageCount % 50000) == 0) {
                            LOG.info("cursor for " + getActiveMQDestination().getQualifiedName() + " has recovered "
                                    + recoveredMessageCount + " messages. " +
                                    (int)(recoveredMessageCount*100/totalMessageCount) + "% complete");
                        }
                        if (message.isExpired()) {
                            if (broker.isExpired(message)) {
                                messageExpired(createConnectionContext(), createMessageReference(message));
                                // drop message will decrement so counter
                                // balance here
                                destinationStatistics.getMessages().increment();
                            }
                            return true;
                        }
                        if (hasSpace()) {
                            message.setRegionDestination(Queue.this);
                            messagesLock.writeLock().lock();
                            try{
                                try {
                                    messages.addMessageLast(message);
                                } catch (Exception e) {
                                    LOG.error("Failed to add message to cursor", e);
                                }
                            }finally {
                                messagesLock.writeLock().unlock();
                            }
                            destinationStatistics.getMessages().increment();
                            return true;
                        }
                        return false;
                    }

                    public boolean recoverMessageReference(MessageId messageReference) throws Exception {
                        throw new RuntimeException("Should not be called.");
                    }

                    public boolean hasSpace() {
                        return true;
                    }

                    public boolean isDuplicate(MessageId id) {
                        return false;
                    }
                });
            } else {
                int messageCount = store.getMessageCount();
                destinationStatistics.getMessages().setCount(messageCount);
            }
        }
    }

    /*
     * Holder for subscription that needs attention on next iterate browser
     * needs access to existing messages in the queue that have already been
     * dispatched
     */
    class BrowserDispatch {
        QueueBrowserSubscription browser;

        public BrowserDispatch(QueueBrowserSubscription browserSubscription) {
            browser = browserSubscription;
            browser.incrementQueueRef();
        }

        void done() {
            try {
                browser.decrementQueueRef();
            } catch (Exception e) {
                LOG.warn("decrement ref on browser: " + browser, e);
            }
        }

        public QueueBrowserSubscription getBrowser() {
            return browser;
        }
    }

    LinkedList<BrowserDispatch> browserDispatches = new LinkedList();

    public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
        if (LOG.isDebugEnabled()) {
            LOG.debug(getActiveMQDestination().getQualifiedName() + " add sub: " + sub + ", dequeues: "
                    + getDestinationStatistics().getDequeues().getCount() + ", dispatched: "
                    + getDestinationStatistics().getDispatched().getCount() + ", inflight: "
                    + getDestinationStatistics().getInflight().getCount());
        }

        super.addSubscription(context, sub);
        // synchronize with dispatch method so that no new messages are sent
        // while setting up a subscription. avoid out of order messages,
        // duplicates, etc.
        pagedInPendingDispatchLock.writeLock().lock();
        try {

            sub.add(context, this);
           
            // needs to be synchronized - so no contention with dispatching
           // consumersLock.
            consumersLock.writeLock().lock();
            try {

                // set a flag if this is a first consumer
                if (consumers.size() == 0) {
                    firstConsumer = true;
                    if (consumersBeforeDispatchStarts != 0) {
                        consumersBeforeStartsLatch = new CountDownLatch(consumersBeforeDispatchStarts - 1);
                    }
                } else {
                    if (consumersBeforeStartsLatch != null) {
                        consumersBeforeStartsLatch.countDown();
                    }
                }

                addToConsumerList(sub);
                if (sub.getConsumerInfo().isExclusive() || isAllConsumersExclusiveByDefault()) {
                    Subscription exclusiveConsumer = dispatchSelector.getExclusiveConsumer();
                    if (exclusiveConsumer == null) {
                        exclusiveConsumer = sub;
                    } else if (sub.getConsumerInfo().getPriority() == Byte.MAX_VALUE ||
                        sub.getConsumerInfo().getPriority() > exclusiveConsumer.getConsumerInfo().getPriority()) {
                        exclusiveConsumer = sub;
                    }
                    dispatchSelector.setExclusiveConsumer(exclusiveConsumer);
                }
            }finally {
                consumersLock.writeLock().unlock();
            }

            if (sub instanceof QueueBrowserSubscription) {
                // tee up for dispatch in next iterate
                QueueBrowserSubscription browserSubscription = (QueueBrowserSubscription) sub;
                pagedInMessagesLock.readLock().lock();
                try{
                    BrowserDispatch browserDispatch = new BrowserDispatch(browserSubscription);
                    browserDispatches.addLast(browserDispatch);
                }finally {
                    pagedInMessagesLock.readLock().unlock();
                }
            }

            if (!(this.optimizedDispatch || isSlave())) {
                wakeup();
            }
        }finally {
            pagedInPendingDispatchLock.writeLock().unlock();
        }
        if (this.optimizedDispatch || isSlave()) {
            // Outside of dispatchLock() to maintain the lock hierarchy of
            // iteratingMutex -> dispatchLock. - see
            // https://issues.apache.org/activemq/browse/AMQ-1878
            wakeup();
        }
    }

    public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeiveredSequenceId)
            throws Exception {
        super.removeSubscription(context, sub, lastDeiveredSequenceId);
        // synchronize with dispatch method so that no new messages are sent
        // while removing up a subscription.
        pagedInPendingDispatchLock.writeLock().lock();
        try {
            if (LOG.isDebugEnabled()) {
                LOG.debug(getActiveMQDestination().getQualifiedName() + " remove sub: " + sub + ", lastDeliveredSeqId: " + lastDeiveredSequenceId + ", dequeues: "
                        + getDestinationStatistics().getDequeues().getCount() + ", dispatched: "
                        + getDestinationStatistics().getDispatched().getCount() + ", inflight: "
                        + getDestinationStatistics().getInflight().getCount());
            }
            consumersLock.writeLock().lock();
            try {
                removeFromConsumerList(sub);
                if (sub.getConsumerInfo().isExclusive()) {
                    Subscription exclusiveConsumer = dispatchSelector.getExclusiveConsumer();
                    if (exclusiveConsumer == sub) {
                        exclusiveConsumer = null;
                        for (Subscription s : consumers) {
                            if (s.getConsumerInfo().isExclusive()
                                    && (exclusiveConsumer == null || s.getConsumerInfo().getPriority() > exclusiveConsumer
                                            .getConsumerInfo().getPriority())) {
                                exclusiveConsumer = s;

                            }
                        }
                        dispatchSelector.setExclusiveConsumer(exclusiveConsumer);
                    }
                } else if (isAllConsumersExclusiveByDefault()) {
                    Subscription exclusiveConsumer = null;
                    for (Subscription s : consumers) {
                        if (exclusiveConsumer == null 
                                || s.getConsumerInfo().getPriority() > exclusiveConsumer
                                .getConsumerInfo().getPriority()) {
                            exclusiveConsumer = s;
                                }
                    }
                    dispatchSelector.setExclusiveConsumer(exclusiveConsumer);
                }
                ConsumerId consumerId = sub.getConsumerInfo().getConsumerId();
                getMessageGroupOwners().removeConsumer(consumerId);

                // redeliver inflight messages

                boolean markAsRedelivered = false;
                MessageReference lastDeliveredRef = null;
                List<MessageReference> unAckedMessages = sub.remove(context, this);

                // locate last redelivered in unconsumed list (list in delivery rather than seq order)
                if (lastDeiveredSequenceId != 0) {
                    for (MessageReference ref : unAckedMessages) {
                        if (ref.getMessageId().getBrokerSequenceId() == lastDeiveredSequenceId) {
                            lastDeliveredRef = ref;
                            markAsRedelivered = true;
                            LOG.debug("found lastDeliveredSeqID: " + lastDeiveredSequenceId + ", message reference: " + ref.getMessageId());
                            break;
                        }
                    }
                }
                for (MessageReference ref : unAckedMessages) {
                    QueueMessageReference qmr = (QueueMessageReference) ref;
                    if (qmr.getLockOwner() == sub) {
                        qmr.unlock();

                        // have no delivery information
                        if (lastDeiveredSequenceId == 0) {
                            qmr.incrementRedeliveryCounter();
                        } else {
                            if (markAsRedelivered) {
                                qmr.incrementRedeliveryCounter();
                            }
                            if (ref == lastDeliveredRef) {
                                // all that follow were not redelivered
                                markAsRedelivered = false;
                            }
                        }
                    }
                    redeliveredWaitingDispatch.add(qmr);
                }
                if (!redeliveredWaitingDispatch.isEmpty()) {
                    doDispatch(new ArrayList<QueueMessageReference>());
                }
            }finally {
                consumersLock.writeLock().unlock();
            }
            if (!(this.optimizedDispatch || isSlave())) {
                wakeup();
            }
        }finally {
            pagedInPendingDispatchLock.writeLock().unlock();
        }
        if (this.optimizedDispatch || isSlave()) {
            // Outside of dispatchLock() to maintain the lock hierarchy of
            // iteratingMutex -> dispatchLock. - see
            // https://issues.apache.org/activemq/browse/AMQ-1878
            wakeup();
        }
    }

    public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception {
        final ConnectionContext context = producerExchange.getConnectionContext();
        // There is delay between the client sending it and it arriving at the
        // destination.. it may have expired.
        message.setRegionDestination(this);
        final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
        final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0
                && !context.isInRecoveryMode();
        if (message.isExpired()) {
            // message not stored - or added to stats yet - so chuck here
            broker.getRoot().messageExpired(context, message, null);
            if (sendProducerAck) {
                ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
                context.getConnection().dispatchAsync(ack);
            }
            return;
        }
        if (memoryUsage.isFull()) {
            isFull(context, memoryUsage);
            fastProducer(context, producerInfo);
            if (isProducerFlowControl() && context.isProducerFlowControl()) {
                if (warnOnProducerFlowControl) {
                    warnOnProducerFlowControl = false;
                    LOG
                            .info("Usage Manager Memory Limit ("
                                    + memoryUsage.getLimit()
                                    + ") reached on "
                                    + getActiveMQDestination().getQualifiedName()
                                    + ". Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it."
                                    + " See http://activemq.apache.org/producer-flow-control.html for more info");
                }

                if (systemUsage.isSendFailIfNoSpace()) {
                    throw new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer ("
                            + message.getProducerId() + ") to prevent flooding "
                            + getActiveMQDestination().getQualifiedName() + "."
                            + " See http://activemq.apache.org/producer-flow-control.html for more info");
                }

                // We can avoid blocking due to low usage if the producer is
                // sending
                // a sync message or if it is using a producer window
                if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
                    // copy the exchange state since the context will be
                    // modified while we are waiting
                    // for space.
                    final ProducerBrokerExchange producerExchangeCopy = producerExchange.copy();
                    synchronized (messagesWaitingForSpace) {
                     // Start flow control timeout task
                        // Prevent trying to start it multiple times
                        if (!flowControlTimeoutTask.isAlive()) {
                            flowControlTimeoutTask.setName(getName()+" Producer Flow Control Timeout Task");
                            flowControlTimeoutTask.start();
                        }
                        messagesWaitingForSpace.put(message.getMessageId(), new Runnable() {
                            public void run() {

                                try {
                                    // While waiting for space to free up... the
                                    // message may have expired.
                                    if (message.isExpired()) {
                                        LOG.error("expired waiting for space..");
                                        broker.messageExpired(context, message, null);
                                        destinationStatistics.getExpired().increment();
                                    } else {
                                        doMessageSend(producerExchangeCopy, message);
                                    }

                                    if (sendProducerAck) {
                                        ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message
                                                .getSize());
                                        context.getConnection().dispatchAsync(ack);
                                    } else {
                                        Response response = new Response();
                                        response.setCorrelationId(message.getCommandId());
                                        context.getConnection().dispatchAsync(response);
                                    }

                                } catch (Exception e) {
                                    if (!sendProducerAck && !context.isInRecoveryMode()) {
                                        ExceptionResponse response = new ExceptionResponse(e);
                                        response.setCorrelationId(message.getCommandId());
                                        context.getConnection().dispatchAsync(response);
                                    } else {
                                        LOG.debug("unexpected exception on deferred send of :" + message, e);
                                    }
                                }
                            }
                        });

                        if (systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) {
                            flowControlTimeoutMessages.add(new TimeoutMessage(message, context, systemUsage
                                    .getSendFailIfNoSpaceAfterTimeout()));
                        }

                        registerCallbackForNotFullNotification();
                        context.setDontSendReponse(true);
                        return;
                    }

                } else {

                    if (memoryUsage.isFull()) {
                        waitForSpace(context, memoryUsage, "Usage Manager Memory Limit is full. Producer ("
                                + message.getProducerId() + ") stopped to prevent flooding "
                                + getActiveMQDestination().getQualifiedName() + "."
                                + " See http://activemq.apache.org/producer-flow-control.html for more info");
                    }

                    // The usage manager could have delayed us by the time
                    // we unblock the message could have expired..
                    if (message.isExpired()) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Expired message: " + message);
                        }
                        broker.getRoot().messageExpired(context, message, null);
                        return;
                    }
                }
            }
        }
        doMessageSend(producerExchange, message);
        if (sendProducerAck) {
            ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
            context.getConnection().dispatchAsync(ack);
        }
    }

    private void registerCallbackForNotFullNotification() {
        // If the usage manager is not full, then the task will not
        // get called..
        if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) {
            // so call it directly here.
            sendMessagesWaitingForSpaceTask.run();
        }
    }

    void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException,
            Exception {
        final ConnectionContext context = producerExchange.getConnectionContext();
        Future<Object> result = null;
        
        checkUsage(context, message);
        sendLock.lockInterruptibly();
        try {
            if (store != null && message.isPersistent()) {        
                message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
                if (messages.isCacheEnabled()) {
                    result = store.asyncAddQueueMessage(context, message);
                } else {
                    store.addMessage(context, message);
                }
                if (isReduceMemoryFootprint()) {
                    message.clearMarshalledState();
                }
            }
            if (context.isInTransaction()) {
                // If this is a transacted message.. increase the usage now so that
                // a big TX does not blow up
                // our memory. This increment is decremented once the tx finishes..
                message.incrementReferenceCount();
            
                context.getTransaction().addSynchronization(new Synchronization() {
                    @Override
                    public void afterCommit() throws Exception {
                        sendLock.lockInterruptibly();
                        try {
                            // It could take while before we receive the commit
                            // op, by that time the message could have expired..
                            if (broker.isExpired(message)) {
                                broker.messageExpired(context, message, null);
                                destinationStatistics.getExpired().increment();
                                return;
                            }
                            sendMessage(message);
                        } finally {
                            sendLock.unlock();
                            message.decrementReferenceCount();
                        }
                        messageSent(context, message);
                    }
                    @Override
                    public void afterRollback() throws Exception {
                        message.decrementReferenceCount();
                    }
                });
            } else {
                // Add to the pending list, this takes care of incrementing the
                // usage manager.
                sendMessage(message);
            }
        } finally {
            sendLock.unlock();
        }
        if (!context.isInTransaction()) {
            messageSent(context, message);
        }
        if (result != null && !result.isCancelled()) {
            try {
                result.get();
            } catch (CancellationException e) {
                // ignore - the task has been cancelled if the message
                // has already been deleted
            }
        }
    }

    private void checkUsage(ConnectionContext context, Message message) throws ResourceAllocationException, IOException, InterruptedException {
        if (message.isPersistent()) {
            if (store != null && systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) {
                final String logMessage = "Usage Manager Store is Full, " + getStoreUsageHighWaterMark() + "% of "
                    + systemUsage.getStoreUsage().getLimit() + ". Stopping producer ("
                    + message.getProducerId() + ") to prevent flooding "
                    + getActiveMQDestination().getQualifiedName() + "."
                    + " See http://activemq.apache.org/producer-flow-control.html for more info";

                waitForSpace(context, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
            }
        } else if (messages.getSystemUsage() != null && systemUsage.getTempUsage().isFull()) {
            final String logMessage = "Usage Manager Temp Store is Full ("
                    + systemUsage.getTempUsage().getPercentUsage() + "% of " + systemUsage.getTempUsage().getLimit() 
                    +"). Stopping producer (" + message.getProducerId()
                + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
                + " See http://activemq.apache.org/producer-flow-control.html for more info";
            
            waitForSpace(context, messages.getSystemUsage().getTempUsage(), logMessage);
        }
    }

    private void expireMessages() {
        if (LOG.isDebugEnabled()) {
            LOG.debug(getActiveMQDestination().getQualifiedName() + " expiring messages ..");
        }

        // just track the insertion count
        List<Message> browsedMessages = new AbstractList() {
            int size = 0;

            @Override
            public void add(int index, Message element) {
                size++;
            }

            @Override
            public int size() {
                return size;
            }

            @Override
            public Message get(int index) {
                return null;
            }
        };
        doBrowse(browsedMessages, this.getMaxExpirePageSize());
        asyncWakeup();
        if (LOG.isDebugEnabled()) {
            LOG.debug(getActiveMQDestination().getQualifiedName() + " expiring messages done.");
        }
    }

    public void gc() {
    }

    public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node)
            throws IOException {
        messageConsumed(context, node);
        if (store != null && node.isPersistent()) {
            // the original ack may be a ranged ack, but we are trying to delete
            // a specific
            // message store here so we need to convert to a non ranged ack.
            if (ack.getMessageCount() > 0) {
                // Dup the ack
                MessageAck a = new MessageAck();
                ack.copy(a);
                ack = a;
                // Convert to non-ranged.
                ack.setFirstMessageId(node.getMessageId());
                ack.setLastMessageId(node.getMessageId());
                ack.setMessageCount(1);
            }

            store.removeAsyncMessage(context, ack);
        }
    }

    Message loadMessage(MessageId messageId) throws IOException {
        Message msg = null;
        if (store != null) { // can be null for a temp q
            msg = store.getMessage(messageId);
            if (msg != null) {
                msg.setRegionDestination(this);
            }
        }
        return msg;
    }

    @Override
    public String toString() {
        int size = 0;
        messagesLock.readLock().lock();
        try{
            size = messages.size();
        }finally {
            messagesLock.readLock().unlock();
        }
        return "Queue: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size()
                + ", memory=" + memoryUsage.getPercentUsage() + "%, size=" + size + ", in flight groups="
                + messageGroupOwners;
    }

    public void start() throws Exception {
        if (memoryUsage != null) {
            memoryUsage.start();
        }
        if (systemUsage.getStoreUsage() != null) {
            systemUsage.getStoreUsage().start();
        }
        systemUsage.getMemoryUsage().addUsageListener(this);
        messages.start();
        if (getExpireMessagesPeriod() > 0) {
            scheduler.schedualPeriodically(expireMessagesTask, getExpireMessagesPeriod());
        }
        doPageIn(false);
    }

    public void stop() throws Exception {
        if (taskRunner != null) {
            taskRunner.shutdown();
        }
        if (this.executor != null) {
            this.executor.shutdownNow();
        }

        scheduler.cancel(expireMessagesTask);

        if (flowControlTimeoutTask.isAlive()) {
            flowControlTimeoutTask.interrupt();
        }

        if (messages != null) {
            messages.stop();
        }

        systemUsage.getMemoryUsage().removeUsageListener(this);
        if (memoryUsage != null) {
            memoryUsage.stop();
        }
        if (store != null) {
            store.stop();
        }
    }

    // Properties
    // -------------------------------------------------------------------------
    @Override
    public ActiveMQDestination getActiveMQDestination() {
        return destination;
    }

    public MessageGroupMap getMessageGroupOwners() {
        if (messageGroupOwners == null) {
            messageGroupOwners = getMessageGroupMapFactory().createMessageGroupMap();
        }
        return messageGroupOwners;
    }

    public DispatchPolicy getDispatchPolicy() {
        return dispatchPolicy;
    }

    public void setDispatchPolicy(DispatchPolicy dispatchPolicy) {
        this.dispatchPolicy = dispatchPolicy;
    }

    public MessageGroupMapFactory getMessageGroupMapFactory() {
        return messageGroupMapFactory;
    }

    public void setMessageGroupMapFactory(MessageGroupMapFactory messageGroupMapFactory) {
        this.messageGroupMapFactory = messageGroupMapFactory;
    }

    public PendingMessageCursor getMessages() {
        return this.messages;
    }

    public void setMessages(PendingMessageCursor messages) {
        this.messages = messages;
    }

    public boolean isUseConsumerPriority() {
        return useConsumerPriority;
    }

    public void setUseConsumerPriority(boolean useConsumerPriority) {
        this.useConsumerPriority = useConsumerPriority;
    }

    public boolean isStrictOrderDispatch() {
        return strictOrderDispatch;
    }

    public void setStrictOrderDispatch(boolean strictOrderDispatch) {
        this.strictOrderDispatch = strictOrderDispatch;
    }

    public boolean isOptimizedDispatch() {
        return optimizedDispatch;
    }

    public void setOptimizedDispatch(boolean optimizedDispatch) {
        this.optimizedDispatch = optimizedDispatch;
    }

    public int getTimeBeforeDispatchStarts() {
        return timeBeforeDispatchStarts;
    }

    public void setTimeBeforeDispatchStarts(int timeBeforeDispatchStarts) {
        this.timeBeforeDispatchStarts = timeBeforeDispatchStarts;
    }

    public int getConsumersBeforeDispatchStarts() {
        return consumersBeforeDispatchStarts;
    }

    public void setConsumersBeforeDispatchStarts(int consumersBeforeDispatchStarts) {
        this.consumersBeforeDispatchStarts = consumersBeforeDispatchStarts;
    }

    public void setAllConsumersExclusiveByDefault(boolean allConsumersExclusiveByDefault) {
        this.allConsumersExclusiveByDefault = allConsumersExclusiveByDefault;
    }

    public boolean isAllConsumersExclusiveByDefault() {
        return allConsumersExclusiveByDefault;
    }


    // Implementation methods
    // -------------------------------------------------------------------------
    private QueueMessageReference createMessageReference(Message message) {
        QueueMessageReference result = new IndirectMessageReference(message);
        return result;
    }

    public Message[] browse() {
        List<Message> browseList = new ArrayList();
        doBrowse(browseList, getMaxBrowsePageSize());
        return browseList.toArray(new Message[browseList.size()]);
    }

    public void doBrowse(List<Message> browseList, int max) {
        final ConnectionContext connectionContext = createConnectionContext();
        try {
            pageInMessages(false);
            List<MessageReference> toExpire = new ArrayList();

            pagedInPendingDispatchLock.writeLock().lock();
            try {
                addAll(pagedInPendingDispatch, browseList, max, toExpire);
                for (MessageReference ref : toExpire) {
                    pagedInPendingDispatch.remove(ref);
                    if (broker.isExpired(ref)) {
                        LOG.debug("expiring from pagedInPending: " + ref);
                        messageExpired(connectionContext, ref);
                    }
                }
            } finally {
                pagedInPendingDispatchLock.writeLock().unlock();
            }
            toExpire.clear();
            pagedInMessagesLock.readLock().lock();
            try {
                addAll(pagedInMessages.values(), browseList, max, toExpire);
            } finally {
                pagedInMessagesLock.readLock().unlock();
            }
            for (MessageReference ref : toExpire) {
                if (broker.isExpired(ref)) {
                    LOG.debug("expiring from pagedInMessages: " + ref);
                    messageExpired(connectionContext, ref);
                } else {
                    pagedInMessagesLock.writeLock().lock();
                    try {
                        pagedInMessages.remove(ref.getMessageId());
                    } finally {
                        pagedInMessagesLock.writeLock().unlock();
                    }
                }
            }

            if (browseList.size() < getMaxBrowsePageSize()) {
                messagesLock.writeLock().lock();
                try {
                    try {
                        messages.reset();
                        while (messages.hasNext() && browseList.size() < max) {
                            MessageReference node = messages.next();
                            if (node.isExpired()) {
                                if (broker.isExpired(node)) {
                                    LOG.debug("expiring from messages: " + node);
                                    messageExpired(connectionContext, createMessageReference(node.getMessage()));
                                }
                                messages.remove();
                            } else {
                                messages.rollback(node.getMessageId());
                                if (browseList.contains(node.getMessage()) == false) {
                                    browseList.add(node.getMessage());
                                }
                            }
                            node.decrementReferenceCount();
                        }
                    } finally {
                        messages.release();
                    }
                } finally {
                    messagesLock.writeLock().unlock();
                }
            }

        } catch (Exception e) {
            LOG.error("Problem retrieving message for browse", e);
        }
    }

    private void addAll(Collection<QueueMessageReference> refs, List l, int maxBrowsePageSize,
            List<MessageReference> toExpire) throws Exception {
        for (Iterator<QueueMessageReference> i = refs.iterator(); i.hasNext() && l.size() < getMaxBrowsePageSize();) {
            QueueMessageReference ref = i.next();
            if (ref.isExpired()) {
                toExpire.add(ref);
            } else if (l.contains(ref.getMessage()) == false) {
                l.add(ref.getMessage());
            }
        }
    }

    public QueueMessageReference getMessage(String id) {
        MessageId msgId = new MessageId(id);
        pagedInMessagesLock.readLock().lock();
        try{
            QueueMessageReference ref = this.pagedInMessages.get(msgId);
            if (ref != null) {
                return ref;
            }
        }finally {
            pagedInMessagesLock.readLock().unlock();
        }
        messagesLock.readLock().lock();
        try{
            try {
                messages.reset();
                while (messages.hasNext()) {
                    MessageReference mr = messages.next();
                    QueueMessageReference qmr = createMessageReference(mr.getMessage());
                    qmr.decrementReferenceCount();
                    messages.rollback(qmr.getMessageId());
                    if (msgId.equals(qmr.getMessageId())) {
                        return qmr;
                    }
                }
            } finally {
                messages.release();
            }
        }finally {
            messagesLock.readLock().unlock();
        }
        return null;
    }

    public void purge() throws Exception {
        ConnectionContext c = createConnectionContext();
        List<MessageReference> list = null;
        do {
            doPageIn(true);
            pagedInMessagesLock.readLock().lock();
            try {
                list = new ArrayList<MessageReference>(pagedInMessages.values());
            }finally {
                pagedInMessagesLock.readLock().unlock();
            }

            for (MessageReference ref : list) {
                try {
                    QueueMessageReference r = (QueueMessageReference) ref;
                    removeMessage(c, r);
                } catch (IOException e) {
                }
            }
            // don't spin/hang if stats are out and there is nothing left in the
            // store
        } while (!list.isEmpty() && this.destinationStatistics.getMessages().getCount() > 0);
        if (this.destinationStatistics.getMessages().getCount() > 0) {
            LOG.warn(getActiveMQDestination().getQualifiedName()
                    + " after purge complete, message count stats report: "
                    + this.destinationStatistics.getMessages().getCount());
        }
        gc();
        this.destinationStatistics.getMessages().setCount(0);
        getMessages().clear();
    }

    /**
     * Removes the message matching the given messageId
     */
    public boolean removeMessage(String messageId) throws Exception {
        return removeMatchingMessages(createMessageIdFilter(messageId), 1) > 0;
    }

    /**
     * Removes the messages matching the given selector
     * 
     * @return the number of messages removed
     */
    public int removeMatchingMessages(String selector) throws Exception {
        return removeMatchingMessages(selector, -1);
    }

    /**
     * Removes the messages matching the given selector up to the maximum number
     * of matched messages
     * 
     * @return the number of messages removed
     */
    public int removeMatchingMessages(String selector, int maximumMessages) throws Exception {
        return removeMatchingMessages(createSelectorFilter(selector), maximumMessages);
    }

    /**
     * Removes the messages matching the given filter up to the maximum number
     * of matched messages
     * 
     * @return the number of messages removed
     */
    public int removeMatchingMessages(MessageReferenceFilter filter, int maximumMessages) throws Exception {
        int movedCounter = 0;
        Set<MessageReference> set = new CopyOnWriteArraySet();
        ConnectionContext context = createConnectionContext();
        do {
            doPageIn(true);
            pagedInMessagesLock.readLock().lock();
            try{
                set.addAll(pagedInMessages.values());
            }finally {
                pagedInMessagesLock.readLock().unlock();
            }
            List<MessageReference> list = new ArrayList(set);
            for (MessageReference ref : list) {
                IndirectMessageReference r = (IndirectMessageReference) ref;
                if (filter.evaluate(context, r)) {

                    removeMessage(context, r);
                    set.remove(r);
                    if (++movedCounter >= maximumMessages && maximumMessages > 0) {
                        return movedCounter;
                    }
                }
            }
        } while (set.size() < this.destinationStatistics.getMessages().getCount());
        return movedCounter;
    }

    /**
     * Copies the message matching the given messageId
     */
    public boolean copyMessageTo(ConnectionContext context, String messageId, ActiveMQDestination dest)
            throws Exception {
        return copyMatchingMessages(context, createMessageIdFilter(messageId), dest, 1) > 0;
    }

    /**
     * Copies the messages matching the given selector
     * 
     * @return the number of messages copied
     */
    public int copyMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest)
            throws Exception {
        return copyMatchingMessagesTo(context, selector, dest, -1);
    }

    /**
     * Copies the messages matching the given selector up to the maximum number
     * of matched messages
     * 
     * @return the number of messages copied
     */
    public int copyMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest,
            int maximumMessages) throws Exception {
        return copyMatchingMessages(context, createSelectorFilter(selector), dest, maximumMessages);
    }

    /**
     * Copies the messages matching the given filter up to the maximum number of
     * matched messages
     * 
     * @return the number of messages copied
     */
    public int copyMatchingMessages(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest,
            int maximumMessages) throws Exception {
        int movedCounter = 0;
        int count = 0;
        Set<MessageReference> set = new CopyOnWriteArraySet();
        do {
            int oldMaxSize = getMaxPageSize();
            setMaxPageSize((int) this.destinationStatistics.getMessages().getCount());
            doPageIn(true);
            setMaxPageSize(oldMaxSize);
            pagedInMessagesLock.readLock().lock();
            try {
                set.addAll(pagedInMessages.values());
            }finally {
                pagedInMessagesLock.readLock().unlock();
            }
            List<MessageReference> list = new ArrayList(set);
            for (MessageReference ref : list) {
                IndirectMessageReference r = (IndirectMessageReference) ref;
                if (filter.evaluate(context, r)) {

                    r.incrementReferenceCount();
                    try {
                        Message m = r.getMessage();
                        BrokerSupport.resend(context, m, dest);
                        if (++movedCounter >= maximumMessages && maximumMessages > 0) {
                            return movedCounter;
                        }
                    } finally {
                        r.decrementReferenceCount();
                    }
                }
                count++;
            }
        } while (count < this.destinationStatistics.getMessages().getCount());
        return movedCounter;
    }

    /**
     * Move a message
     *
     * @param context
     *            connection context
     * @param m
     *            QueueMessageReference
     * @param dest
     *            ActiveMQDestination
     * @throws Exception
     */
    public boolean moveMessageTo(ConnectionContext context, QueueMessageReference m, ActiveMQDestination dest) throws Exception {
        BrokerSupport.resend(context, m.getMessage(), dest);
        removeMessage(context, m);
        messagesLock.writeLock().lock();
        try{
            messages.rollback(m.getMessageId());
        }finally {
            messagesLock.writeLock().unlock();
        }
        return true;
    }

    /**
     * Moves the message matching the given messageId
     */
    public boolean moveMessageTo(ConnectionContext context, String messageId, ActiveMQDestination dest)
            throws Exception {
        return moveMatchingMessagesTo(context, createMessageIdFilter(messageId), dest, 1) > 0;
    }

    /**
     * Moves the messages matching the given selector
     * 
     * @return the number of messages removed
     */
    public int moveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest)
            throws Exception {
        return moveMatchingMessagesTo(context, selector, dest, Integer.MAX_VALUE);
    }

    /**
     * Moves the messages matching the given selector up to the maximum number
     * of matched messages
     */
    public int moveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest,
            int maximumMessages) throws Exception {
        return moveMatchingMessagesTo(context, createSelectorFilter(selector), dest, maximumMessages);
    }

    /**
     * Moves the messages matching the given filter up to the maximum number of
     * matched messages
     */
    public int moveMatchingMessagesTo(ConnectionContext context, MessageReferenceFilter filter,
            ActiveMQDestination dest, int maximumMessages) throws Exception {
        int movedCounter = 0;
        Set<QueueMessageReference> set = new CopyOnWriteArraySet();
        do {
            doPageIn(true);
            pagedInMessagesLock.readLock().lock();
            try{
                set.addAll(pagedInMessages.values());
            }finally {
                pagedInMessagesLock.readLock().unlock();
            }
            List<QueueMessageReference> list = new ArrayList(set);
            for (QueueMessageReference ref : list) {
                if (filter.evaluate(context, ref)) {
                    // We should only move messages that can be locked.
                    moveMessageTo(context, ref, dest);
                    set.remove(ref);
                    if (++movedCounter >= maximumMessages && maximumMessages > 0) {
                        return movedCounter;
                    }
                }
            }
        } while (set.size() < this.destinationStatistics.getMessages().getCount() && set.size() < maximumMessages);
        return movedCounter;
    }

    BrowserDispatch getNextBrowserDispatch() {
        pagedInMessagesLock.readLock().lock();
        try{
            if (browserDispatches.isEmpty()) {
                return null;
            }
            return browserDispatches.removeFirst();
        }finally {
            pagedInMessagesLock.readLock().unlock();
        }

    }

    /**
     * @return true if we would like to iterate again
     * @see org.apache.activemq.thread.Task#iterate()
     */
    public boolean iterate() {
        MDC.put("activemq.destination", getName());
        boolean pageInMoreMessages = false;
        synchronized (iteratingMutex) {

            // do early to allow dispatch of these waiting messages
            synchronized (messagesWaitingForSpace) {
                Iterator<Runnable> it = messagesWaitingForSpace.values().iterator();
                while (it.hasNext()) {
                    if (!memoryUsage.isFull()) {
                        Runnable op = it.next();
                        it.remove();
                        op.run();
                    } else {
                        registerCallbackForNotFullNotification();
                        break;
                    }
                }
            }

            if (firstConsumer) {
                firstConsumer = false;
                try {
                    if (consumersBeforeDispatchStarts > 0) {
                        int timeout = 1000; // wait one second by default if
                                            // consumer count isn't reached
                        if (timeBeforeDispatchStarts > 0) {
                            timeout = timeBeforeDispatchStarts;
                        }
                        if (consumersBeforeStartsLatch.await(timeout, TimeUnit.MILLISECONDS)) {
                            if (LOG.isDebugEnabled()) {
                                LOG.debug(consumers.size() + " consumers subscribed. Starting dispatch.");
                            }
                        } else {
                            if (LOG.isDebugEnabled()) {
                                LOG.debug(timeout + " ms elapsed and " + consumers.size()
                                        + " consumers subscribed. Starting dispatch.");
                            }
                        }
                    }
                    if (timeBeforeDispatchStarts > 0 && consumersBeforeDispatchStarts <= 0) {
                        iteratingMutex.wait(timeBeforeDispatchStarts);
                        if (LOG.isDebugEnabled()) {
                            LOG.debug(timeBeforeDispatchStarts + " ms elapsed. Starting dispatch.");
                        }
                    }
                } catch (Exception e) {
                    LOG.error(e.toString());
                }
            }

            BrowserDispatch pendingBrowserDispatch = getNextBrowserDispatch();

            messagesLock.readLock().lock();
            try{
                pageInMoreMessages |= !messages.isEmpty();
            }finally {
                messagesLock.readLock().unlock();
            }

            pagedInPendingDispatchLock.readLock().lock();
            try {
                pageInMoreMessages |= !pagedInPendingDispatch.isEmpty();
            }finally {
                pagedInPendingDispatchLock.readLock().unlock();
            }

            // Perhaps we should page always into the pagedInPendingDispatch
            // list if
            // !messages.isEmpty(), and then if
            // !pagedInPendingDispatch.isEmpty()
            // then we do a dispatch.
            if (pageInMoreMessages || pendingBrowserDispatch != null || !redeliveredWaitingDispatch.isEmpty()) {
                try {
                    pageInMessages(pendingBrowserDispatch != null);

                } catch (Throwable e) {
                    LOG.error("Failed to page in more queue messages ", e);
                }
            }

            if (pendingBrowserDispatch != null) {
                ArrayList<QueueMessageReference> alreadyDispatchedMessages = null;
                pagedInMessagesLock.readLock().lock();
                try{
                    alreadyDispatchedMessages = new ArrayList<QueueMessageReference>(pagedInMessages.values());
                }finally {
                    pagedInMessagesLock.readLock().unlock();
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("dispatch to browser: " + pendingBrowserDispatch.getBrowser()
                            + ", already dispatched/paged count: " + alreadyDispatchedMessages.size());
                }
                do {
                    try {
                        MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
                        msgContext.setDestination(destination);

                        QueueBrowserSubscription browser = pendingBrowserDispatch.getBrowser();
                        for (QueueMessageReference node : alreadyDispatchedMessages) {
                            if (!node.isAcked()) {
                                msgContext.setMessageReference(node);
                                if (browser.matches(node, msgContext)) {
                                    browser.add(node);
                                }
                            }
                        }
                        pendingBrowserDispatch.done();
                    } catch (Exception e) {
                        LOG.warn("exception on dispatch to browser: " + pendingBrowserDispatch.getBrowser(), e);
                    }

                } while ((pendingBrowserDispatch = getNextBrowserDispatch()) != null);
            }

            if (pendingWakeups.get() > 0) {
                pendingWakeups.decrementAndGet();
            }
            MDC.remove("activemq.destination");
            return pendingWakeups.get() > 0;
        }
    }

    protected MessageReferenceFilter createMessageIdFilter(final String messageId) {
        return new MessageReferenceFilter() {
            public boolean evaluate(ConnectionContext context, MessageReference r) {
                return messageId.equals(r.getMessageId().toString());
            }

            @Override
            public String toString() {
                return "MessageIdFilter: " + messageId;
            }
        };
    }

    protected MessageReferenceFilter createSelectorFilter(String selector) throws InvalidSelectorException {
        final BooleanExpression selectorExpression = SelectorParser.parse(selector);

        return new MessageReferenceFilter() {
            public boolean evaluate(ConnectionContext context, MessageReference r) throws JMSException {
                MessageEvaluationContext messageEvaluationContext = context.getMessageEvaluationContext();

                messageEvaluationContext.setMessageReference(r);
                if (messageEvaluationContext.getDestination() == null) {
                    messageEvaluationContext.setDestination(getActiveMQDestination());
                }

                return selectorExpression.matches(messageEvaluationContext);
            }
        };
    }

    protected void removeMessage(ConnectionContext c, QueueMessageReference r) throws IOException {
        removeMessage(c, null, r);
        pagedInPendingDispatchLock.writeLock().lock();
        try {
            pagedInPendingDispatch.remove(r);
        } finally {
            pagedInPendingDispatchLock.writeLock().unlock();
        }

    }

    protected void removeMessage(ConnectionContext c, Subscription subs, QueueMessageReference r) throws IOException {
        MessageAck ack = new MessageAck();
        ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
        ack.setDestination(destination);
        ack.setMessageID(r.getMessageId());
        removeMessage(c, subs, r, ack);
    }

    protected void removeMessage(ConnectionContext context, Subscription sub, final QueueMessageReference reference,
            MessageAck ack) throws IOException {
        reference.setAcked(true);
        // This sends the ack the the journal..
        if (!ack.isInTransaction()) {
            acknowledge(context, sub, ack, reference);
            getDestinationStatistics().getDequeues().increment();
            dropMessage(reference);
        } else {
            try {
                acknowledge(context, sub, ack, reference);
            } finally {
                context.getTransaction().addSynchronization(new Synchronization() {

                    @Override
                    public void afterCommit() throws Exception {
                        getDestinationStatistics().getDequeues().increment();
                        dropMessage(reference);
                        wakeup();
                    }

                    @Override
                    public void afterRollback() throws Exception {
                        reference.setAcked(false);
                    }
                });
            }
        }
        if (ack.isPoisonAck()) {
            // message gone to DLQ, is ok to allow redelivery
            messagesLock.writeLock().lock();
            try{
                messages.rollback(reference.getMessageId());
            }finally {
                messagesLock.writeLock().unlock();
            }
        }

    }

    private void dropMessage(QueueMessageReference reference) {
        reference.drop();
        destinationStatistics.getMessages().decrement();
        pagedInMessagesLock.writeLock().lock();
        try{
            pagedInMessages.remove(reference.getMessageId());
        }finally {
            pagedInMessagesLock.writeLock().unlock();
        }
    }

    public void messageExpired(ConnectionContext context, MessageReference reference) {
        messageExpired(context, null, reference);
    }

    public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("message expired: " + reference);
        }
        broker.messageExpired(context, reference, subs);
        destinationStatistics.getExpired().increment();
        try {
            removeMessage(context, subs, (QueueMessageReference) reference);
        } catch (IOException e) {
            LOG.error("Failed to remove expired Message from the store ", e);
        }
    }

    protected ConnectionContext createConnectionContext() {
        ConnectionContext answer = new ConnectionContext(new NonCachedMessageEvaluationContext());
        answer.setBroker(this.broker);
        answer.getMessageEvaluationContext().setDestination(getActiveMQDestination());
        answer.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
        return answer;
    }

    final void sendMessage(final Message msg) throws Exception {
        messagesLock.writeLock().lock();
        try{
            messages.addMessageLast(msg);
        }finally {
            messagesLock.writeLock().unlock();
        }
    }
    
    final void messageSent(final ConnectionContext context, final Message msg) throws Exception {     
        destinationStatistics.getEnqueues().increment();
        destinationStatistics.getMessages().increment();
        messageDelivered(context, msg);
        consumersLock.readLock().lock();
        try {
            if (consumers.isEmpty()) {
                onMessageWithNoConsumers(context, msg);
            }
        }finally {
            consumersLock.readLock().unlock();
        }
        if (LOG.isTraceEnabled()) {
            LOG.trace("Message " + msg.getMessageId() + " sent to " + this.destination);
        }
        wakeup();
    }

    public void wakeup() {
        if (optimizedDispatch || isSlave()) {
            iterate();
            pendingWakeups.incrementAndGet();
        } else {
            asyncWakeup();
        }
    }

    private void asyncWakeup() {
        try {
            pendingWakeups.incrementAndGet();
            this.taskRunner.wakeup();
        } catch (InterruptedException e) {
            LOG.warn("Async task tunner failed to wakeup ", e);
        }
    }

    private boolean isSlave() {
        return broker.getBrokerService().isSlave();
    }

    private void doPageIn(boolean force) throws Exception {
        List<QueueMessageReference> newlyPaged = doPageInForDispatch(force);
        pagedInPendingDispatchLock.writeLock().lock();
        try {
            if (pagedInPendingDispatch.isEmpty()) {
                pagedInPendingDispatch.addAll(newlyPaged);
            } else {
                for (QueueMessageReference qmr : newlyPaged) {
                    if (!pagedInPendingDispatch.contains(qmr)) {
                        pagedInPendingDispatch.add(qmr);
                    }
                }
            }
        } finally {
            pagedInPendingDispatchLock.writeLock().unlock();
        }
    }

    private List<QueueMessageReference> doPageInForDispatch(boolean force) throws Exception {
        List<QueueMessageReference> result = null;
        List<QueueMessageReference> resultList = null;

        int toPageIn = Math.min(getMaxPageSize(), messages.size());
        if (LOG.isDebugEnabled()) {
            LOG.debug(destination.getPhysicalName() + " toPageIn: " + toPageIn + ", Inflight: "
                    + destinationStatistics.getInflight().getCount() + ", pagedInMessages.size "
                    + pagedInMessages.size() + ", enqueueCount: " + destinationStatistics.getEnqueues().getCount()
                    + ", dequeueCount: " + destinationStatistics.getDequeues().getCount());
        }

        if (isLazyDispatch() && !force) {
            // Only page in the minimum number of messages which can be
            // dispatched immediately.
            toPageIn = Math.min(getConsumerMessageCountBeforeFull(), toPageIn);
        }
        int pagedInPendingSize = 0;
        pagedInPendingDispatchLock.readLock().lock();
        try {
            pagedInPendingSize = pagedInPendingDispatch.size();
        } finally {
            pagedInPendingDispatchLock.readLock().unlock();
        }
        if (toPageIn > 0 && (force || (!consumers.isEmpty() && pagedInPendingSize < getMaxPageSize()))) {
            int count = 0;
            result = new ArrayList<QueueMessageReference>(toPageIn);
            messagesLock.writeLock().lock();
            try {
                try {
                    messages.setMaxBatchSize(toPageIn);
                    messages.reset();
                    while (messages.hasNext() && count < toPageIn) {
                        MessageReference node = messages.next();
                        messages.remove();

                        QueueMessageReference ref = createMessageReference(node.getMessage());
                        if (ref.isExpired()) {
                            if (broker.isExpired(ref)) {
                                messageExpired(createConnectionContext(), ref);
                            } else {
                                ref.decrementReferenceCount();
                            }
                        } else {
                            result.add(ref);
                            count++;
                        }
                    }
                } finally {
                    messages.release();
                }
            } finally {
                messagesLock.writeLock().unlock();
            }
            // Only add new messages, not already pagedIn to avoid multiple
            // dispatch attempts
            pagedInMessagesLock.writeLock().lock();
            try {
                resultList = new ArrayList<QueueMessageReference>(result.size());
                for (QueueMessageReference ref : result) {
                    if (!pagedInMessages.containsKey(ref.getMessageId())) {            
                        pagedInMessages.put(ref.getMessageId(), ref);
                        resultList.add(ref);
                    } else {
                        ref.decrementReferenceCount();
                    }
                }
            } finally {
                pagedInMessagesLock.writeLock().unlock();
            }
        } else {
            // Avoid return null list, if condition is not validated
            resultList = new ArrayList<QueueMessageReference>();
        }

        return resultList;
    }

    private void doDispatch(List<QueueMessageReference> list) throws Exception {
        boolean doWakeUp = false;

        pagedInPendingDispatchLock.writeLock().lock();
        try {
            if (!redeliveredWaitingDispatch.isEmpty()) {
                // Try first to dispatch redelivered messages to keep an
                // proper order
                redeliveredWaitingDispatch = doActualDispatch(redeliveredWaitingDispatch);
            }
            if (!pagedInPendingDispatch.isEmpty()) {
                // Next dispatch anything that had not been
                // dispatched before.
                pagedInPendingDispatch = doActualDispatch(pagedInPendingDispatch);
            }
            // and now see if we can dispatch the new stuff.. and append to
            // the pending
            // list anything that does not actually get dispatched.
            if (list != null && !list.isEmpty()) {
                if (pagedInPendingDispatch.isEmpty()) {
                    pagedInPendingDispatch.addAll(doActualDispatch(list));
                } else {
                    for (QueueMessageReference qmr : list) {
                        if (!pagedInPendingDispatch.contains(qmr)) {
                            pagedInPendingDispatch.add(qmr);
                        }
                    }
                    doWakeUp = true;
                }
            }
        } finally {
            pagedInPendingDispatchLock.writeLock().unlock();
        }

        if (doWakeUp) {
            // avoid lock order contention
            asyncWakeup();
        }
    }

    /**
     * @return list of messages that could get dispatched to consumers if they
     *         were not full.
     */
    private List<QueueMessageReference> doActualDispatch(List list) throws Exception {
        List<Subscription> consumers;
        consumersLock.writeLock().lock();
        try {
            if (this.consumers.isEmpty() || isSlave()) {
                // slave dispatch happens in processDispatchNotification
                return list;
            }
            consumers = new ArrayList<Subscription>(this.consumers);
        }finally {
            consumersLock.writeLock().unlock();
        }

        List<QueueMessageReference> rc = new ArrayList(list.size());
        Set<Subscription> fullConsumers = new HashSet(this.consumers.size());

        for (MessageReference node : list) {
            Subscription target = null;
            int interestCount = 0;
            for (Subscription s : consumers) {
                if (s instanceof QueueBrowserSubscription) {
                    interestCount++;
                    continue;
                }
                if (dispatchSelector.canSelect(s, node)) {
                    if (!fullConsumers.contains(s)) {
                        if (!s.isFull()) {
                            if (assignMessageGroup(s, (QueueMessageReference)node)) {
                                // Dispatch it.
                                s.add(node);
                                target = s;
                                break;
                            }
                        } else {
                            // no further dispatch of list to a full consumer to
                            // avoid out of order message receipt
                            fullConsumers.add(s);
                        }
                    }
                    interestCount++;
                } else {
                    // makes sure it gets dispatched again
                    if (!node.isDropped() && !((QueueMessageReference) node).isAcked()
                            && (!node.isDropped() || s.getConsumerInfo().isBrowser())) {
                        interestCount++;
                    }
                }
            }

            if ((target == null && interestCount > 0) || consumers.size() == 0) {
                // This means all subs were full or that there are no
                // consumers...
                rc.add((QueueMessageReference) node);
            }

            // If it got dispatched, rotate the consumer list to get round robin
            // distribution.
            if (target != null && !strictOrderDispatch && consumers.size() > 1
                    && !dispatchSelector.isExclusiveConsumer(target)) {
                consumersLock.writeLock().lock();
                try {
                    if (removeFromConsumerList(target)) {
                        addToConsumerList(target);
                        consumers = new ArrayList<Subscription>(this.consumers);
                    }
                }finally {
                    consumersLock.writeLock().unlock();
                }
            }
        }

        return rc;
    }

    protected boolean assignMessageGroup(Subscription subscription, QueueMessageReference node) throws Exception {
        //QueueMessageReference node = (QueueMessageReference) m;
        boolean result = true;
        // Keep message groups together.
        String groupId = node.getGroupID();
        int sequence = node.getGroupSequence();
        if (groupId != null) {
            //MessageGroupMap messageGroupOwners = ((Queue) node
            //        .getRegionDestination()).getMessageGroupOwners();

            MessageGroupMap messageGroupOwners = getMessageGroupOwners();
            // If we can own the first, then no-one else should own the
            // rest.
            if (sequence == 1) {
                assignGroup(subscription, messageGroupOwners, node, groupId);
            } else {

                // Make sure that the previous owner is still valid, we may
                // need to become the new owner.
                ConsumerId groupOwner;

                groupOwner = messageGroupOwners.get(groupId);
                if (groupOwner == null) {
                    assignGroup(subscription, messageGroupOwners, node, groupId);
                } else {
                    if (groupOwner.equals(subscription.getConsumerInfo().getConsumerId())) {
                        // A group sequence < 1 is an end of group signal.
                        if (sequence < 0) {
                            messageGroupOwners.removeGroup(groupId);
                        }
                    } else {
                        result = false;
                    }
                }
            }
        }

        return result;

    }

    protected void assignGroup(Subscription subs, MessageGroupMap messageGroupOwners, MessageReference n, String groupId) throws IOException {
        messageGroupOwners.put(groupId, subs.getConsumerInfo().getConsumerId());
        Message message = n.getMessage();
        if (message instanceof ActiveMQMessage) {
            ActiveMQMessage activeMessage = (ActiveMQMessage) message;
            try {
                activeMessage.setBooleanProperty("JMSXGroupFirstForConsumer", true, false);
            } catch (JMSException e) {
                LOG.warn("Failed to set boolean header: " + e, e);
            }
        }
    }

    protected void pageInMessages(boolean force) throws Exception {
        doDispatch(doPageInForDispatch(force));
    }

    private void addToConsumerList(Subscription sub) {
        if (useConsumerPriority) {
            consumers.add(sub);
            Collections.sort(consumers, orderedCompare);
        } else {
            consumers.add(sub);
        }
    }

    private boolean removeFromConsumerList(Subscription sub) {
        return consumers.remove(sub);
    }

    private int getConsumerMessageCountBeforeFull() throws Exception {
        int total = 0;
        boolean zeroPrefetch = false;
        consumersLock.readLock().lock();
        try{
            for (Subscription s : consumers) {
                zeroPrefetch |= s.getPrefetchSize() == 0;
                int countBeforeFull = s.countBeforeFull();
                total += countBeforeFull;
            }
        }finally {
            consumersLock.readLock().unlock();
        }
        if (total == 0 && zeroPrefetch) {
            total = 1;
        }
        return total;
    }

    /*
     * In slave mode, dispatch is ignored till we get this notification as the
     * dispatch process is non deterministic between master and slave. On a
     * notification, the actual dispatch to the subscription (as chosen by the
     * master) is completed. (non-Javadoc)
     * @see
     * org.apache.activemq.broker.region.BaseDestination#processDispatchNotification
     * (org.apache.activemq.command.MessageDispatchNotification)
     */
    @Override
    public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
        // do dispatch
        Subscription sub = getMatchingSubscription(messageDispatchNotification);
        if (sub != null) {
            MessageReference message = getMatchingMessage(messageDispatchNotification);
            sub.add(message);
            sub.processMessageDispatchNotification(messageDispatchNotification);
        }
    }

    private QueueMessageReference getMatchingMessage(MessageDispatchNotification messageDispatchNotification)
            throws Exception {
        QueueMessageReference message = null;
        MessageId messageId = messageDispatchNotification.getMessageId();

        pagedInPendingDispatchLock.writeLock().lock();
        try {
            for (QueueMessageReference ref : pagedInPendingDispatch) {
                if (messageId.equals(ref.getMessageId())) {
                    message = ref;
                    pagedInPendingDispatch.remove(ref);
                    break;
                }
            }
        } finally {
            pagedInPendingDispatchLock.writeLock().unlock();
        }

        if (message == null) {
            pagedInMessagesLock.readLock().lock();
            try {
                message = pagedInMessages.get(messageId);
            } finally {
                pagedInMessagesLock.readLock().unlock();
            }
        }

        if (message == null) {
            messagesLock.writeLock().lock();
            try {
                try {
                    messages.setMaxBatchSize(getMaxPageSize());
                    messages.reset();
                    while (messages.hasNext()) {
                        MessageReference node = messages.next();
                        messages.remove();
                        if (messageId.equals(node.getMessageId())) {
                            message = this.createMessageReference(node.getMessage());
                            break;
                        }
                    }
                } finally {
                    messages.release();
                }
            } finally {
                messagesLock.writeLock().unlock();
            }
        }

        if (message == null) {
            Message msg = loadMessage(messageId);
            if (msg != null) {
                message = this.createMessageReference(msg);
            }
        }

        if (message == null) {
            throw new JMSException("Slave broker out of sync with master - Message: "
                    + messageDispatchNotification.getMessageId() + " on "
                    + messageDispatchNotification.getDestination() + " does not exist among pending("
                    + pagedInPendingDispatch.size() + ") for subscription: "
                    + messageDispatchNotification.getConsumerId());
        }
        return message;
    }

    /**
     * Find a consumer that matches the id in the message dispatch notification
     * 
     * @param messageDispatchNotification
     * @return sub or null if the subscription has been removed before dispatch
     * @throws JMSException
     */
    private Subscription getMatchingSubscription(MessageDispatchNotification messageDispatchNotification)
            throws JMSException {
        Subscription sub = null;
        consumersLock.readLock().lock();
        try {
            for (Subscription s : consumers) {
                if (messageDispatchNotification.getConsumerId().equals(s.getConsumerInfo().getConsumerId())) {
                    sub = s;
                    break;
                }
            }
        }finally {
            consumersLock.readLock().unlock();
        }
        return sub;
    }

    public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
        if (oldPercentUsage > newPercentUsage) {
            asyncWakeup();
        }
    }

    @Override
    protected Logger getLog() {
        return LOG;
    }
}

Other ActiveMQ examples (source code examples)

Here is a short list of links related to this ActiveMQ Queue.java source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.