alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

ActiveMQ example source code file (JDBCTopicMessageStore.java)

This example ActiveMQ source code file (JDBCTopicMessageStore.java) is included in the DevDaily.com "Java Source Code Warehouse" project. The intent of this project is to help you "Learn Java by Example" TM.

Java - ActiveMQ tags/keywords

failed, failure, failure, io, ioexception, jdbc, jdbc, lastrecovered, lastrecoveredentry, lastrecoveredentry, reason, sql, sqlexception, string, string, threading, threads, transactioncontext, util

The ActiveMQ JDBCTopicMessageStore.java source code

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store.jdbc;

import java.io.IOException;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 
 */
public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMessageStore {

    private static final Logger LOG = LoggerFactory.getLogger(JDBCTopicMessageStore.class);
    private Map<String, LastRecovered> subscriberLastRecoveredMap = new ConcurrentHashMap();

    public JDBCTopicMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQTopic topic, ActiveMQMessageAudit audit) {
        super(persistenceAdapter, adapter, wireFormat, topic, audit);
    }

    public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId, MessageAck ack) throws IOException {
        if (ack != null && ack.isUnmatchedAck()) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("ignoring unmatched selector ack for: " + messageId + ", cleanup will get to this message after subsequent acks.");
            }
            return;
        }
        TransactionContext c = persistenceAdapter.getTransactionContext(context);
        try {
            long[] res = adapter.getStoreSequenceId(c, destination, messageId);
            if (this.isPrioritizedMessages()) {
                adapter.doSetLastAckWithPriority(c, destination, clientId, subscriptionName, res[0], res[1]);
            } else {
                adapter.doSetLastAck(c, destination, clientId, subscriptionName, res[0], res[1]);
            }
            if (LOG.isTraceEnabled()) {
                LOG.trace(clientId + ":" + subscriptionName + " ack, seq: " + res[0] + ", priority: " + res[1] + " mid:" + messageId);
            }
        } catch (SQLException e) {
            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
            throw IOExceptionSupport.create("Failed to store acknowledgment for: " + clientId + " on message " + messageId + " in container: " + e, e);
        } finally {
            c.close();
        }
    }

    /**
     * @throws Exception
     */
    public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception {
        TransactionContext c = persistenceAdapter.getTransactionContext();
        try {
            adapter.doRecoverSubscription(c, destination, clientId, subscriptionName, new JDBCMessageRecoveryListener() {
                public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
                    Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
                    msg.getMessageId().setBrokerSequenceId(sequenceId);
                    return listener.recoverMessage(msg);
                }

                public boolean recoverMessageReference(String reference) throws Exception {
                    return listener.recoverMessageReference(new MessageId(reference));
                }

            });
        } catch (SQLException e) {
            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
            throw IOExceptionSupport.create("Failed to recover subscription: " + clientId + ". Reason: " + e, e);
        } finally {
            c.close();
        }
    }

    private class LastRecovered implements Iterable<LastRecoveredEntry> {
        LastRecoveredEntry[] perPriority = new LastRecoveredEntry[10];
        LastRecovered() {
            for (int i=0; i<perPriority.length; i++) {
                perPriority[i] = new LastRecoveredEntry(i);
            }
        }

        public void updateStored(long sequence, int priority) {
            perPriority[priority].stored = sequence;
        }

        public LastRecoveredEntry defaultPriority() {
            return perPriority[javax.jms.Message.DEFAULT_PRIORITY];
        }

        public String toString() {
            return Arrays.deepToString(perPriority);
        }

        public Iterator<LastRecoveredEntry> iterator() {
            return new PriorityIterator();
        }

        class PriorityIterator implements Iterator<LastRecoveredEntry> {
            int current = 9;
            public boolean hasNext() {
                for (int i=current; i>=0; i--) {
                    if (perPriority[i].hasMessages()) {
                        current = i;
                        return true;
                    }
                }
                return false;
            }

            public LastRecoveredEntry next() {
                return perPriority[current];
            }

            public void remove() {
                throw new RuntimeException("not implemented");
            }
        }
    }

    private class LastRecoveredEntry {
        final int priority;
        long recovered = 0;
        long stored = Integer.MAX_VALUE;

        public LastRecoveredEntry(int priority) {
            this.priority = priority;
        }

        public String toString() {
            return priority + "-" + stored + ":" + recovered;
        }

        public void exhausted() {
            stored = recovered;
        }

        public boolean hasMessages() {
            return stored > recovered;
        }
    }

    class LastRecoveredAwareListener implements JDBCMessageRecoveryListener {
        final MessageRecoveryListener delegate;
        final int maxMessages;
        LastRecoveredEntry lastRecovered;
        int recoveredCount;
        int recoveredMarker;

        public LastRecoveredAwareListener(MessageRecoveryListener delegate, int maxMessages) {
            this.delegate = delegate;
            this.maxMessages = maxMessages;
        }

        public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
            if (delegate.hasSpace()) {
                Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
                msg.getMessageId().setBrokerSequenceId(sequenceId);
                if (delegate.recoverMessage(msg)) {
                    lastRecovered.recovered = sequenceId;
                    recoveredCount++;
                    return true;
                }
            }
            return false;
        }

        public boolean recoverMessageReference(String reference) throws Exception {
            return delegate.recoverMessageReference(new MessageId(reference));
        }

        public void setLastRecovered(LastRecoveredEntry lastRecovered) {
            this.lastRecovered = lastRecovered;
            recoveredMarker = recoveredCount;
        }

        public boolean complete() {
            return  !delegate.hasSpace() || recoveredCount == maxMessages;
        }

        public boolean stalled() {
            return recoveredMarker == recoveredCount;
        }
    }

    public synchronized void recoverNextMessages(final String clientId, final String subscriptionName, final int maxReturned, final MessageRecoveryListener listener)
            throws Exception {
        //Duration duration = new Duration("recoverNextMessages");
        TransactionContext c = persistenceAdapter.getTransactionContext();

        String key = getSubscriptionKey(clientId, subscriptionName);
        if (!subscriberLastRecoveredMap.containsKey(key)) {
           subscriberLastRecoveredMap.put(key, new LastRecovered());
        }
        final LastRecovered lastRecovered = subscriberLastRecoveredMap.get(key);        
        LastRecoveredAwareListener recoveredAwareListener = new LastRecoveredAwareListener(listener, maxReturned);
        try {
            if (LOG.isTraceEnabled()) {
                LOG.trace(key + " existing last recovered: " + lastRecovered);
            }
            if (isPrioritizedMessages()) {
                Iterator<LastRecoveredEntry> it = lastRecovered.iterator();
                for ( ; it.hasNext() && !recoveredAwareListener.complete(); ) {
                    LastRecoveredEntry entry = it.next();
                    recoveredAwareListener.setLastRecovered(entry);
                    //Duration microDuration = new Duration("recoverNextMessages:loop");
                    adapter.doRecoverNextMessagesWithPriority(c, destination, clientId, subscriptionName,
                        entry.recovered, entry.priority, maxReturned, recoveredAwareListener);
                    //microDuration.end(entry);
                    if (recoveredAwareListener.stalled()) {
                        if (recoveredAwareListener.complete()) {
                            break;
                        } else {
                            entry.exhausted();
                        }
                    }
                }
            } else {
                LastRecoveredEntry last = lastRecovered.defaultPriority();
                recoveredAwareListener.setLastRecovered(last);
                adapter.doRecoverNextMessages(c, destination, clientId, subscriptionName,
                        last.recovered, 0, maxReturned, recoveredAwareListener);
            }
            if (LOG.isTraceEnabled()) {
                LOG.trace(key + " last recovered: " + lastRecovered);
            }
            //duration.end();
        } catch (SQLException e) {
            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
        } finally {
            c.close();
        }
    }

    public void resetBatching(String clientId, String subscriptionName) {
        subscriberLastRecoveredMap.remove(getSubscriptionKey(clientId, subscriptionName));
    }

    protected void onAdd(long sequenceId, byte priority) {
        // update last recovered state
        for (LastRecovered last : subscriberLastRecoveredMap.values()) {
            last.updateStored(sequenceId, priority);
        }
    }


    public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
        TransactionContext c = persistenceAdapter.getTransactionContext();
        try {
            c = persistenceAdapter.getTransactionContext();
            adapter.doSetSubscriberEntry(c, subscriptionInfo, retroactive, isPrioritizedMessages());
        } catch (SQLException e) {
            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
            throw IOExceptionSupport.create("Failed to lookup subscription for info: " + subscriptionInfo.getClientId() + ". Reason: " + e, e);
        } finally {
            c.close();
        }
    }

    /**
     * @see org.apache.activemq.store.TopicMessageStore#lookupSubscription(String,
     *      String)
     */
    public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
        TransactionContext c = persistenceAdapter.getTransactionContext();
        try {
            return adapter.doGetSubscriberEntry(c, destination, clientId, subscriptionName);
        } catch (SQLException e) {
            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
            throw IOExceptionSupport.create("Failed to lookup subscription for: " + clientId + ". Reason: " + e, e);
        } finally {
            c.close();
        }
    }

    public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
        TransactionContext c = persistenceAdapter.getTransactionContext();
        try {
            adapter.doDeleteSubscription(c, destination, clientId, subscriptionName);
        } catch (SQLException e) {
            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
            throw IOExceptionSupport.create("Failed to remove subscription for: " + clientId + ". Reason: " + e, e);
        } finally {
            c.close();
            resetBatching(clientId, subscriptionName);
        }
    }

    public SubscriptionInfo[] getAllSubscriptions() throws IOException {
        TransactionContext c = persistenceAdapter.getTransactionContext();
        try {
            return adapter.doGetAllSubscriptions(c, destination);
        } catch (SQLException e) {
            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
            throw IOExceptionSupport.create("Failed to lookup subscriptions. Reason: " + e, e);
        } finally {
            c.close();
        }
    }

    public int getMessageCount(String clientId, String subscriberName) throws IOException {
        //Duration duration = new Duration("getMessageCount");
        int result = 0;
        TransactionContext c = persistenceAdapter.getTransactionContext();
        try {
            result = adapter.doGetDurableSubscriberMessageCount(c, destination, clientId, subscriberName, isPrioritizedMessages());
        } catch (SQLException e) {
            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
            throw IOExceptionSupport.create("Failed to get Message Count: " + clientId + ". Reason: " + e, e);
        } finally {
            c.close();
        }
        if (LOG.isTraceEnabled()) {
            LOG.trace(clientId + ":" + subscriberName + ", messageCount: " + result);
        }
        //duration.end();
        return result;
    }

    protected String getSubscriptionKey(String clientId, String subscriberName) {
        String result = clientId + ":";
        result += subscriberName != null ? subscriberName : "NOT_SET";
        return result;
    }

}

Other ActiveMQ examples (source code examples)

Here is a short list of links related to this ActiveMQ JDBCTopicMessageStore.java source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.