|
ActiveMQ example source code file (ManagedRegionBroker.java)
This example ActiveMQ source code file (ManagedRegionBroker.java) is included in the DevDaily.com "Java Source Code Warehouse" project. The intent of this project is to help you "Learn Java by Example"™.
The ActiveMQ ManagedRegionBroker.java source code
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.jmx;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ThreadPoolExecutor;
import javax.management.InstanceNotFoundException;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeDataSupport;
import javax.management.openmbean.CompositeType;
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationFactory;
import org.apache.activemq.broker.region.DestinationFactoryImpl;
import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.Region;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.broker.region.TopicRegion;
import org.apache.activemq.broker.region.TopicSubscription;
import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
import org.apache.activemq.broker.region.policy.SlowConsumerStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.JMXSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.SubscriptionKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ManagedRegionBroker extends RegionBroker {
private static final Logger LOG = LoggerFactory.getLogger(ManagedRegionBroker.class);
/* Management context through which every MBean of this broker is registered and unregistered. */
private final ManagementContext managementContext;
/* JMX name of the owning broker; its domain and BrokerName property prefix all derived names. */
private final ObjectName brokerObjectName;
/* Destination views, keyed by the JMX name they are registered under. */
private final Map<ObjectName, DestinationView> topics = new ConcurrentHashMap<ObjectName, DestinationView>();
private final Map<ObjectName, DestinationView> queues = new ConcurrentHashMap<ObjectName, DestinationView>();
private final Map<ObjectName, DestinationView> temporaryQueues = new ConcurrentHashMap<ObjectName, DestinationView>();
private final Map<ObjectName, DestinationView> temporaryTopics = new ConcurrentHashMap<ObjectName, DestinationView>();
/* Subscription views, keyed by JMX name and split by destination kind and durability. */
private final Map<ObjectName, SubscriptionView> queueSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
private final Map<ObjectName, SubscriptionView> topicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
private final Map<ObjectName, SubscriptionView> durableTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
private final Map<ObjectName, SubscriptionView> inactiveDurableTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
private final Map<ObjectName, SubscriptionView> temporaryQueueSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
private final Map<ObjectName, SubscriptionView> temporaryTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
/* Maps a durable subscription key to the JMX name of its inactive-subscription MBean. */
private final Map<SubscriptionKey, ObjectName> subscriptionKeys = new ConcurrentHashMap<SubscriptionKey, ObjectName>();
/* Maps a live subscription to the JMX name computed for it. */
private final Map<Subscription, ObjectName> subscriptionMap = new ConcurrentHashMap<Subscription, ObjectName>();
/* Names of the MBeans this broker registered and has not yet unregistered; swept in doStop. */
private final Set<ObjectName> registeredMBeans = new CopyOnWriteArraySet<ObjectName>();
/* This is the first broker in the broker interceptor chain. */
private Broker contextBroker;
/**
 * Creates a region broker whose destinations and subscriptions are exposed through JMX.
 * All arguments except {@code context} and {@code brokerObjectName} are handed unchanged
 * to the {@link RegionBroker} constructor.
 *
 * @param brokerService passed to the superclass
 * @param context management context stored for later MBean registration and removal
 * @param brokerObjectName JMX name of the broker, stored and used to derive other JMX names
 * @param taskRunnerFactory passed to the superclass
 * @param memoryManager passed to the superclass
 * @param destinationFactory passed to the superclass
 * @param destinationInterceptor passed to the superclass
 * @param scheduler passed to the superclass
 * @param executor passed to the superclass
 * @throws IOException declared by the superclass constructor
 */
public ManagedRegionBroker(BrokerService brokerService, ManagementContext context, ObjectName brokerObjectName, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager,
DestinationFactory destinationFactory, DestinationInterceptor destinationInterceptor,Scheduler scheduler,ThreadPoolExecutor executor) throws IOException {
super(brokerService, taskRunnerFactory, memoryManager, destinationFactory, destinationInterceptor,scheduler,executor);
this.managementContext = context;
this.brokerObjectName = brokerObjectName;
}
/**
 * Starts the underlying region broker and then calls {@link #buildExistingSubscriptions()}
 * so that durable subscriptions reported by the destination factory get inactive MBeans.
 *
 * @throws Exception if the superclass start or the subscription rebuild fails
 */
@Override
public void start() throws Exception {
super.start();
// build all existing durable subscriptions
buildExistingSubscriptions();
}
/**
 * Stops the underlying region broker, then unregisters every MBean whose name is still
 * tracked in {@code registeredMBeans} and empties that set.
 *
 * @param stopper receives any unexpected exception raised while unregistering an MBean
 */
@Override
protected void doStop(ServiceStopper stopper) {
    super.doStop(stopper);
    // Sweep up whatever MBeans were never explicitly unregistered.
    for (ObjectName leftover : registeredMBeans) {
        try {
            managementContext.unregisterMBean(leftover);
        } catch (InstanceNotFoundException e) {
            // Already gone from the MBean server; only worth a warning.
            LOG.warn("The MBean: " + leftover + " is no longer registered with JMX");
        } catch (Exception e) {
            stopper.onException(this, e);
        }
    }
    registeredMBeans.clear();
}
/**
 * Builds the queue region as a {@code ManagedQueueRegion} bound to this broker and to the
 * inherited {@code destinationStatistics}.
 *
 * @param memoryManager passed to the region constructor
 * @param taskRunnerFactory passed to the region constructor
 * @param destinationFactory passed to the region constructor
 * @return the newly created managed queue region
 */
@Override
protected Region createQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
return new ManagedQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
}
/**
 * Builds the temporary-queue region as a {@code ManagedTempQueueRegion} bound to this broker.
 * Unlike the other region factories in this class, it also passes {@code brokerService}.
 *
 * @param memoryManager passed to the region constructor
 * @param taskRunnerFactory passed to the region constructor
 * @param destinationFactory passed to the region constructor
 * @return the newly created managed temporary-queue region
 */
@Override
protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
return new ManagedTempQueueRegion(this, brokerService, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
}
/**
 * Builds the temporary-topic region as a {@code ManagedTempTopicRegion} bound to this broker
 * and to the inherited {@code destinationStatistics}.
 *
 * @param memoryManager passed to the region constructor
 * @param taskRunnerFactory passed to the region constructor
 * @param destinationFactory passed to the region constructor
 * @return the newly created managed temporary-topic region
 */
@Override
protected Region createTempTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
return new ManagedTempTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
}
/**
 * Builds the topic region as a {@code ManagedTopicRegion} bound to this broker and to the
 * inherited {@code destinationStatistics}.
 *
 * @param memoryManager passed to the region constructor
 * @param taskRunnerFactory passed to the region constructor
 * @param destinationFactory passed to the region constructor
 * @return the newly created managed topic region
 */
@Override
protected Region createTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
return new ManagedTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
}
/**
 * Exposes a destination through JMX. Queues get a {@code QueueView}, topics a
 * {@code TopicView}; any other destination type is only logged as unsupported.
 * Failures are logged and never propagated to the caller.
 *
 * @param destName name of the destination, used to derive the JMX object name
 * @param destination the destination instance to wrap in a view
 */
public void register(ActiveMQDestination destName, Destination destination) {
    // TODO refactor to allow views for custom destinations
    try {
        // The JMX name is computed up front, even for destinations that end up unsupported.
        final ObjectName jmxName = createObjectName(destName);
        if (destination instanceof Queue) {
            registerDestination(jmxName, destName, new QueueView(this, (Queue) destination));
        } else if (destination instanceof Topic) {
            registerDestination(jmxName, destName, new TopicView(this, (Topic) destination));
        } else {
            LOG.warn("JMX View is not supported for custom destination: " + destination);
        }
    } catch (Exception e) {
        LOG.error("Failed to register destination " + destName, e);
    }
}
/**
 * Removes the JMX view of a destination. Failures are logged and never propagated.
 *
 * @param destName name of the destination whose derived JMX object name is unregistered
 */
public void unregister(ActiveMQDestination destName) {
    try {
        unregisterDestination(createObjectName(destName));
    } catch (Exception e) {
        LOG.error("Failed to unregister " + destName, e);
    }
}
/**
 * Computes the JMX name for a subscription and exposes the subscription through JMX.
 * A subscription whose consumer connection id equals {@code "OFFLINE"} is recorded as an
 * inactive durable subscription; any other subscription gets a view chosen by durability
 * and subscription type. In both cases the subscription is mapped to the computed name.
 *
 * @param context connection context supplying the client id
 * @param sub the subscription to expose
 * @return the JMX name computed for the subscription, or {@code null} if registration failed
 */
public ObjectName registerSubscription(ConnectionContext context, Subscription sub) {
    final String clientId = context.getClientId();
    final String jmxNameText = getSubscriptionObjectName(sub, clientId, brokerObjectName);
    final SubscriptionKey key = new SubscriptionKey(clientId, sub.getConsumerInfo().getSubscriptionName());
    try {
        final ObjectName jmxName = new ObjectName(jmxNameText);
        final ConsumerInfo consumerInfo = sub.getConsumerInfo();
        if (consumerInfo.getConsumerId().getConnectionId().equals("OFFLINE")) {
            // add offline subscribers to inactive list
            final SubscriptionInfo offline = new SubscriptionInfo();
            offline.setClientId(clientId);
            offline.setSubscriptionName(consumerInfo.getSubscriptionName());
            offline.setDestination(consumerInfo.getDestination());
            addInactiveSubscription(key, offline);
        } else {
            final SubscriptionView view;
            if (consumerInfo.isDurable()) {
                view = new DurableSubscriptionView(this, clientId, sub);
            } else if (sub instanceof TopicSubscription) {
                view = new TopicSubscriptionView(clientId, (TopicSubscription) sub);
            } else {
                view = new SubscriptionView(clientId, sub);
            }
            registerSubscription(jmxName, consumerInfo, key, view);
        }
        subscriptionMap.put(sub, jmxName);
        return jmxName;
    } catch (Exception e) {
        LOG.error("Failed to register subscription " + sub, e);
        return null;
    }
}
/**
 * Builds the textual JMX object name for a subscription. The result has the form
 * {@code <domain>:BrokerName=<name>,Type=Subscription,persistentMode=...,destinationType=...,
 * destinationName=...,clientId=...}. Durable subscriptions carry a {@code subscriptionID}
 * property after the persistent mode; non-durable ones get a trailing {@code consumerId}
 * property when a consumer id is present.
 *
 * @param sub subscription whose consumer info supplies destination, durability and ids
 * @param connectionClientId client id of the owning connection
 * @param brokerJmxObjectName JMX name of the broker, supplying the domain and BrokerName
 * @return the object name as a string (not yet parsed into an {@link ObjectName})
 */
public static String getSubscriptionObjectName(Subscription sub, String connectionClientId, ObjectName brokerJmxObjectName) {
    final Hashtable<String, String> brokerProps = brokerJmxObjectName.getKeyPropertyList();
    final String domain = brokerJmxObjectName.getDomain();
    final ConsumerInfo consumerInfo = sub.getConsumerInfo();

    final String typePart = consumerInfo.getDestination().getDestinationTypeAsString();
    final String namePart = JMXSupport.encodeObjectNamePart(consumerInfo.getDestination().getPhysicalName());
    final String clientPart = JMXSupport.encodeObjectNamePart(connectionClientId);

    final String modePart;
    String consumerPart = "";
    if (consumerInfo.isDurable()) {
        modePart = "Durable,subscriptionID=" + JMXSupport.encodeObjectNamePart(consumerInfo.getSubscriptionName());
    } else {
        modePart = "Non-Durable";
        if (consumerInfo.getConsumerId() != null) {
            consumerPart = ",consumerId=" + JMXSupport.encodeObjectNamePart(consumerInfo.getConsumerId().toString());
        }
    }

    final StringBuilder text = new StringBuilder();
    text.append(domain).append(":").append("BrokerName=").append(brokerProps.get("BrokerName")).append(",Type=Subscription,");
    text.append("persistentMode=").append(modePart).append(",");
    text.append("destinationType=").append(typePart).append(",");
    text.append("destinationName=").append(namePart).append(",");
    text.append("clientId=").append(clientPart);
    text.append(consumerPart);
    return text.toString();
}
/**
 * Adds the consumer through the superclass and, when its subscription key is known in
 * {@code subscriptionKeys}, registers the resulting subscription with JMX.
 *
 * @param context connection context of the consumer
 * @param info consumer description
 * @return the subscription created by the superclass
 * @throws Exception if the superclass fails to add the consumer
 */
@Override
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
    final Subscription added = super.addConsumer(context, info);
    final SubscriptionKey lookupKey =
        new SubscriptionKey(added.getContext().getClientId(), added.getConsumerInfo().getSubscriptionName());
    if (subscriptionKeys.get(lookupKey) != null) {
        // if it was inactive, register it
        registerSubscription(context, added);
    }
    return added;
}
/**
 * Unregisters the JMX views of every tracked subscription whose consumer info equals
 * {@code info}, returning durable ones to the inactive list, and then removes the consumer
 * through the superclass.
 *
 * @param context connection context of the consumer
 * @param info description of the consumer being removed
 * @throws Exception if unregistering a view or the superclass removal fails
 */
@Override
public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
    // Walk entries rather than keys: a key-then-get lookup can observe null when the entry
    // is removed concurrently, and a null name would make the map removals downstream throw.
    for (Map.Entry<Subscription, ObjectName> entry : subscriptionMap.entrySet()) {
        if (entry.getKey().getConsumerInfo().equals(info)) {
            // unregister all consumer subs
            unregisterSubscription(entry.getValue(), true);
        }
    }
    super.removeConsumer(context, info);
}
/**
 * Forgets a subscription and, if an inactive-subscription MBean is recorded under its
 * subscription key, unregisters that MBean. Does nothing when the subscription was never
 * mapped to a JMX name. Failures are logged and never propagated.
 *
 * @param sub the subscription to forget
 */
public void unregisterSubscription(Subscription sub) {
    final ObjectName name = subscriptionMap.remove(sub);
    if (name == null) {
        return;
    }
    try {
        SubscriptionKey subscriptionKey = new SubscriptionKey(sub.getContext().getClientId(), sub.getConsumerInfo().getSubscriptionName());
        ObjectName inactiveName = subscriptionKeys.get(subscriptionKey);
        if (inactiveName != null) {
            inactiveDurableTopicSubscribers.remove(inactiveName);
            // Keep the bookkeeping in step with the MBean server, as the durable branch of
            // registerSubscription does, so doStop does not revisit this name.
            registeredMBeans.remove(inactiveName);
            managementContext.unregisterMBean(inactiveName);
        }
    } catch (Exception e) {
        LOG.error("Failed to unregister subscription " + sub, e);
    }
}
/**
 * Records a destination view in the map matching the destination's kind (queue or topic,
 * temporary or not) and registers it as an MBean. A registration failure is logged only;
 * the view stays recorded in the map.
 *
 * @param key JMX name to register the view under
 * @param dest destination used to select the target map
 * @param view the view to record and register
 * @throws Exception declared for subclasses; registration failures are caught here
 */
protected void registerDestination(ObjectName key, ActiveMQDestination dest, DestinationView view) throws Exception {
    final Map<ObjectName, DestinationView> target;
    if (dest.isQueue()) {
        target = dest.isTemporary() ? temporaryQueues : queues;
    } else {
        target = dest.isTemporary() ? temporaryTopics : topics;
    }
    target.put(key, view);

    try {
        AnnotatedMBean.registerMBean(managementContext, view, key);
        registeredMBeans.add(key);
    } catch (Throwable e) {
        LOG.warn("Failed to register MBean: " + key);
        LOG.debug("Failure reason: " + e, e);
    }
}
/**
 * Drops the destination view stored under {@code key} from all destination maps,
 * unregisters its MBean, and then unregisters the slow-consumer-strategy MBean reported by
 * the removed view, if this broker registered one. Unregistration failures are logged only.
 *
 * @param key JMX name of the destination view to remove
 * @throws Exception if the removed view cannot report its slow consumer strategy
 */
protected void unregisterDestination(ObjectName key) throws Exception {
    // Java passes references by value, so the helper must hand the removed view back;
    // assigning to its parameter would leave this local permanently null.
    DestinationView view = removeAndRemember(topics, key, null);
    view = removeAndRemember(queues, key, view);
    view = removeAndRemember(temporaryQueues, key, view);
    view = removeAndRemember(temporaryTopics, key, view);
    if (registeredMBeans.remove(key)) {
        try {
            managementContext.unregisterMBean(key);
        } catch (Throwable e) {
            LOG.warn("Failed to unregister MBean: " + key);
            LOG.debug("Failure reason: " + e, e);
        }
    }
    if (view != null) {
        final ObjectName strategyName = view.getSlowConsumerStrategy();
        if (strategyName != null && registeredMBeans.remove(strategyName)) {
            try {
                managementContext.unregisterMBean(strategyName);
            } catch (Throwable e) {
                LOG.warn("Failed to unregister slow consumer strategy MBean: " + strategyName);
                LOG.debug("Failure reason: " + e, e);
            }
        }
    }
}

/**
 * Removes {@code key} from {@code map} and returns the view to remember: the view already
 * remembered if there is one, otherwise whatever was just removed (possibly {@code null}).
 */
private DestinationView removeAndRemember(Map<ObjectName, DestinationView> map, ObjectName key, DestinationView view) {
    final DestinationView candidate = map.remove(key);
    return (view != null) ? view : candidate;
}
/**
 * Records a subscription view in the map matching its destination kind and durability and
 * registers it as an MBean. For a durable subscription on a non-temporary topic, any
 * inactive-subscription MBean recorded under {@code subscriptionKey} is unregistered first.
 * A failure to register the view is logged only; the view stays recorded in its map.
 *
 * @param key JMX name to register the view under
 * @param info consumer info supplying the destination and the durable flag
 * @param subscriptionKey key used to look up a matching inactive durable subscription
 * @param view the view to record and register
 * @throws Exception declared for subclasses; registration failures are caught here
 */
protected void registerSubscription(ObjectName key, ConsumerInfo info, SubscriptionKey subscriptionKey, SubscriptionView view) throws Exception {
    final ActiveMQDestination destination = info.getDestination();
    if (destination.isQueue()) {
        final Map<ObjectName, SubscriptionView> queueTarget =
            destination.isTemporary() ? temporaryQueueSubscribers : queueSubscribers;
        queueTarget.put(key, view);
    } else if (destination.isTemporary()) {
        temporaryTopicSubscribers.put(key, view);
    } else if (info.isDurable()) {
        durableTopicSubscribers.put(key, view);
        // unregister any inactive durable subs
        try {
            final ObjectName staleInactive = subscriptionKeys.get(subscriptionKey);
            if (staleInactive != null) {
                inactiveDurableTopicSubscribers.remove(staleInactive);
                registeredMBeans.remove(staleInactive);
                managementContext.unregisterMBean(staleInactive);
            }
        } catch (Throwable e) {
            LOG.error("Unable to unregister inactive durable subscriber: " + subscriptionKey, e);
        }
    } else {
        topicSubscribers.put(key, view);
    }

    try {
        AnnotatedMBean.registerMBean(managementContext, view, key);
        registeredMBeans.add(key);
    } catch (Throwable e) {
        LOG.warn("Failed to register MBean: " + key);
        LOG.debug("Failure reason: " + e, e);
    }
}
/**
 * Removes the subscription view stored under {@code key} from the subscriber maps and
 * unregisters its MBean. When the key belonged to a durable topic subscriber and
 * {@code addToInactive} is set, an inactive subscription is recorded in its place.
 *
 * @param key JMX name of the subscription view to remove
 * @param addToInactive whether a removed durable subscription goes back to the inactive list
 * @throws Exception declared for subclasses; MBean unregistration failures are caught here
 */
protected void unregisterSubscription(ObjectName key, boolean addToInactive) throws Exception {
    queueSubscribers.remove(key);
    topicSubscribers.remove(key);
    temporaryQueueSubscribers.remove(key);
    temporaryTopicSubscribers.remove(key);
    if (registeredMBeans.remove(key)) {
        try {
            managementContext.unregisterMBean(key);
        } catch (Throwable e) {
            LOG.warn("Failed to unregister MBean: " + key);
            LOG.debug("Failure reason: " + e, e);
        }
    }

    final DurableSubscriptionView durable = (DurableSubscriptionView) durableTopicSubscribers.remove(key);
    if (durable == null) {
        return;
    }
    // need to put this back in the inactive list
    final SubscriptionKey durableKey = new SubscriptionKey(durable.getClientId(), durable.getSubscriptionName());
    if (!addToInactive) {
        return;
    }
    final SubscriptionInfo inactive = new SubscriptionInfo();
    inactive.setClientId(durableKey.getClientId());
    inactive.setSubscriptionName(durableKey.getSubscriptionName());
    inactive.setDestination(new ActiveMQTopic(durable.getDestinationName()));
    addInactiveSubscription(durableKey, inactive);
}
/**
 * Collects the durable subscriptions that the destination factory reports for every topic
 * destination, skips those the topic region already knows, and records the rest as
 * inactive subscriptions.
 *
 * @throws Exception if the destination factory fails while listing destinations or subscriptions
 */
protected void buildExistingSubscriptions() throws Exception {
    final Map<SubscriptionKey, SubscriptionInfo> pending = new HashMap<SubscriptionKey, SubscriptionInfo>();
    final Set<?> destinations = destinationFactory.getDestinations();
    if (destinations != null) {
        for (Object candidate : destinations) {
            final ActiveMQDestination dest = (ActiveMQDestination) candidate;
            if (!dest.isTopic()) {
                continue;
            }
            final SubscriptionInfo[] infos = destinationFactory.getAllDurableSubscriptions((ActiveMQTopic) dest);
            if (infos == null) {
                continue;
            }
            for (SubscriptionInfo info : infos) {
                final SubscriptionKey key = new SubscriptionKey(info);
                if (!alreadyKnown(key)) {
                    LOG.debug("Restoring durable subscription mbean: " + info);
                    pending.put(key, info);
                }
            }
        }
    }

    // Register only after the scan, once duplicate keys have collapsed in the map.
    for (Map.Entry<SubscriptionKey, SubscriptionInfo> entry : pending.entrySet()) {
        addInactiveSubscription(entry.getKey(), entry.getValue());
    }
}
/**
 * Asks the topic region whether a durable subscription with the given key already exists,
 * tracing the answer when trace logging is enabled.
 *
 * @param key the durable subscription key to check
 * @return {@code true} if the topic region reports the durable subscription as existing
 */
private boolean alreadyKnown(SubscriptionKey key) {
    final TopicRegion topicRegion = (TopicRegion) getTopicRegion();
    final boolean exists = topicRegion.durableSubscriptionExists(key);
    if (LOG.isTraceEnabled()) {
        final String qualifier = exists ? "" : "not";
        LOG.trace("Sub with key: " + key + ", " + qualifier + " already registered");
    }
    return exists;
}
/**
 * Creates an {@code InactiveDurableSubscriptionView} for a durable subscription, registers
 * it as an MBean named {@code <domain>:BrokerName=...,Type=Subscription,active=false,name=...}
 * and records it in the inactive-subscriber and subscription-key maps. An MBean registration
 * failure is logged but the view is still recorded; any other failure is logged and the
 * maps are left untouched.
 *
 * @param key key identifying the durable subscription
 * @param info subscription details handed to the view
 */
protected void addInactiveSubscription(SubscriptionKey key, SubscriptionInfo info) {
    final Hashtable<String, String> brokerProps = brokerObjectName.getKeyPropertyList();
    try {
        final StringBuilder nameText = new StringBuilder();
        nameText.append(brokerObjectName.getDomain()).append(":");
        nameText.append("BrokerName=").append(brokerProps.get("BrokerName")).append(",");
        nameText.append("Type=Subscription,").append("active=false,");
        nameText.append("name=").append(JMXSupport.encodeObjectNamePart(key.toString()));
        final ObjectName inactiveName = new ObjectName(nameText.toString());

        final SubscriptionView inactiveView = new InactiveDurableSubscriptionView(this, key.getClientId(), info);
        try {
            AnnotatedMBean.registerMBean(managementContext, inactiveView, inactiveName);
            registeredMBeans.add(inactiveName);
        } catch (Throwable e) {
            LOG.warn("Failed to register MBean: " + key);
            LOG.debug("Failure reason: " + e, e);
        }
        inactiveDurableTopicSubscribers.put(inactiveName, inactiveView);
        subscriptionKeys.put(key, inactiveName);
    } catch (Exception e) {
        LOG.error("Failed to register subscription " + info, e);
    }
}
/**
 * Returns the messages obtained from {@link #getSubscriberMessages(SubscriptionView)} as
 * JMX composite data. A message that cannot be converted is logged and leaves a
 * {@code null} at its position in the array.
 *
 * @param view subscription view whose messages are browsed
 * @return one array slot per message, in list order
 * @throws OpenDataException declared for callers; conversion failures are caught here
 */
public CompositeData[] browse(SubscriptionView view) throws OpenDataException {
    final List<Message> messages = getSubscriberMessages(view);
    final int count = messages.size();
    final CompositeData[] converted = new CompositeData[count];
    for (int index = 0; index < count; index++) {
        try {
            converted[index] = OpenTypeSupport.convert(messages.get(index));
        } catch (Throwable e) {
            LOG.error("failed to browse : " + view, e);
        }
    }
    return converted;
}
/**
 * Returns the messages obtained from {@link #getSubscriberMessages(SubscriptionView)} as a
 * JMX table named {@code MessageList}, indexed by {@code JMSMessageID}.
 *
 * @param view subscription view whose messages are browsed
 * @return tabular data holding one row per message
 * @throws OpenDataException if the open types or a row cannot be built
 */
public TabularData browseAsTable(SubscriptionView view) throws OpenDataException {
    final OpenTypeFactory typeFactory = OpenTypeSupport.getFactory(ActiveMQMessage.class);
    final List<Message> messages = getSubscriberMessages(view);
    final CompositeType rowType = typeFactory.getCompositeType();
    final TabularType tableType = new TabularType("MessageList", "MessageList", rowType, new String[] {"JMSMessageID"});
    final TabularDataSupport table = new TabularDataSupport(tableType);
    for (Message message : messages) {
        table.put(new CompositeDataSupport(rowType, typeFactory.getFields(message)));
    }
    return table;
}
/**
 * Recovers messages from the topic message store of the view's destination and returns
 * them as a list. Recovery failures are logged, and whatever was collected up to that point
 * is returned.
 *
 * @param view subscription view naming the topic to read
 * @return the recovered messages; possibly empty, never {@code null}
 * @throws RuntimeException if the destination factory is not a {@code DestinationFactoryImpl}
 */
protected List<Message> getSubscriberMessages(SubscriptionView view) {
    // TODO It is very dangerous operation for big backlogs
    if (!(destinationFactory instanceof DestinationFactoryImpl)) {
        throw new RuntimeException("unsupported by " + destinationFactory);
    }
    final DestinationFactoryImpl factoryImpl = (DestinationFactoryImpl) destinationFactory;
    final PersistenceAdapter persistence = factoryImpl.getPersistenceAdapter();
    final List<Message> recovered = new ArrayList<Message>();
    try {
        final ActiveMQTopic browsedTopic = new ActiveMQTopic(view.getDestinationName());
        final TopicMessageStore topicStore = persistence.createTopicMessageStore(browsedTopic);
        // The listener accepts everything: never reports a duplicate, never runs out of space.
        topicStore.recover(new MessageRecoveryListener() {
            public boolean recoverMessage(Message message) throws Exception {
                recovered.add(message);
                return true;
            }
            public boolean recoverMessageReference(MessageId messageReference) throws Exception {
                throw new RuntimeException("Should not be called.");
            }
            public boolean hasSpace() {
                return true;
            }
            public boolean isDuplicate(MessageId id) {
                return false;
            }
        });
    } catch (Throwable e) {
        LOG.error("Failed to browse messages for Subscription " + view, e);
    }
    return recovered;
}
/**
 * Returns the JMX names of the topic views currently held in the {@code topics} map.
 *
 * @return a newly built array of names; never {@code null}
 */
protected ObjectName[] getTopics() {
    // A zero-length seed makes toArray size the result itself; a pre-sized array can come
    // back with null slots if the map shrinks between size() and the copy.
    return topics.keySet().toArray(new ObjectName[0]);
}
/**
 * Returns the JMX names of the queue views currently held in the {@code queues} map.
 *
 * @return a newly built array of names; never {@code null}
 */
protected ObjectName[] getQueues() {
    // A zero-length seed makes toArray size the result itself; a pre-sized array can come
    // back with null slots if the map shrinks between size() and the copy.
    return queues.keySet().toArray(new ObjectName[0]);
}
/**
 * Returns the JMX names of the views currently held in the {@code temporaryTopics} map.
 *
 * @return a newly built array of names; never {@code null}
 */
protected ObjectName[] getTemporaryTopics() {
    // A zero-length seed makes toArray size the result itself; a pre-sized array can come
    // back with null slots if the map shrinks between size() and the copy.
    return temporaryTopics.keySet().toArray(new ObjectName[0]);
}
/**
 * Returns the JMX names of the views currently held in the {@code temporaryQueues} map.
 *
 * @return a newly built array of names; never {@code null}
 */
protected ObjectName[] getTemporaryQueues() {
    // A zero-length seed makes toArray size the result itself; a pre-sized array can come
    // back with null slots if the map shrinks between size() and the copy.
    return temporaryQueues.keySet().toArray(new ObjectName[0]);
}
/**
 * Returns the JMX names of the views currently held in the {@code topicSubscribers} map.
 *
 * @return a newly built array of names; never {@code null}
 */
protected ObjectName[] getTopicSubscribers() {
    // A zero-length seed makes toArray size the result itself; a pre-sized array can come
    // back with null slots if the map shrinks between size() and the copy.
    return topicSubscribers.keySet().toArray(new ObjectName[0]);
}
/**
 * Returns the JMX names of the views currently held in the
 * {@code durableTopicSubscribers} map.
 *
 * @return a newly built array of names; never {@code null}
 */
protected ObjectName[] getDurableTopicSubscribers() {
    // A zero-length seed makes toArray size the result itself; a pre-sized array can come
    // back with null slots if the map shrinks between size() and the copy.
    return durableTopicSubscribers.keySet().toArray(new ObjectName[0]);
}
/**
 * Returns the JMX names of the views currently held in the {@code queueSubscribers} map.
 *
 * @return a newly built array of names; never {@code null}
 */
protected ObjectName[] getQueueSubscribers() {
    // A zero-length seed makes toArray size the result itself; a pre-sized array can come
    // back with null slots if the map shrinks between size() and the copy.
    return queueSubscribers.keySet().toArray(new ObjectName[0]);
}
/**
 * Returns the JMX names of the views currently held in the
 * {@code temporaryTopicSubscribers} map.
 *
 * @return a newly built array of names; never {@code null}
 */
protected ObjectName[] getTemporaryTopicSubscribers() {
    // A zero-length seed makes toArray size the result itself; a pre-sized array can come
    // back with null slots if the map shrinks between size() and the copy.
    return temporaryTopicSubscribers.keySet().toArray(new ObjectName[0]);
}
/**
 * Returns the JMX names of the views currently held in the
 * {@code temporaryQueueSubscribers} map.
 *
 * @return a newly built array of names; never {@code null}
 */
protected ObjectName[] getTemporaryQueueSubscribers() {
    // A zero-length seed makes toArray size the result itself; a pre-sized array can come
    // back with null slots if the map shrinks between size() and the copy.
    return temporaryQueueSubscribers.keySet().toArray(new ObjectName[0]);
}
/**
 * Returns the JMX names of the views currently held in the
 * {@code inactiveDurableTopicSubscribers} map.
 *
 * @return a newly built array of names; never {@code null}
 */
protected ObjectName[] getInactiveDurableTopicSubscribers() {
    // A zero-length seed makes toArray size the result itself; a pre-sized array can come
    // back with null slots if the map shrinks between size() and the copy.
    return inactiveDurableTopicSubscribers.keySet().toArray(new ObjectName[0]);
}
/**
 * Returns the broker previously supplied through {@link #setContextBroker(Broker)}.
 *
 * @return the stored context broker, or {@code null} if none has been set
 */
public Broker getContextBroker() {
return contextBroker;
}
/**
 * Stores the broker that {@link #getContextBroker()} will return.
 *
 * @param contextBroker the broker to store
 */
public void setContextBroker(Broker contextBroker) {
this.contextBroker = contextBroker;
}
/**
 * Derives the JMX name of a destination, in the form
 * {@code <domain>:BrokerName=<name>,Type=<destination type>,Destination=<physical name>},
 * taking the domain and broker name from this broker's own JMX name.
 *
 * @param destName destination supplying the type and the physical name
 * @return the JMX object name for the destination
 * @throws MalformedObjectNameException if the assembled text is not a valid object name
 */
protected ObjectName createObjectName(ActiveMQDestination destName) throws MalformedObjectNameException {
    // Build the object name for the destination
    final Hashtable<String, String> brokerProps = brokerObjectName.getKeyPropertyList();
    final StringBuilder nameText = new StringBuilder();
    nameText.append(brokerObjectName.getDomain()).append(":");
    nameText.append("BrokerName=").append(brokerProps.get("BrokerName")).append(",");
    nameText.append("Type=").append(JMXSupport.encodeObjectNamePart(destName.getDestinationTypeAsString())).append(",");
    nameText.append("Destination=").append(JMXSupport.encodeObjectNamePart(destName.getPhysicalName()));
    return new ObjectName(nameText.toString());
}
/**
 * Registers an MBean view for a slow consumer strategy unless its name is already tracked
 * in {@code registeredMBeans}. Failures are logged and never propagated.
 *
 * @param strategy the strategy to expose
 * @return the JMX name derived for the strategy, or {@code null} if the name could not be created
 * @throws MalformedObjectNameException declared for callers; failures are caught here
 */
public ObjectName registerSlowConsumerStrategy(AbortSlowConsumerStrategy strategy) throws MalformedObjectNameException {
    ObjectName strategyName = null;
    try {
        strategyName = createObjectName(strategy);
        final boolean alreadyTracked = registeredMBeans.contains(strategyName);
        if (!alreadyTracked) {
            AnnotatedMBean.registerMBean(managementContext, new AbortSlowConsumerStrategyView(this, strategy), strategyName);
            registeredMBeans.add(strategyName);
        }
    } catch (Exception e) {
        LOG.warn("Failed to register MBean: " + strategy);
        LOG.debug("Failure reason: " + e, e);
    }
    return strategyName;
}
/**
 * Derives the JMX name of a slow consumer strategy, in the form
 * {@code <domain>:BrokerName=<name>,Type=SlowConsumerStrategy,InstanceName=<strategy name>}.
 *
 * @param strategy strategy supplying the instance name
 * @return the JMX object name for the strategy
 * @throws MalformedObjectNameException if the assembled text is not a valid object name
 */
private ObjectName createObjectName(AbortSlowConsumerStrategy strategy) throws MalformedObjectNameException{
    final Hashtable<String, String> brokerProps = brokerObjectName.getKeyPropertyList();
    final StringBuilder nameText = new StringBuilder();
    nameText.append(brokerObjectName.getDomain()).append(":");
    nameText.append("BrokerName=").append(brokerProps.get("BrokerName")).append(",");
    nameText.append("Type=SlowConsumerStrategy,");
    nameText.append("InstanceName=").append(JMXSupport.encodeObjectNamePart(strategy.getName()));
    return new ObjectName(nameText.toString());
}
/**
 * Looks up the JMX name recorded for a subscription in {@code subscriptionMap}.
 *
 * @param key the subscription to look up
 * @return the recorded JMX name, or {@code null} if the subscription is not in the map
 */
public ObjectName getSubscriberObjectName(Subscription key) {
return subscriptionMap.get(key);
}
/**
 * Reverse lookup over {@code subscriptionMap}: finds a subscription recorded under the
 * given JMX name by scanning the map's entries.
 *
 * @param key the JMX name to search for
 * @return the first subscription found with that name, or {@code null} if there is none
 */
public Subscription getSubscriber(ObjectName key) {
    for (Entry<Subscription, ObjectName> candidate : subscriptionMap.entrySet()) {
        if (candidate.getValue().equals(key)) {
            return candidate.getKey();
        }
    }
    return null;
}
}
Other ActiveMQ examples (source code examples)
Here is a short list of links related to this ActiveMQ ManagedRegionBroker.java source code file:
|