alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

ActiveMQ example source code file (ForwardingBridge.java)

This example ActiveMQ source code file (ForwardingBridge.java) is included in the DevDaily.com "Java Source Code Warehouse" project. The intent of this project is to help you "Learn Java by Example"™.

Java - ActiveMQ tags/keywords

atomiclong, brokerinfo, command, command, consumerinfo, consumerinfo, exception, io, ioexception, ioexception, messageack, string, string, transport, unexpected

The ActiveMQ ForwardingBridge.java source code

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.network;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.activemq.Service;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.FutureResponse;
import org.apache.activemq.transport.ResponseCallback;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Forwards all messages from the local broker to the remote broker.
 * <p>
 * The bridge listens on two transports. Once a {@link BrokerInfo} has been
 * received from both sides and the two broker ids differ, it registers a
 * connection, session and producer on the remote broker and subscribes
 * consumers on the local broker. Every {@link MessageDispatch} subsequently
 * received from the local broker is sent to the remote broker and then
 * acknowledged on the local broker. If both sides report the same broker id
 * the bridge is disposed instead (loop back connection).
 *
 * @org.apache.xbean.XBean
 */
public class ForwardingBridge implements Service {

    private static final IdGenerator ID_GENERATOR = new IdGenerator();
    private static final Logger LOG = LoggerFactory.getLogger(ForwardingBridge.class);

    /** Number of message dispatches received from the local broker. */
    final AtomicLong enqueueCounter = new AtomicLong();
    /** Number of messages handed to the remote broker and acknowledged locally. */
    final AtomicLong dequeueCounter = new AtomicLong();
    ConnectionInfo connectionInfo;
    SessionInfo sessionInfo;
    /**
     * Producer used to stamp forwarded messages. Assigned by
     * {@link #startBridge()} (which runs on its own thread) and read by
     * {@link #serviceLocalCommand(Command)}, hence volatile.
     */
    volatile ProducerInfo producerInfo;
    ConsumerInfo queueConsumerInfo;
    ConsumerInfo topicConsumerInfo;
    BrokerId localBrokerId;
    BrokerId remoteBrokerId;
    BrokerInfo localBrokerInfo;
    BrokerInfo remoteBrokerInfo;

    private final Transport localBroker;
    private final Transport remoteBroker;
    private String clientId;
    private int prefetchSize = 1000;
    private boolean dispatchAsync;
    private String destinationFilter = ">";
    private NetworkBridgeListener bridgeFailedListener;

    /**
     * Creates a bridge between the two given transports. Nothing is started
     * until {@link #start()} is called.
     *
     * @param localBroker transport to the broker messages are consumed from
     * @param remoteBroker transport to the broker messages are forwarded to
     */
    public ForwardingBridge(Transport localBroker, Transport remoteBroker) {
        this.localBroker = localBroker;
        this.remoteBroker = remoteBroker;
    }

    /**
     * Installs the transport listeners on both transports and starts them.
     * The bridge itself is only set up later, once both brokers have
     * announced themselves (see {@link #serviceLocalCommand(Command)} and
     * {@link #serviceRemoteCommand(Command)}).
     *
     * @throws Exception if either transport fails to start
     */
    public void start() throws Exception {
        // Nothing is established yet at this point; that is logged by startBridge().
        LOG.info("Starting a network connection between {} and {}.", localBroker, remoteBroker);

        localBroker.setTransportListener(new DefaultTransportListener() {
            public void onCommand(Object o) {
                Command command = (Command)o;
                serviceLocalCommand(command);
            }

            public void onException(IOException error) {
                serviceLocalException(error);
            }
        });

        remoteBroker.setTransportListener(new DefaultTransportListener() {
            public void onCommand(Object o) {
                Command command = (Command)o;
                serviceRemoteCommand(command);
            }

            public void onException(IOException error) {
                serviceRemoteException(error);
            }
        });

        localBroker.start();
        remoteBroker.start();
    }

    /**
     * Runs {@link #startBridge()} on a newly created thread. A failure of
     * {@code startBridge()} is logged and not propagated.
     *
     * @throws IOException declared for subclasses; this implementation does
     *         not throw it itself
     */
    protected void triggerStartBridge() throws IOException {
        // NOTE(review): presumably done asynchronously so the transport
        // listener thread that delivered the BrokerInfo is not blocked.
        Thread bridgeStarter = new Thread() {
            public void run() {
                try {
                    startBridge();
                } catch (IOException e) {
                    LOG.error("Failed to start network bridge: " + e, e);
                }
            }
        };
        bridgeStarter.start();
    }

    /**
     * Registers the bridge's connection, session and producer on the remote
     * broker and its connection, session and consumers on the local broker.
     * <p>
     * The producer is created and sent to the remote broker <em>before</em>
     * any consumer is subscribed on the local broker. Once a consumer is
     * subscribed, message dispatches may reach
     * {@link #serviceLocalCommand(Command)}, which needs {@link #producerInfo}
     * to be set.
     *
     * @throws IOException if sending any of the commands fails
     */
    final void startBridge() throws IOException {
        connectionInfo = new ConnectionInfo();
        connectionInfo.setConnectionId(new ConnectionId(ID_GENERATOR.generateId()));
        connectionInfo.setClientId(clientId);
        localBroker.oneway(connectionInfo);
        remoteBroker.oneway(connectionInfo);

        sessionInfo = new SessionInfo(connectionInfo, 1);
        localBroker.oneway(sessionInfo);
        remoteBroker.oneway(sessionInfo);

        // Publish the producer only after it is fully configured and known to
        // the remote broker, and before any local subscription exists.
        ProducerInfo producer = new ProducerInfo(sessionInfo, 1);
        producer.setResponseRequired(false);
        remoteBroker.oneway(producer);
        producerInfo = producer;

        queueConsumerInfo = new ConsumerInfo(sessionInfo, 1);
        queueConsumerInfo.setDispatchAsync(dispatchAsync);
        queueConsumerInfo.setDestination(new ActiveMQQueue(destinationFilter));
        queueConsumerInfo.setPrefetchSize(prefetchSize);
        queueConsumerInfo.setPriority(ConsumerInfo.NETWORK_CONSUMER_PRIORITY);
        localBroker.oneway(queueConsumerInfo);

        // The topic subscription is only created when a client id is configured.
        if (connectionInfo.getClientId() != null) {
            topicConsumerInfo = new ConsumerInfo(sessionInfo, 2);
            topicConsumerInfo.setDispatchAsync(dispatchAsync);
            topicConsumerInfo.setSubscriptionName("topic-bridge");
            topicConsumerInfo.setRetroactive(true);
            topicConsumerInfo.setDestination(new ActiveMQTopic(destinationFilter));
            topicConsumerInfo.setPrefetchSize(prefetchSize);
            topicConsumerInfo.setPriority(ConsumerInfo.NETWORK_CONSUMER_PRIORITY);
            localBroker.oneway(topicConsumerInfo);
        }
        LOG.info("Network connection between {} and {} has been established.", localBroker, remoteBroker);
    }

    /**
     * Removes the bridge's connection from both brokers (if it was created),
     * detaches the transport listeners, sends a {@link ShutdownInfo} to both
     * sides and finally stops both transports.
     * <p>
     * The transports are stopped even if one of the earlier steps throws.
     *
     * @throws Exception the failure of an earlier step, or the first
     *         exception recorded while stopping the transports
     */
    public void stop() throws Exception {
        try {
            if (connectionInfo != null) {
                localBroker.request(connectionInfo.createRemoveCommand());
                remoteBroker.request(connectionInfo.createRemoveCommand());
            }
            localBroker.setTransportListener(null);
            remoteBroker.setTransportListener(null);
            localBroker.oneway(new ShutdownInfo());
            remoteBroker.oneway(new ShutdownInfo());
        } finally {
            ServiceStopper ss = new ServiceStopper();
            ss.stop(localBroker);
            ss.stop(remoteBroker);
            ss.throwFirstException();
        }
    }

    /**
     * Logs an exception reported by the remote transport. Unlike
     * {@link #serviceLocalException(Throwable)} this does not notify the
     * bridge-failed listener.
     *
     * @param error the reported failure
     */
    public void serviceRemoteException(Throwable error) {
        LOG.info("Unexpected remote exception: " + error);
        LOG.debug("Exception trace: ", error);
    }

    /**
     * Handles a command received from the remote broker. Only
     * {@link BrokerInfo} is expected; anything else is logged as a warning.
     *
     * @param command the received command
     */
    protected void serviceRemoteCommand(Command command) {
        try {
            if (command.isBrokerInfo()) {
                synchronized (this) {
                    remoteBrokerInfo = (BrokerInfo)command;
                    remoteBrokerId = remoteBrokerInfo.getBrokerId();
                    startBridgeUnlessLoopBack(localBrokerId, remoteBrokerId);
                }
            } else {
                LOG.warn("Unexpected remote command: {}", command);
            }
        } catch (IOException e) {
            // Routed through the local handler so the bridge-failed listener
            // is notified (the remote handler only logs).
            serviceLocalException(e);
        }
    }

    /**
     * Logs an exception concerning the local side of the bridge and notifies
     * the registered {@link NetworkBridgeListener}, if any.
     *
     * @param error the reported failure
     */
    public void serviceLocalException(Throwable error) {
        LOG.info("Unexpected local exception: " + error);
        LOG.debug("Exception trace: ", error);
        fireBridgeFailed();
    }

    /**
     * Handles a command received from the local broker: message dispatches
     * are forwarded to the remote broker, a {@link BrokerInfo} records the
     * local broker identity and may trigger the bridge start, anything else
     * is logged at debug level.
     *
     * @param command the received command
     */
    protected void serviceLocalCommand(Command command) {
        try {
            if (command.isMessageDispatch()) {
                forwardMessage((MessageDispatch)command);
            } else if (command.isBrokerInfo()) {
                synchronized (this) {
                    localBrokerInfo = (BrokerInfo)command;
                    localBrokerId = localBrokerInfo.getBrokerId();
                    startBridgeUnlessLoopBack(remoteBrokerId, localBrokerId);
                }
            } else {
                LOG.debug("Unexpected local command: {}", command);
            }
        } catch (IOException e) {
            serviceLocalException(e);
        }
    }

    /**
     * Decides what to do once a broker id has been received. Does nothing
     * while the other side's id is still unknown; disposes the bridge when
     * both ids are equal; otherwise triggers the bridge start.
     * Callers must hold this object's monitor.
     */
    private void startBridgeUnlessLoopBack(BrokerId otherSideId, BrokerId receivedId) throws IOException {
        if (otherSideId == null) {
            return;
        }
        if (otherSideId.equals(receivedId)) {
            LOG.info("Disconnecting loop back connection.");
            ServiceSupport.dispose(this);
        } else {
            triggerStartBridge();
        }
    }

    /**
     * Sends one dispatched message to the remote broker and acknowledges it
     * on the local broker.
     */
    private void forwardMessage(final MessageDispatch md) throws IOException {
        enqueueCounter.incrementAndGet();

        Message message = md.getMessage();
        message.setProducerId(producerInfo.getProducerId());

        // Remember the transaction the message originally belonged to, then
        // detach it from any transaction before forwarding.
        if (message.getOriginalTransactionId() == null) {
            message.setOriginalTransactionId(message.getTransactionId());
        }
        message.setTransactionId(null);

        // Each message is acked individually since we don't know if the
        // broker is blocked due to memory usage and is waiting for an ack to
        // un-block it. Acking a range would be more efficient but is more
        // prone to locking up a server; that could become policy based.
        if (!message.isResponseRequired()) {
            // If the message was originally sent using async send, we will
            // preserve that QOS by bridging it using an async send (small
            // chance of message loss).
            remoteBroker.oneway(message);
            dequeueCounter.incrementAndGet();
            localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1));
            return;
        }

        // The message was not sent using async send, so we should only ack
        // the local broker when we get confirmation that the remote broker
        // has received the message.
        ResponseCallback ackOnConfirmation = new ResponseCallback() {
            public void onCompletion(FutureResponse future) {
                try {
                    Response response = future.getResult();
                    if (response.isException()) {
                        ExceptionResponse er = (ExceptionResponse)response;
                        serviceLocalException(er.getException());
                    } else {
                        dequeueCounter.incrementAndGet();
                        localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1));
                    }
                } catch (IOException e) {
                    serviceLocalException(e);
                }
            }
        };
        remoteBroker.asyncRequest(message, ackOnConfirmation);
    }

    /**
     * @return the client id set on the bridge's connection, or {@code null}
     */
    public String getClientId() {
        return clientId;
    }

    /**
     * @param clientId client id to set on the bridge's connection; the topic
     *        subscription is only created when this is non-null
     */
    public void setClientId(String clientId) {
        this.clientId = clientId;
    }

    /**
     * @return the prefetch size applied to the bridge's consumers (default 1000)
     */
    public int getPrefetchSize() {
        return prefetchSize;
    }

    /**
     * @param prefetchSize prefetch size to apply to the bridge's consumers
     */
    public void setPrefetchSize(int prefetchSize) {
        this.prefetchSize = prefetchSize;
    }

    /**
     * @return the dispatch-async flag applied to the bridge's consumers
     */
    public boolean isDispatchAsync() {
        return dispatchAsync;
    }

    /**
     * @param dispatchAsync dispatch-async flag to apply to the bridge's consumers
     */
    public void setDispatchAsync(boolean dispatchAsync) {
        this.dispatchAsync = dispatchAsync;
    }

    /**
     * @return the destination name used for both the queue and the topic
     *         subscription (default {@code ">"})
     */
    public String getDestinationFilter() {
        return destinationFilter;
    }

    /**
     * @param destinationFilter destination name to subscribe to on the local broker
     */
    public void setDestinationFilter(String destinationFilter) {
        this.destinationFilter = destinationFilter;
    }

    /**
     * @param listener listener to notify from
     *        {@link #serviceLocalException(Throwable)}; may be {@code null}
     */
    public void setNetworkBridgeFailedListener(NetworkBridgeListener listener) {
        this.bridgeFailedListener = listener;
    }

    /** Notifies the registered listener, if there is one, that the bridge failed. */
    private void fireBridgeFailed() {
        // Read the field once so the null check and the call see the same value.
        NetworkBridgeListener l = this.bridgeFailedListener;
        if (l != null) {
            l.bridgeFailed();
        }
    }

    /**
     * @return the remote address reported by the remote broker transport
     */
    public String getRemoteAddress() {
        return remoteBroker.getRemoteAddress();
    }

    /**
     * @return the remote address reported by the local broker transport
     */
    public String getLocalAddress() {
        return localBroker.getRemoteAddress();
    }

    /**
     * @return the local broker's name, or {@code null} if its
     *         {@link BrokerInfo} has not been received yet
     */
    public String getLocalBrokerName() {
        return localBrokerInfo == null ? null : localBrokerInfo.getBrokerName();
    }

    /**
     * @return the remote broker's name, or {@code null} if its
     *         {@link BrokerInfo} has not been received yet
     */
    public String getRemoteBrokerName() {
        return remoteBrokerInfo == null ? null : remoteBrokerInfo.getBrokerName();
    }

    /**
     * @return number of messages handed to the remote broker and acknowledged locally
     */
    public long getDequeueCounter() {
        return dequeueCounter.get();
    }

    /**
     * @return number of message dispatches received from the local broker
     */
    public long getEnqueueCounter() {
        return enqueueCounter.get();
    }

}

Other ActiveMQ examples (source code examples)

Here is a short list of links related to this ActiveMQ ForwardingBridge.java source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.