alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

ActiveMQ example source code file (RegionBroker.java)

This example ActiveMQ source code file (RegionBroker.java) is included in the DevDaily.com "Java Source Code Warehouse" project. The intent of this project is to help you "Learn Java by Example"™.

Java - ActiveMQ tags/keywords

activemqdestination, activemqdestination, brokerinfo, connectioncontext, destination, destinationfactory, exception, exception, io, map, net, network, override, override, region, region, string, threading, threads, util

The ActiveMQ RegionBroker.java source code

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.broker.region;

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadPoolExecutor;
import javax.jms.InvalidClientIDException;
import javax.jms.JMSException;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.Connection;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ConsumerBrokerExchange;
import org.apache.activemq.broker.EmptyBroker;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.state.ConnectionState;
import org.apache.activemq.store.kahadb.plist.PListStore;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.BrokerSupport;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.InetAddressUtil;
import org.apache.activemq.util.LongSequenceGenerator;
import org.apache.activemq.util.ServiceStopper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Routes Broker operations to the correct messaging regions for processing.
 * 
 * 
 */
public class RegionBroker extends EmptyBroker {
    /** Name of the message property under which stampAsExpired records a message's expiration. */
    public static final String ORIGINAL_EXPIRATION = "originalExpiration";
    private static final Logger LOG = LoggerFactory.getLogger(RegionBroker.class);
    /** Generates the broker id returned by getBrokerId() when none was set explicitly. */
    private static final IdGenerator BROKER_ID_GENERATOR = new IdGenerator();

    /** Statistics instance handed to every region created by the createXxxRegion factories. */
    protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
    protected DestinationFactory destinationFactory;
    /** Connection states keyed by connection id; wrapped in a synchronized map. */
    protected final Map<ConnectionId, ConnectionState> connectionStates =
        Collections.synchronizedMap(new HashMap<ConnectionId, ConnectionState>());

    private final Region queueRegion;
    private final Region topicRegion;
    private final Region tempQueueRegion;
    private final Region tempTopicRegion;
    protected final BrokerService brokerService;
    /** Set by start(), cleared by stop(); isStopped() reports its negation. */
    private boolean started;
    /** Pushed to the topic region by start(). */
    private boolean keepDurableSubsActive;

    private final CopyOnWriteArrayList<Connection> connections = new CopyOnWriteArrayList<Connection>();
    /** Destinations registered through addDestination, keyed by destination. */
    private final Map<ActiveMQDestination, Destination> destinations =
        new ConcurrentHashMap<ActiveMQDestination, Destination>();
    /** Peer broker infos; read and written by the synchronized add/remove/getPeerBrokerInfos methods. */
    private final Map<BrokerId, BrokerInfo> brokerInfos = new HashMap<BrokerId, BrokerInfo>();

    private final LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator();
    private BrokerId brokerId;
    private String brokerName;
    /** Client id to connection context; addConnection/removeConnection synchronize on this map. */
    private final Map<String, ConnectionContext> clientIdSet = new HashMap<String, ConnectionContext>();
    private final DestinationInterceptor destinationInterceptor;
    private ConnectionContext adminConnectionContext;
    private final Scheduler scheduler;
    private final ThreadPoolExecutor executor;
    
    // Task that invokes purgeInactiveDestinations(). start() schedules it periodically
    // (when the configured period is positive) and stop() cancels it. The same object is
    // also the monitor that the producer/consumer/subscription add and remove methods
    // synchronize on.
    private final Runnable purgeInactiveDestinationsTask = new Runnable() {
        public void run() {
            purgeInactiveDestinations();
        }
    };

    /**
     * Creates the broker together with its queue, topic, temporary-queue and
     * temporary-topic regions. The message sequence generator is seeded from
     * {@code destinationFactory.getLastMessageBrokerSequenceId()}.
     *
     * @param brokerService the owning broker service
     * @param taskRunnerFactory handed to the region factory methods
     * @param memoryManager handed to the region factory methods
     * @param destinationFactory factory handed to the regions; must not be {@code null}
     * @param destinationInterceptor interceptor exposed via {@link #getDestinationInterceptor()}
     * @param scheduler scheduler used for the periodic purge task
     * @param executor executor kept by this broker
     * @throws IllegalArgumentException if {@code destinationFactory} is {@code null}
     * @throws IOException as declared by the original signature
     */
    public RegionBroker(BrokerService brokerService, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager,
                        DestinationFactory destinationFactory, DestinationInterceptor destinationInterceptor,
                        Scheduler scheduler, ThreadPoolExecutor executor) throws IOException {
        if (destinationFactory == null) {
            throw new IllegalArgumentException("null destinationFactory");
        }
        this.brokerService = brokerService;
        this.scheduler = scheduler;
        this.executor = executor;
        this.sequenceGenerator.setLastSequenceId(destinationFactory.getLastMessageBrokerSequenceId());
        this.destinationFactory = destinationFactory;

        // Region creation order and the position of the interceptor assignment are kept
        // exactly as before, because the factory methods are overridable.
        this.queueRegion = createQueueRegion(memoryManager, taskRunnerFactory, destinationFactory);
        this.topicRegion = createTopicRegion(memoryManager, taskRunnerFactory, destinationFactory);
        this.destinationInterceptor = destinationInterceptor;
        this.tempQueueRegion = createTempQueueRegion(memoryManager, taskRunnerFactory, destinationFactory);
        this.tempTopicRegion = createTempTopicRegion(memoryManager, taskRunnerFactory, destinationFactory);
    }

    /**
     * Returns the queue and topic destinations of this broker combined in one map.
     * <p>
     * The map obtained from the queue region is copied into a new {@link HashMap}
     * before the topic destinations are added, so the map handed out by the region
     * is never modified by this method. Destinations of the temporary regions are
     * not included.
     *
     * @return a new map owned by the caller
     */
    @Override
    public Map<ActiveMQDestination, Destination> getDestinationMap() {
        Map<ActiveMQDestination, Destination> answer =
            new HashMap<ActiveMQDestination, Destination>(getQueueRegion().getDestinationMap());
        answer.putAll(getTopicRegion().getDestinationMap());
        return answer;
    }

    /**
     * Looks up the destinations matching {@code destination} in the region that
     * corresponds to its destination type.
     *
     * @param destination the destination to look up
     * @return the set reported by the matching region, or an empty set for an
     *         unrecognised destination type
     */
    @Override
    public Set <Destination> getDestinations(ActiveMQDestination destination) {
        final Region region;
        switch (destination.getDestinationType()) {
        case ActiveMQDestination.QUEUE_TYPE:
            region = queueRegion;
            break;
        case ActiveMQDestination.TOPIC_TYPE:
            region = topicRegion;
            break;
        case ActiveMQDestination.TEMP_QUEUE_TYPE:
            region = tempQueueRegion;
            break;
        case ActiveMQDestination.TEMP_TOPIC_TYPE:
            region = tempTopicRegion;
            break;
        default:
            return Collections.emptySet();
        }
        return region.getDestinations(destination);
    }

    /**
     * Returns this broker when it is an instance of {@code type}.
     *
     * @param type the requested type
     * @return {@code this} if it is an instance of {@code type}, otherwise {@code null}
     */
    @Override
    public Broker getAdaptor(Class type) {
        return type.isInstance(this) ? this : null;
    }

    /** @return the region created by {@link #createQueueRegion} in the constructor */
    public Region getQueueRegion() {
        return this.queueRegion;
    }

    /** @return the region created by {@link #createTempQueueRegion} in the constructor */
    public Region getTempQueueRegion() {
        return this.tempQueueRegion;
    }

    /** @return the region created by {@link #createTempTopicRegion} in the constructor */
    public Region getTempTopicRegion() {
        return this.tempTopicRegion;
    }

    /** @return the region created by {@link #createTopicRegion} in the constructor */
    public Region getTopicRegion() {
        return this.topicRegion;
    }

    /**
     * Factory method for the temporary-topic region; called from the constructor.
     *
     * @return a new {@code TempTopicRegion} bound to this broker and its statistics
     */
    protected Region createTempTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
        final Region region =
            new TempTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
        return region;
    }

    /**
     * Factory method for the temporary-queue region; called from the constructor.
     *
     * @return a new {@code TempQueueRegion} bound to this broker, its broker service and its statistics
     */
    protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
        final Region region =
            new TempQueueRegion(this, brokerService, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
        return region;
    }

    /**
     * Factory method for the topic region; called from the constructor.
     *
     * @return a new {@code TopicRegion} bound to this broker and its statistics
     */
    protected Region createTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
        final Region region =
            new TopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
        return region;
    }

    /**
     * Factory method for the queue region; called from the constructor.
     *
     * @return a new {@code QueueRegion} bound to this broker and its statistics
     */
    protected Region createQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
        final Region region =
            new QueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
        return region;
    }

    /**
     * Starts the four regions in the order queue, topic, temporary queue, temporary
     * topic, after passing the keep-durable-subs-active flag to the topic region and
     * marking this broker as started. If the broker service reports a positive purge
     * period, the destination purge task is scheduled with that period.
     *
     * @throws Exception if a region fails to start
     */
    @Override
    public void start() throws Exception {
        final TopicRegion topics = (TopicRegion) topicRegion;
        topics.setKeepDurableSubsActive(keepDurableSubsActive);
        started = true;

        queueRegion.start();
        topicRegion.start();
        tempQueueRegion.start();
        tempTopicRegion.start();

        final int purgePeriod = brokerService.getSchedulePeriodForDestinationPurge();
        if (purgePeriod <= 0) {
            return;
        }
        scheduler.executePeriodically(purgeInactiveDestinationsTask, purgePeriod);
    }

    /**
     * Marks the broker as stopped, cancels the purge task and stops the regions via
     * {@link #doStop(ServiceStopper)}. The cached client ids, connections, destinations
     * and broker infos are cleared only when {@code throwFirstException()} returns
     * normally.
     *
     * @throws Exception whatever {@code ServiceStopper.throwFirstException()} throws
     */
    @Override
    public void stop() throws Exception {
        started = false;
        scheduler.cancel(purgeInactiveDestinationsTask);

        final ServiceStopper stopper = new ServiceStopper();
        doStop(stopper);
        stopper.throwFirstException();

        // clear the state
        clientIdSet.clear();
        connections.clear();
        destinations.clear();
        brokerInfos.clear();
    }

    /**
     * @return the destination policy of the broker service, or {@code null} when
     *         there is no broker service
     */
    public PolicyMap getDestinationPolicy() {
        if (brokerService == null) {
            return null;
        }
        return brokerService.getDestinationPolicy();
    }

    /**
     * Registers a connection under its client id.
     * <p>
     * If the client id is already registered and the new context is fault tolerant
     * or a network connection, the previously registered connection is removed
     * (failures are logged and otherwise ignored) and the new context takes over
     * the client id. Previously the new context was not stored in that case, which
     * left the stale context mapped to the client id. For any other context a
     * duplicate client id is rejected.
     *
     * @param context the context of the connection being added
     * @param info the connection info carrying the client id
     * @throws InvalidClientIDException if no client id is given, or the client id is
     *         already in use and the new connection may not replace the old one
     * @throws Exception as declared by the overridden method
     */
    @Override
    public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
        String clientId = info.getClientId();
        if (clientId == null) {
            throw new InvalidClientIDException("No clientID specified for connection request");
        }
        synchronized (clientIdSet) {
            ConnectionContext oldContext = clientIdSet.get(clientId);
            if (oldContext != null) {
                if (context.isFaultTolerant() || context.isNetworkConnection()) {
                    // remove the old connection
                    try {
                        removeConnection(oldContext, info, new Exception("remove stale client"));
                    } catch (Exception e) {
                        LOG.warn("Failed to remove stale connection ", e);
                    }
                } else {
                    throw new InvalidClientIDException("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected from "
                                                       + oldContext.getConnection().getRemoteAddress());
                }
            }
            // Reached for a new client id and for a replaced stale connection alike:
            // the client id must now map to the context that is actually connected.
            clientIdSet.put(clientId, context);
        }

        connections.add(context.getConnection());
    }

    /**
     * Unregisters a connection. The client id mapping is dropped only when it
     * points at {@code context} itself and the connection ids of that context and
     * of {@code info} are equal; the connection is removed from the connection
     * list in every case.
     *
     * @param context the context of the connection being removed
     * @param info the connection info carrying the client id
     * @param error not used by this implementation
     * @throws InvalidClientIDException if {@code info} carries no client id
     * @throws Exception as declared by the overridden method
     */
    @Override
    public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
        final String clientId = info.getClientId();
        if (clientId == null) {
            throw new InvalidClientIDException("No clientID specified for connection disconnect request");
        }
        synchronized (clientIdSet) {
            final ConnectionContext registered = clientIdSet.get(clientId);
            // The caller may be removing a duplicate connection rather than the one
            // that was registered first, hence the identity and connection-id checks.
            if (registered == context && isEqual(registered.getConnectionId(), info.getConnectionId())) {
                clientIdSet.remove(clientId);
            }
        }
        connections.remove(context.getConnection());
    }

    /**
     * Null-safe equality of two connection ids.
     *
     * @return {@code true} if both references are the same (including both
     *         {@code null}) or the first is non-null and {@code equals} the second
     */
    protected boolean isEqual(ConnectionId connectionId, ConnectionId connectionId2) {
        if (connectionId == connectionId2) {
            return true;
        }
        return connectionId != null && connectionId.equals(connectionId2);
    }

    /**
     * @return an array holding a snapshot of the currently registered connections
     * @throws Exception as declared by the overridden method
     */
    @Override
    public Connection[] getClients() throws Exception {
        // Copy first so that the array size and its contents come from the same snapshot.
        final List<Connection> snapshot = new ArrayList<Connection>(connections);
        final Connection[] clients = new Connection[snapshot.size()];
        snapshot.toArray(clients);
        return clients;
    }

    /**
     * Returns the destination registered for {@code destination}, adding it to the
     * region matching its type when it is not registered yet. Queue and topic
     * regions are always asked with a create flag of {@code true}; the temporary
     * regions receive the caller's {@code create} flag.
     *
     * @param context the connection context passed on to the region
     * @param destination the destination to add
     * @param create create flag forwarded to the temporary regions
     * @return the already registered or newly added destination
     * @throws Exception if the destination type is unknown or the region fails
     */
    @Override
    public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean create) throws Exception {
        final Destination known = destinations.get(destination);
        if (known != null) {
            return known;
        }

        final Region region;
        final boolean createFlag;
        switch (destination.getDestinationType()) {
        case ActiveMQDestination.QUEUE_TYPE:
            region = queueRegion;
            createFlag = true;
            break;
        case ActiveMQDestination.TOPIC_TYPE:
            region = topicRegion;
            createFlag = true;
            break;
        case ActiveMQDestination.TEMP_QUEUE_TYPE:
            region = tempQueueRegion;
            createFlag = create;
            break;
        case ActiveMQDestination.TEMP_TOPIC_TYPE:
            region = tempTopicRegion;
            createFlag = create;
            break;
        default:
            throw createUnknownDestinationTypeException(destination);
        }

        final Destination added = region.addDestination(context, destination, createFlag);
        destinations.put(destination, added);
        return added;
    }

    /**
     * Removes a registered destination from the region matching its type. For
     * queues and topics the related advisory topics are removed afterwards via
     * {@link #removeAdvisoryTopics}. Destinations that are not registered with this
     * broker are ignored.
     *
     * @param context the connection context passed on to the region
     * @param destination the destination to remove
     * @param timeout timeout passed on to the region
     * @throws Exception if the destination type is unknown or the region fails
     */
    @Override
    public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
        if (!destinations.containsKey(destination)) {
            return;
        }

        final Region region;
        final String advisoryType; // null when no advisory topics are cleaned up
        switch (destination.getDestinationType()) {
        case ActiveMQDestination.QUEUE_TYPE:
            region = queueRegion;
            advisoryType = "Queue.";
            break;
        case ActiveMQDestination.TOPIC_TYPE:
            region = topicRegion;
            advisoryType = "Topic.";
            break;
        case ActiveMQDestination.TEMP_QUEUE_TYPE:
            region = tempQueueRegion;
            advisoryType = null;
            break;
        case ActiveMQDestination.TEMP_TOPIC_TYPE:
            region = tempTopicRegion;
            advisoryType = null;
            break;
        default:
            throw createUnknownDestinationTypeException(destination);
        }

        region.removeDestination(context, destination, timeout);
        if (advisoryType != null) {
            removeAdvisoryTopics(advisoryType, context, destination, timeout);
        }
        destinations.remove(destination);
    }

    /**
     * Removes the producer and consumer advisory topics that belong to
     * {@code destination}, if advisory support is enabled on the broker service.
     * A {@link JMSException} raised while removing one of them is ignored.
     *
     * @param destinationType name fragment inserted between the advisory prefix and
     *        the physical name (the callers in this class pass "Queue." or "Topic.")
     * @param context the connection context used for the removal
     * @param destination the destination whose advisory topics are removed
     * @param timeout timeout passed on to {@link #removeDestination}
     * @throws Exception if listing or removing destinations fails with something
     *         other than a {@link JMSException}
     */
    public void removeAdvisoryTopics(String destinationType, ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
        if (!this.brokerService.isAdvisorySupport()) {
            return;
        }
        final String typedName = destinationType + destination.getPhysicalName();
        final String producerTopic = AdvisorySupport.PRODUCER_ADVISORY_TOPIC_PREFIX + typedName;
        final String consumerTopic = AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX + typedName;

        for (ActiveMQDestination candidate : getDestinations()) {
            final String candidateName = candidate.getPhysicalName();
            final boolean isAdvisory = candidateName.equals(producerTopic) || candidateName.equals(consumerTopic);
            if (!isAdvisory) {
                continue;
            }
            try {
                removeDestination(context, candidate, timeout);
            } catch (JMSException ignore) {
                // at least ignore the Unknown Destination Type JMSException
            }
        }
    }

    /**
     * Adds the destination named by {@code info}, with the create flag set.
     *
     * @throws Exception if {@link #addDestination} fails
     */
    @Override
    public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
        final ActiveMQDestination target = info.getDestination();
        addDestination(context, target, true);
    }

    /**
     * Removes the destination named by {@code info}, using the timeout carried by
     * {@code info}.
     *
     * @throws Exception if {@link #removeDestination} fails
     */
    @Override
    public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
        final ActiveMQDestination target = info.getDestination();
        final long timeout = info.getTimeout();
        removeDestination(context, target, timeout);
    }

    /**
     * @return the keys of {@link #getDestinationMap()} as an array
     * @throws Exception as declared by the overridden method
     */
    @Override
    public ActiveMQDestination[] getDestinations() throws Exception {
        final List<ActiveMQDestination> names =
            new ArrayList<ActiveMQDestination>(getDestinationMap().keySet());
        final ActiveMQDestination[] result = new ActiveMQDestination[names.size()];
        names.toArray(result);
        return result;
    }

    /**
     * Registers a producer with the region matching its destination type. A
     * producer without a destination, or with an unrecognised destination type, is
     * not handed to any region. Runs while holding the purge task monitor.
     *
     * @throws Exception if adding the destination or the producer fails
     */
    @Override
    public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
        final ActiveMQDestination target = info.getDestination();
        synchronized (purgeInactiveDestinationsTask) {
            if (target == null) {
                return;
            }

            // This seems to cause the destination to be added but without
            // advisories firing...
            context.getBroker().addDestination(context, target, false);

            final Region region;
            switch (target.getDestinationType()) {
            case ActiveMQDestination.QUEUE_TYPE:
                region = queueRegion;
                break;
            case ActiveMQDestination.TOPIC_TYPE:
                region = topicRegion;
                break;
            case ActiveMQDestination.TEMP_QUEUE_TYPE:
                region = tempQueueRegion;
                break;
            case ActiveMQDestination.TEMP_TOPIC_TYPE:
                region = tempTopicRegion;
                break;
            default:
                region = null;
                break;
            }
            if (region != null) {
                region.addProducer(context, info);
            }
        }
    }

    /**
     * Unregisters a producer from the region matching its destination type. A
     * producer without a destination, or with an unrecognised destination type, is
     * ignored. Runs while holding the purge task monitor.
     *
     * @throws Exception if the region fails to remove the producer
     */
    @Override
    public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
        final ActiveMQDestination target = info.getDestination();
        synchronized (purgeInactiveDestinationsTask) {
            if (target == null) {
                return;
            }

            final Region region;
            switch (target.getDestinationType()) {
            case ActiveMQDestination.QUEUE_TYPE:
                region = queueRegion;
                break;
            case ActiveMQDestination.TOPIC_TYPE:
                region = topicRegion;
                break;
            case ActiveMQDestination.TEMP_QUEUE_TYPE:
                region = tempQueueRegion;
                break;
            case ActiveMQDestination.TEMP_TOPIC_TYPE:
                region = tempTopicRegion;
                break;
            default:
                region = null;
                break;
            }
            if (region != null) {
                region.removeProducer(context, info);
            }
        }
    }

    /**
     * Registers a consumer with the region matching its destination type. When a
     * destination interceptor is configured, its {@code create} callback is invoked
     * first. The region call runs while holding the purge task monitor.
     *
     * @return the subscription returned by the region
     * @throws Exception if the destination type is unknown or the region fails
     */
    @Override
    public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
        final ActiveMQDestination target = info.getDestination();
        if (destinationInterceptor != null) {
            destinationInterceptor.create(this, context, target);
        }
        synchronized (purgeInactiveDestinationsTask) {
            final Region region;
            switch (target.getDestinationType()) {
            case ActiveMQDestination.QUEUE_TYPE:
                region = queueRegion;
                break;
            case ActiveMQDestination.TOPIC_TYPE:
                region = topicRegion;
                break;
            case ActiveMQDestination.TEMP_QUEUE_TYPE:
                region = tempQueueRegion;
                break;
            case ActiveMQDestination.TEMP_TOPIC_TYPE:
                region = tempTopicRegion;
                break;
            default:
                throw createUnknownDestinationTypeException(target);
            }
            return region.addConsumer(context, info);
        }
    }

    /**
     * Unregisters a consumer from the region matching its destination type, while
     * holding the purge task monitor.
     *
     * @throws Exception if the destination type is unknown or the region fails
     */
    @Override
    public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
        final ActiveMQDestination target = info.getDestination();
        synchronized (purgeInactiveDestinationsTask) {
            final Region region;
            switch (target.getDestinationType()) {
            case ActiveMQDestination.QUEUE_TYPE:
                region = queueRegion;
                break;
            case ActiveMQDestination.TOPIC_TYPE:
                region = topicRegion;
                break;
            case ActiveMQDestination.TEMP_QUEUE_TYPE:
                region = tempQueueRegion;
                break;
            case ActiveMQDestination.TEMP_TOPIC_TYPE:
                region = tempTopicRegion;
                break;
            default:
                throw createUnknownDestinationTypeException(target);
            }
            region.removeConsumer(context, info);
        }
    }

    /**
     * Delegates removal of a subscription to the topic region, while holding the
     * purge task monitor.
     *
     * @throws Exception if the topic region fails to remove the subscription
     */
    @Override
    public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
        final Region topics = this.topicRegion;
        synchronized (this.purgeInactiveDestinationsTask) {
            topics.removeSubscription(context, info);
        }
    }

    /**
     * Stamps the message with the current time as its broker-in time and hands it
     * to the region of the producer exchange.
     * <p>
     * The region is (re)resolved from the message destination when the exchange is
     * mutable, has no region yet, or its region's destination map has no entry for
     * the message destination. In that case the destination is first registered
     * through the context's broker and the exchange's region destination is reset
     * to {@code null}.
     *
     * @throws Exception if the destination type is unknown or the region fails
     */
    @Override
    public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
        message.setBrokerInTime(System.currentTimeMillis());

        boolean resolveRegion = producerExchange.isMutable() || producerExchange.getRegion() == null;
        if (!resolveRegion) {
            resolveRegion = producerExchange.getRegion().getDestinationMap().get(message.getDestination()) == null;
        }

        if (resolveRegion) {
            final ActiveMQDestination target = message.getDestination();
            // ensure the destination is registered with the RegionBroker
            producerExchange.getConnectionContext().getBroker().addDestination(producerExchange.getConnectionContext(), target, false);
            switch (target.getDestinationType()) {
            case ActiveMQDestination.QUEUE_TYPE:
                producerExchange.setRegion(queueRegion);
                break;
            case ActiveMQDestination.TOPIC_TYPE:
                producerExchange.setRegion(topicRegion);
                break;
            case ActiveMQDestination.TEMP_QUEUE_TYPE:
                producerExchange.setRegion(tempQueueRegion);
                break;
            case ActiveMQDestination.TEMP_TOPIC_TYPE:
                producerExchange.setRegion(tempTopicRegion);
                break;
            default:
                throw createUnknownDestinationTypeException(target);
            }
            producerExchange.setRegionDestination(null);
        }

        producerExchange.getRegion().send(producerExchange, message);
    }

    /**
     * Passes an acknowledgement to the region of the consumer exchange. For
     * wildcard exchanges, and for exchanges that have no region yet, the region is
     * first resolved from the destination type of {@code ack}.
     *
     * @throws Exception if the destination type is unknown or the region fails
     */
    @Override
    public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
        final boolean resolveRegion = consumerExchange.isWildcard() || consumerExchange.getRegion() == null;
        if (resolveRegion) {
            final ActiveMQDestination target = ack.getDestination();
            switch (target.getDestinationType()) {
            case ActiveMQDestination.QUEUE_TYPE:
                consumerExchange.setRegion(queueRegion);
                break;
            case ActiveMQDestination.TOPIC_TYPE:
                consumerExchange.setRegion(topicRegion);
                break;
            case ActiveMQDestination.TEMP_QUEUE_TYPE:
                consumerExchange.setRegion(tempQueueRegion);
                break;
            case ActiveMQDestination.TEMP_TOPIC_TYPE:
                consumerExchange.setRegion(tempTopicRegion);
                break;
            default:
                throw createUnknownDestinationTypeException(target);
            }
        }
        consumerExchange.getRegion().acknowledge(consumerExchange, ack);
    }

    /**
     * Forwards a pull request to the region matching the destination type of
     * {@code pull}.
     *
     * @return the response produced by the region
     * @throws Exception if the destination type is unknown or the region fails
     */
    @Override
    public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
        final ActiveMQDestination target = pull.getDestination();
        final Region region;
        switch (target.getDestinationType()) {
        case ActiveMQDestination.QUEUE_TYPE:
            region = queueRegion;
            break;
        case ActiveMQDestination.TOPIC_TYPE:
            region = topicRegion;
            break;
        case ActiveMQDestination.TEMP_QUEUE_TYPE:
            region = tempQueueRegion;
            break;
        case ActiveMQDestination.TEMP_TOPIC_TYPE:
            region = tempTopicRegion;
            break;
        default:
            throw createUnknownDestinationTypeException(target);
        }
        return region.messagePull(context, pull);
    }

    /**
     * Not supported by this broker.
     *
     * @throws IllegalAccessException always
     */
    @Override
    public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception {
        final String reason = "Transaction operation not implemented by this broker.";
        throw new IllegalAccessException(reason);
    }

    /**
     * Not supported by this broker.
     *
     * @throws IllegalAccessException always
     */
    @Override
    public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
        final String reason = "Transaction operation not implemented by this broker.";
        throw new IllegalAccessException(reason);
    }

    /**
     * Not supported by this broker.
     *
     * @throws IllegalAccessException always
     */
    @Override
    public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
        final String reason = "Transaction operation not implemented by this broker.";
        throw new IllegalAccessException(reason);
    }

    /**
     * Not supported by this broker.
     *
     * @throws IllegalAccessException always
     */
    @Override
    public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
        final String reason = "Transaction operation not implemented by this broker.";
        throw new IllegalAccessException(reason);
    }

    /**
     * Not supported by this broker.
     *
     * @throws IllegalAccessException always
     */
    @Override
    public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
        final String reason = "Transaction operation not implemented by this broker.";
        throw new IllegalAccessException(reason);
    }

    /**
     * Not supported by this broker.
     *
     * @throws IllegalAccessException always
     */
    @Override
    public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception {
        final String reason = "Transaction operation not implemented by this broker.";
        throw new IllegalAccessException(reason);
    }

    /** Invokes {@code gc()} on the queue region and then on the topic region. */
    @Override
    public void gc() {
        this.queueRegion.gc();
        this.topicRegion.gc();
    }

    /**
     * @return the broker id; when none has been set, one is generated on first use
     *         and kept
     */
    @Override
    public BrokerId getBrokerId() {
        if (brokerId != null) {
            return brokerId;
        }
        brokerId = new BrokerId(BROKER_ID_GENERATOR.generateId());
        return brokerId;
    }

    /**
     * Sets the broker id returned by {@link #getBrokerId()}.
     *
     * @param brokerId the id to use; if {@code null}, {@link #getBrokerId()} generates one
     */
    public void setBrokerId(BrokerId brokerId) {
        this.brokerId = brokerId;
    }

    /**
     * Returns the broker name. When none has been set, the lower-cased local host
     * name is used and kept; if the host name cannot be determined the name falls
     * back to {@code "localhost"}.
     * <p>
     * Lower-casing uses a fixed locale so the result does not depend on the JVM's
     * default locale (for example, a Turkish default locale would otherwise map
     * {@code 'I'} to a dotless {@code 'ı'}).
     *
     * @return the broker name, never {@code null}
     */
    @Override
    public String getBrokerName() {
        if (brokerName == null) {
            try {
                brokerName = InetAddressUtil.getLocalHostName().toLowerCase(java.util.Locale.ENGLISH);
            } catch (Exception e) {
                // Best effort: any failure to resolve the host name yields the default name.
                brokerName = "localhost";
            }
        }
        return brokerName;
    }

    /**
     * Sets the broker name returned by {@link #getBrokerName()}.
     *
     * @param brokerName the name to use; if {@code null}, {@link #getBrokerName()} derives one
     */
    public void setBrokerName(String brokerName) {
        this.brokerName = brokerName;
    }

    /** @return the statistics instance that was handed to the regions on creation */
    public DestinationStatistics getDestinationStatistics() {
        return this.destinationStatistics;
    }

    /**
     * Builds (but does not throw) the exception used for unrecognised destination
     * types.
     *
     * @param destination the destination whose type was not recognised
     * @return a new {@link JMSException} naming the destination type
     */
    protected JMSException createUnknownDestinationTypeException(ActiveMQDestination destination) {
        final String reason = "Unknown destination type: " + destination.getDestinationType();
        return new JMSException(reason);
    }

    /**
     * Records a peer broker. The first time a broker id is seen, a copy of
     * {@code info} with its peer broker infos cleared is stored; every call
     * increments the stored entry's reference count and then triggers
     * {@code addBrokerInClusterUpdate()}.
     * <p>
     * The debug message is only built when debug logging is enabled.
     *
     * @param connection not used by this implementation
     * @param info describes the broker being added
     */
    @Override
    public synchronized void addBroker(Connection connection, BrokerInfo info) {
        BrokerInfo existing = brokerInfos.get(info.getBrokerId());
        if (existing == null) {
            existing = info.copy();
            existing.setPeerBrokerInfos(null);
            brokerInfos.put(info.getBrokerId(), existing);
        }
        existing.incrementRefCount();
        if (LOG.isDebugEnabled()) {
            LOG.debug(getBrokerName() + " addBroker:" + info.getBrokerName() + " brokerInfo size : " + brokerInfos.size());
        }
        addBrokerInClusterUpdate();
    }

    /**
     * Releases one reference to a peer broker. The stored entry is dropped once its
     * reference count reaches zero; afterwards {@code removeBrokerInClusterUpdate()}
     * is triggered. A {@code null} info is ignored.
     * <p>
     * The debug message is only built when debug logging is enabled.
     *
     * @param connection not used by this implementation
     * @param info describes the broker being removed; may be {@code null}
     */
    @Override
    public synchronized void removeBroker(Connection connection, BrokerInfo info) {
        if (info != null) {
            BrokerInfo existing = brokerInfos.get(info.getBrokerId());
            if (existing != null && existing.decrementRefCount() == 0) {
               brokerInfos.remove(info.getBrokerId());
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug(getBrokerName() + " removeBroker:" + info.getBrokerName() + " brokerInfo size : " + brokerInfos.size());
            }
            removeBrokerInClusterUpdate();
        }
    }

    /** @return an array holding the currently recorded peer broker infos */
    @Override
    public synchronized BrokerInfo[] getPeerBrokerInfos() {
        final BrokerInfo[] sized = new BrokerInfo[brokerInfos.size()];
        return brokerInfos.values().toArray(sized);
    }

    /**
     * Stamps the dispatched message with the current time as its broker-out time
     * and, when statistics are enabled on the broker service, adds the time spent
     * in the broker (out time minus in time) to the process-time statistic of the
     * message's region destination. Dispatches without a message are ignored.
     *
     * @param messageDispatch the dispatch about to be sent
     */
    @Override
    public void preProcessDispatch(MessageDispatch messageDispatch) {
        final Message message = messageDispatch.getMessage();
        if (message == null) {
            return;
        }
        final long now = System.currentTimeMillis();
        message.setBrokerOutTime(now);
        if (!getBrokerService().isEnableStatistics()) {
            return;
        }
        final long timeInBroker = now - message.getBrokerInTime();
        message.getRegionDestination().getDestinationStatistics().getProcessTime().addTime(timeInBroker);
    }

    /** No-op: this broker does nothing after a dispatch. */
    @Override
    public void postProcessDispatch(MessageDispatch messageDispatch) {
    }

    /**
     * Forwards a dispatch notification to the region matching the notification's
     * destination type.
     *
     * @throws Exception if the destination type is unknown or the region fails
     */
    @Override
    public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
        final ActiveMQDestination target = messageDispatchNotification.getDestination();
        final Region region;
        switch (target.getDestinationType()) {
        case ActiveMQDestination.QUEUE_TYPE:
            region = queueRegion;
            break;
        case ActiveMQDestination.TOPIC_TYPE:
            region = topicRegion;
            break;
        case ActiveMQDestination.TEMP_QUEUE_TYPE:
            region = tempQueueRegion;
            break;
        case ActiveMQDestination.TEMP_TOPIC_TYPE:
            region = tempTopicRegion;
            break;
        default:
            throw createUnknownDestinationTypeException(target);
        }
        region.processDispatchNotification(messageDispatchNotification);
    }

    /** @return the value of {@code isSlave()} on the broker service */
    public boolean isSlaveBroker() {
        final boolean slave = brokerService.isSlave();
        return slave;
    }

    /** @return {@code true} unless {@link #start()} has run without a later {@link #stop()} */
    @Override
    public boolean isStopped() {
        final boolean running = this.started;
        return !running;
    }

    /** @return the destinations reported by the destination factory */
    @Override
    public Set<ActiveMQDestination> getDurableDestinations() {
        final Set<ActiveMQDestination> durable = destinationFactory.getDestinations();
        return durable;
    }

    /**
     * Stops the queue, topic, temporary-queue and temporary-topic regions, in that
     * order, through the given stopper.
     *
     * @param ss the stopper used to stop each region
     */
    protected void doStop(ServiceStopper ss) {
        final Region[] inStopOrder = {queueRegion, topicRegion, tempQueueRegion, tempTopicRegion};
        for (Region region : inStopOrder) {
            ss.stop(region);
        }
    }

    /** @return the flag that {@link #start()} passes to the topic region */
    public boolean isKeepDurableSubsActive() {
        return this.keepDurableSubsActive;
    }

    /**
     * Sets the flag that {@link #start()} passes to the topic region.
     *
     * @param keepDurableSubsActive the new flag value
     */
    public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
        this.keepDurableSubsActive = keepDurableSubsActive;
    }

    /** @return the interceptor supplied to the constructor; may be {@code null} */
    public DestinationInterceptor getDestinationInterceptor() {
        return this.destinationInterceptor;
    }

    /** @return the context last set via {@link #setAdminConnectionContext}, or {@code null} */
    @Override
    public ConnectionContext getAdminConnectionContext() {
        return this.adminConnectionContext;
    }

    /**
     * Stores the context returned by {@link #getAdminConnectionContext()}.
     *
     * @param adminConnectionContext the context to store
     */
    @Override
    public void setAdminConnectionContext(ConnectionContext adminConnectionContext) {
        this.adminConnectionContext = adminConnectionContext;
    }

    /** @return the live (synchronized) map of connection states, not a copy */
    public Map<ConnectionId, ConnectionState> getConnectionStates() {
        return this.connectionStates;
    }

    /** @return the temp data store of the broker service */
    @Override
    public PListStore getTempDataStore() {
        final PListStore store = brokerService.getTempDataStore();
        return store;
    }

    /** @return the VM connector URI of the broker service */
    @Override
    public URI getVmConnectorURI() {
        final URI vmUri = brokerService.getVmConnectorURI();
        return vmUri;
    }

    /** No-op: this broker does nothing in response to this callback. */
    @Override
    public void brokerServiceStarted() {
    }

    /** @return the broker service supplied to the constructor */
    @Override
    public BrokerService getBrokerService() {
        return this.brokerService;
    }

    /**
     * Reports whether the referenced message is expired <em>and</em> has not been
     * stamped as expired before. The stamping is done while synchronizing on the
     * message, so a given message yields {@code true} only once.
     *
     * @param messageReference the reference to check
     * @return {@code true} if the reference is expired and this call stamped it;
     *         {@code false} otherwise, including when an {@link IOException} occurs
     */
    @Override
    public boolean isExpired(MessageReference messageReference) {
        if (!messageReference.isExpired()) {
            return false;
        }
        try {
            // prevent duplicate expiry processing
            final Message target = messageReference.getMessage();
            synchronized (target) {
                return stampAsExpired(target);
            }
        } catch (IOException e) {
            LOG.warn("unexpected exception on message expiry determination for: " + messageReference, e);
            return false;
        }
    }
   
    /**
     * Records the message's current expiration under the
     * {@code ORIGINAL_EXPIRATION} property, unless that property is already set.
     *
     * @param message the message to stamp
     * @return {@code true} if this call wrote the property, {@code false} if
     *         the property was already present
     * @throws IOException propagated from the message property accessors
     */
    private boolean stampAsExpired(Message message) throws IOException {
        if (message.getProperty(ORIGINAL_EXPIRATION) != null) {
            // already stamped; leave the recorded value untouched
            return false;
        }
        // Long.valueOf replaces the deprecated boxing constructor
        message.setProperty(ORIGINAL_EXPIRATION, Long.valueOf(message.getExpiration()));
        return true;
    }

    
    /**
     * Handles an expired message: logs it at debug level (when enabled) and
     * hands the reference to the root broker's
     * {@code sendToDeadLetterQueue}.
     *
     * @param context the connection context passed on to the root broker
     * @param node the expired message reference
     * @param subscription the subscription passed on to the root broker
     */
    @Override
    public void messageExpired(ConnectionContext context, MessageReference node, Subscription subscription) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Message expired " + node);
        }
        getRoot().sendToDeadLetterQueue(context, node, subscription);
    }
    
    /**
     * Resends a copy of the referenced message to the dead letter destination
     * chosen by the region destination's dead letter strategy.
     * <p>
     * Nothing happens when the reference, its message, or its region
     * destination is {@code null}, or when the strategy declines the message.
     * When no strategy is configured, only a debug message is logged. Any
     * exception raised along the way is caught and logged at warn level.
     *
     * @param context the connection context used for the resend; its broker
     *                is set to the root broker if it has none
     * @param node the message reference to dead-letter
     * @param subscription passed to the strategy when it picks the destination
     */
    @Override
    public void sendToDeadLetterQueue(ConnectionContext context,
            MessageReference node, Subscription subscription) {
        try {
            if (node == null) {
                return;
            }
            Message message = node.getMessage();
            if (message == null || node.getRegionDestination() == null) {
                return;
            }
            DeadLetterStrategy deadLetterStrategy = node.getRegionDestination().getDeadLetterStrategy();
            if (deadLetterStrategy == null) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Dead Letter message with no DLQ strategy in place, message id: "
                            + message.getMessageId() + ", destination: " + message.getDestination());
                }
                return;
            }
            if (!deadLetterStrategy.isSendToDeadLetterQueue(message)) {
                return;
            }

            // message may be inflight to other subscriptions so do not modify
            Message deadLetter = message.copy();
            stampAsExpired(deadLetter);
            deadLetter.setExpiration(0);
            if (!deadLetter.isPersistent()) {
                deadLetter.setPersistent(true);
                deadLetter.setProperty("originalDeliveryMode", "NON_PERSISTENT");
            }

            // The original destination and transaction id do not get filled
            // when the message is first sent; they are only populated if the
            // message is routed to another destination like the DLQ
            ActiveMQDestination deadLetterDestination =
                    deadLetterStrategy.getDeadLetterQueueFor(deadLetter, subscription);
            if (context.getBroker() == null) {
                context.setBroker(getRoot());
            }
            BrokerSupport.resendNoCopy(context, deadLetter, deadLetterDestination);
        } catch (Exception e) {
            LOG.warn("Caught an exception sending to DLQ: " + node, e);
        }
    }

    /**
     * Returns the broker obtained from the broker service.
     *
     * @return the result of {@code getBrokerService().getBroker()}
     * @throws RuntimeException if obtaining the broker fails; the original
     *         exception is attached as the cause
     */
    @Override
    public Broker getRoot() {
        try {
            return getBrokerService().getBroker();
        } catch (Exception e) {
            // pass the exception as the throwable argument so the stack trace is logged
            LOG.error("Trying to get Root Broker", e);
            // keep the cause so callers can see what actually failed
            throw new RuntimeException("The broker from the BrokerService should not throw an exception", e);
        }
    }
    
    /**
     * Draws the next id from the sequence generator while holding the
     * generator's monitor.
     *
     * @return the broker sequence id
     */
    @Override
    public long getBrokerSequenceId() {
        final long nextId;
        synchronized (sequenceGenerator) {
            nextId = sequenceGenerator.getNextSequenceId();
        }
        return nextId;
    }
    
    
    /**
     * @return the scheduler held by this broker
     */
    @Override
    public Scheduler getScheduler() {
        return this.scheduler;
    }
    
    /**
     * @return the thread pool executor held by this broker
     */
    public ThreadPoolExecutor getExecutor() {
        return this.executor;
    }
    
    /**
     * Routes a consumer control command to the region that matches the type
     * of the command's destination. A destination type that matches none of
     * the four regions is logged at warn level and otherwise ignored.
     *
     * @param consumerExchange the exchange forwarded to the region
     * @param control the command to route; its destination selects the region
     */
    @Override
    public void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl control) {
        final ActiveMQDestination destination = control.getDestination();
        final int type = destination.getDestinationType();

        if (type == ActiveMQDestination.QUEUE_TYPE) {
            queueRegion.processConsumerControl(consumerExchange, control);
        } else if (type == ActiveMQDestination.TOPIC_TYPE) {
            topicRegion.processConsumerControl(consumerExchange, control);
        } else if (type == ActiveMQDestination.TEMP_QUEUE_TYPE) {
            tempQueueRegion.processConsumerControl(consumerExchange, control);
        } else if (type == ActiveMQDestination.TEMP_TOPIC_TYPE) {
            tempTopicRegion.processConsumerControl(consumerExchange, control);
        } else {
            LOG.warn("unmatched destination: " + destination + ", in consumerControl: "  + control);
        }
    }
    
    /**
     * Asks every transport connector that has cluster-client updates enabled
     * to update its client cluster info.
     */
    protected void addBrokerInClusterUpdate() {
        for (TransportConnector connector : this.brokerService.getTransportConnectors()) {
            if (!connector.isUpdateClusterClients()) {
                continue;
            }
            connector.updateClientClusterInfo();
        }
    }

    /**
     * Asks every transport connector that has both cluster-client updates and
     * update-on-remove enabled to update its client cluster info.
     */
    protected void removeBrokerInClusterUpdate() {
        for (TransportConnector connector : this.brokerService.getTransportConnectors()) {
            if (!connector.isUpdateClusterClients() || !connector.isUpdateClusterClientsOnRemove()) {
                continue;
            }
            connector.updateClientClusterInfo();
        }
    }
    
    /**
     * Removes destinations that report themselves as collectable.
     * <p>
     * While holding the monitor of {@code purgeInactiveDestinationsTask}, every
     * {@code BaseDestination} in the destination map is marked with the current
     * time and collected if {@code canGC()} returns {@code true}. Each collected
     * destination is then removed through the root broker; a failure to remove
     * one destination is logged and does not stop the remaining removals.
     */
    protected void purgeInactiveDestinations() {
        synchronized (purgeInactiveDestinationsTask) {
            // explicitly typed list; the raw ArrayList produced an unchecked warning
            List<BaseDestination> candidates = new ArrayList<BaseDestination>();
            Map<ActiveMQDestination, Destination> map = getDestinationMap();
            long timeStamp = System.currentTimeMillis();
            for (Destination d : map.values()) {
                if (d instanceof BaseDestination) {
                    BaseDestination bd = (BaseDestination) d;
                    bd.markForGC(timeStamp);
                    if (bd.canGC()) {
                        candidates.add(bd);
                    }
                }
            }

            if (candidates.isEmpty()) {
                // nothing to remove, so no connection context is needed
                return;
            }

            ConnectionContext context = BrokerSupport.getConnectionContext(this);
            context.setBroker(this);

            for (BaseDestination dest : candidates) {
                dest.getLog().info(
                        dest.getName() + " Inactive for longer than " + dest.getInactiveTimoutBeforeGC()
                                + " ms - removing ...");
                try {
                    getRoot().removeDestination(context, dest.getActiveMQDestination(), 0);
                } catch (Exception e) {
                    LOG.error("Failed to remove inactive destination " + dest, e);
                }
            }
        }
    }
}

Other ActiveMQ examples (source code examples)

Here is a short list of links related to this ActiveMQ RegionBroker.java source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.