alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

ActiveMQ example source code file (DurableTopicSubscription.java)

This example ActiveMQ source code file (DurableTopicSubscription.java) is included in the DevDaily.com "Java Source Code Warehouse" project. The intent of this project is to help you "Learn Java by Example" TM.

Java - ActiveMQ tags/keywords

concurrenthashmap, durabletopicsubscription, exception, exception, integer, io, ioexception, ioexception, iterator, jmsexception, messageack, messagereference, messagereference, threading, threads, topic, topic, util

The ActiveMQ DurableTopicSubscription.java source code

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.broker.region;

import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;

import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.usage.Usage;
import org.apache.activemq.usage.UsageListener;
import org.apache.activemq.util.SubscriptionKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DurableTopicSubscription extends PrefetchSubscription implements UsageListener {

    private static final Logger LOG = LoggerFactory.getLogger(DurableTopicSubscription.class);
    private final ConcurrentHashMap<MessageId, Integer> redeliveredMessages = new ConcurrentHashMap();
    private final ConcurrentHashMap<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap();
    private final SubscriptionKey subscriptionKey;
    private final boolean keepDurableSubsActive;
    private AtomicBoolean active = new AtomicBoolean();

    public DurableTopicSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive)
        throws JMSException {
        super(broker,usageManager, context, info);
        this.pending = new StoreDurableSubscriberCursor(broker,context.getClientId(), info.getSubscriptionName(), info.getPrefetchSize(), this);
        this.pending.setSystemUsage(usageManager);
        this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
        this.keepDurableSubsActive = keepDurableSubsActive;
        subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
        
    }

    public final boolean isActive() {
        return active.get();
    }

    public boolean isFull() {
        return !active.get() || super.isFull();
    }

    public void gc() {
    }

    /**
     * store will have a pending ack for all durables, irrespective of the selector
     * so we need to ack if node is un-matched
     */
    public void unmatched(MessageReference node) throws IOException {
        MessageAck ack = new MessageAck();
        ack.setAckType(MessageAck.UNMATCHED_ACK_TYPE);
        ack.setMessageID(node.getMessageId());
        node.getRegionDestination().acknowledge(this.getContext(), this, ack, node);
    }

    @Override
    protected void setPendingBatchSize(PendingMessageCursor pending, int numberToDispatch) {
        // statically configured via maxPageSize
    }

    public void add(ConnectionContext context, Destination destination) throws Exception {
        super.add(context, destination);
        // do it just once per destination
        if (destinations.containsKey(destination.getActiveMQDestination())) {
            return;
        }
        destinations.put(destination.getActiveMQDestination(), destination);

        if (active.get() || keepDurableSubsActive) {
            Topic topic = (Topic)destination;
            topic.activate(context, this);
            if (pending.isEmpty(topic)) {
                topic.recoverRetroactiveMessages(context, this);
            }
            this.enqueueCounter+=pending.size();
        } else if (destination.getMessageStore() != null) {
            TopicMessageStore store = (TopicMessageStore)destination.getMessageStore();
            try {
                this.enqueueCounter+=store.getMessageCount(subscriptionKey.getClientId(),subscriptionKey.getSubscriptionName());
            } catch (IOException e) {
                JMSException jmsEx = new JMSException("Failed to retrieve eunqueueCount from store "+ e);
                jmsEx.setLinkedException(e);
                throw jmsEx;
            }
        }
        dispatchPending();
    }

    public void activate(SystemUsage memoryManager, ConnectionContext context,
            ConsumerInfo info) throws Exception {
        if (!active.get()) {
            this.context = context;
            this.info = info;
            LOG.debug("Activating " + this);
            if (!keepDurableSubsActive) {
                for (Iterator<Destination> iter = destinations.values()
                        .iterator(); iter.hasNext();) {
                    Topic topic = (Topic) iter.next();
                    add(context, topic);
                    topic.activate(context, this);
                }
            }
            synchronized (pendingLock) {
                pending.setSystemUsage(memoryManager);
                pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
                pending.setMaxAuditDepth(getMaxAuditDepth());
                pending.setMaxProducersToAudit(getMaxProducersToAudit());
                pending.start();
                // If nothing was in the persistent store, then try to use the
                // recovery policy.
                if (pending.isEmpty()) {
                    for (Iterator<Destination> iter = destinations.values()
                            .iterator(); iter.hasNext();) {
                        Topic topic = (Topic) iter.next();
                        topic.recoverRetroactiveMessages(context, this);
                    }
                }
            }
            this.active.set(true);
            dispatchPending();
            this.usageManager.getMemoryUsage().addUsageListener(this);
        }
    }

    public void deactivate(boolean keepDurableSubsActive) throws Exception {
        LOG.debug("Deactivating " + this);
        active.set(false);
        this.usageManager.getMemoryUsage().removeUsageListener(this);
        synchronized (pending) {
            pending.stop();
        }
        if (!keepDurableSubsActive) {
            for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) {
                Topic topic = (Topic)iter.next();
                topic.deactivate(context, this);
            }
        }
        
        for (final MessageReference node : dispatched) {
            // Mark the dispatched messages as redelivered for next time.
            Integer count = redeliveredMessages.get(node.getMessageId());
            if (count != null) {
                redeliveredMessages.put(node.getMessageId(), Integer.valueOf(count.intValue() + 1));
            } else {
                redeliveredMessages.put(node.getMessageId(), Integer.valueOf(1));
            }
            if (keepDurableSubsActive&& pending.isTransient()) {
                synchronized (pending) {
                    pending.addMessageFirst(node);
                }
            } else {
                node.decrementReferenceCount();
            }
        }
        synchronized(dispatched) {
            dispatched.clear();
        }
        if (!keepDurableSubsActive && pending.isTransient()) {
            synchronized (pending) {
                try {
                    pending.reset();
                    while (pending.hasNext()) {
                        MessageReference node = pending.next();
                        node.decrementReferenceCount();
                        pending.remove();
                    }
                } finally {
                    pending.release();
                }
            }
        }
        prefetchExtension = 0;
    }

    
    protected MessageDispatch createMessageDispatch(MessageReference node, Message message) {
        MessageDispatch md = super.createMessageDispatch(node, message);
        Integer count = redeliveredMessages.get(node.getMessageId());
        if (count != null) {
            md.setRedeliveryCounter(count.intValue());
        }
        return md;
    }

    public void add(MessageReference node) throws Exception {
        if (!active.get() && !keepDurableSubsActive) {
            return;
        }
        super.add(node);
    }

    protected void dispatchPending() throws IOException {
        if (isActive()) {
            super.dispatchPending();
        }
    }
    
    protected void doAddRecoveredMessage(MessageReference message) throws Exception {
        synchronized(pending) {
            pending.addRecoveredMessage(message);
        }
    }

    public int getPendingQueueSize() {
        if (active.get() || keepDurableSubsActive) {
            return super.getPendingQueueSize();
        }
        // TODO: need to get from store
        return 0;
    }

    public void setSelector(String selector) throws InvalidSelectorException {
        throw new UnsupportedOperationException("You cannot dynamically change the selector for durable topic subscriptions");
    }

    protected boolean canDispatch(MessageReference node) {
        return isActive();
    }

    protected void acknowledge(ConnectionContext context, MessageAck ack, MessageReference node) throws IOException {
        node.getRegionDestination().acknowledge(context, this, ack, node);
        redeliveredMessages.remove(node.getMessageId());
        node.decrementReferenceCount();
    }

    
    public synchronized String toString() {
        return "DurableTopicSubscription-" + getSubscriptionKey() + ", id=" + info.getConsumerId() + ", destinations=" + destinations.size() + ", total=" + enqueueCounter + ", pending="
               + getPendingQueueSize() + ", dispatched=" + dispatchCounter + ", inflight=" + dispatched.size() + ", prefetchExtension=" + this.prefetchExtension;
    }

    public SubscriptionKey getSubscriptionKey() {
        return subscriptionKey;
    }

    /**
     * Release any references that we are holding.
     */
    public void destroy() {
        synchronized (pending) {
            try {

                pending.reset();
                while (pending.hasNext()) {
                    MessageReference node = pending.next();
                    node.decrementReferenceCount();
                }

            } finally {
                pending.release();
                pending.clear();
            }
        }
        synchronized(dispatched) {
            for (Iterator iter = dispatched.iterator(); iter.hasNext();) {
                MessageReference node = (MessageReference) iter.next();
                node.decrementReferenceCount();
            }
            dispatched.clear();
        }
        setSlowConsumer(false);
    }

    public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
        if (oldPercentUsage > newPercentUsage && oldPercentUsage >= 90) {
            try {
                dispatchPending();
            } catch (IOException e) {
                LOG.warn("problem calling dispatchMatched", e);
            }
        }
    }
    
    protected boolean isDropped(MessageReference node) {
       return false;
    }

    public boolean isKeepDurableSubsActive() {
        return keepDurableSubsActive;
    }
}

Other ActiveMQ examples (source code examples)

Here is a short list of links related to this ActiveMQ DurableTopicSubscription.java source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.