|
ActiveMQ example source code file (AMQTopicMessageStore.java)
The ActiveMQ AMQTopicMessageStore.java source code/** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.activemq.store.amq; import java.io.IOException; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.JournalTopicAck; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageId; import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.filter.BooleanExpression; import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.kaha.impl.async.Location; import org.apache.activemq.selector.SelectorParser; import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.store.TopicReferenceStore; import org.apache.activemq.transaction.Synchronization; import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.SubscriptionKey; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * A MessageStore that uses a Journal to store it's messages. * * */ public class AMQTopicMessageStore extends AMQMessageStore implements TopicMessageStore { private static final Logger LOG = LoggerFactory.getLogger(AMQTopicMessageStore.class); private TopicReferenceStore topicReferenceStore; public AMQTopicMessageStore(AMQPersistenceAdapter adapter,TopicReferenceStore topicReferenceStore, ActiveMQTopic destinationName) { super(adapter, topicReferenceStore, destinationName); this.topicReferenceStore = topicReferenceStore; } public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception { flush(); topicReferenceStore.recoverSubscription(clientId, subscriptionName, new RecoveryListenerAdapter(this, listener)); } public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, final MessageRecoveryListener listener) throws Exception { RecoveryListenerAdapter recoveryListener = new RecoveryListenerAdapter(this, listener); topicReferenceStore.recoverNextMessages(clientId, subscriptionName,maxReturned, recoveryListener); if (recoveryListener.size() == 0) { flush(); topicReferenceStore.recoverNextMessages(clientId,subscriptionName, maxReturned, recoveryListener); } } public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException { return topicReferenceStore.lookupSubscription(clientId, subscriptionName); } public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException { peristenceAdapter.writeCommand(subscriptionInfo, false); topicReferenceStore.addSubsciption(subscriptionInfo, retroactive); } /** */ public void acknowledge(final ConnectionContext context, final String clientId, final String subscriptionName, final MessageId messageId, final MessageAck originalAck) throws IOException { final boolean debug = LOG.isDebugEnabled(); JournalTopicAck ack = new JournalTopicAck(); ack.setDestination(destination); ack.setMessageId(messageId); ack.setMessageSequenceId(messageId.getBrokerSequenceId()); ack.setSubscritionName(subscriptionName); ack.setClientId(clientId); ack.setTransactionId(context.getTransaction() != null ? context.getTransaction().getTransactionId() : null); final Location location = peristenceAdapter.writeCommand(ack, false); final SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName); if (!context.isInTransaction()) { if (debug) { LOG.debug("Journalled acknowledge for: " + messageId + ", at: " + location); } acknowledge(context,messageId, location, clientId,subscriptionName); } else { if (debug) { LOG.debug("Journalled transacted acknowledge for: " + messageId + ", at: " + location); } lock.lock(); try { inFlightTxLocations.add(location); }finally { lock.unlock(); } transactionStore.acknowledge(this, ack, location); context.getTransaction().addSynchronization(new Synchronization() { public void afterCommit() throws Exception { if (debug) { LOG.debug("Transacted acknowledge commit for: " + messageId + ", at: " + location); } lock.lock(); try { inFlightTxLocations.remove(location); acknowledge(context,messageId, location, clientId,subscriptionName); }finally { lock.unlock(); } } public void afterRollback() throws Exception { if (debug) { LOG.debug("Transacted acknowledge rollback for: " + messageId + ", at: " + location); } lock.lock(); try{ inFlightTxLocations.remove(location); }finally { lock.unlock(); } } }); } } public boolean replayAcknowledge(ConnectionContext context, String clientId, String subscritionName, MessageId messageId) { try { SubscriptionInfo sub = topicReferenceStore.lookupSubscription(clientId, subscritionName); if (sub != null) { topicReferenceStore.acknowledge(context, clientId, subscritionName, messageId, null); return true; } } catch (Throwable e) { LOG.debug("Could not replay acknowledge for message '" + messageId + "'. Message may have already been acknowledged. reason: " + e); } return false; } /** * @param messageId * @param location * @param key * @throws IOException */ protected void acknowledge(final ConnectionContext context, MessageId messageId, Location location, String clientId, String subscriptionName) throws IOException { MessageAck ack = null; lock.lock(); try { lastLocation = location; }finally { lock.unlock(); } if (topicReferenceStore.acknowledgeReference(context, clientId, subscriptionName, messageId)) { ack = new MessageAck(); ack.setLastMessageId(messageId); } if (ack != null) { removeMessage(context, ack); } } /** * @return Returns the longTermStore. */ public TopicReferenceStore getTopicReferenceStore() { return topicReferenceStore; } public void deleteSubscription(String clientId, String subscriptionName) throws IOException { topicReferenceStore.deleteSubscription(clientId, subscriptionName); } public SubscriptionInfo[] getAllSubscriptions() throws IOException { return topicReferenceStore.getAllSubscriptions(); } public int getMessageCount(String clientId, String subscriberName) throws IOException { flush(); SubscriptionInfo info = lookupSubscription(clientId, subscriberName); try { MessageCounter counter = new MessageCounter(info, this); topicReferenceStore.recoverSubscription(clientId, subscriberName, counter); return counter.count; } catch (Exception e) { throw IOExceptionSupport.create(e); } } private class MessageCounter implements MessageRecoveryListener { int count = 0; SubscriptionInfo info; BooleanExpression selectorExpression; TopicMessageStore store; public MessageCounter(SubscriptionInfo info, TopicMessageStore store) throws Exception { this.info = info; if (info != null) { String selector = info.getSelector(); if (selector != null) { this.selectorExpression = SelectorParser.parse(selector); } } this.store = store; } public boolean recoverMessageReference(MessageId ref) throws Exception { if (selectorExpression != null) { MessageEvaluationContext ctx = new MessageEvaluationContext(); ctx.setMessageReference(store.getMessage(ref)); if (selectorExpression.matches(ctx)) { count++; } } else { count ++; } return true; } public boolean recoverMessage(Message message) throws Exception { if (selectorExpression != null) { MessageEvaluationContext ctx = new MessageEvaluationContext(); ctx.setMessageReference(store.getMessage(message.getMessageId())); if (selectorExpression.matches(ctx)) { count++; } } else { count++; } return true; } public boolean isDuplicate(MessageId ref) { return false; } public boolean hasSpace() { return true; } } public void resetBatching(String clientId, String subscriptionName) { topicReferenceStore.resetBatching(clientId, subscriptionName); } } Other ActiveMQ examples (source code examples)Here is a short list of links related to this ActiveMQ AMQTopicMessageStore.java source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.