alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

ActiveMQ example source code file (FanoutTransport.java)

This example ActiveMQ source code file (FanoutTransport.java) is included in the DevDaily.com "Java Source Code Warehouse" project. The intent of this project is to help you "Learn Java by Example" TM.

Java - ActiveMQ tags/keywords

command, exception, fanouttransporthandler, fanouttransporthandler, interruptedexception, io, ioexception, ioexception, iterator, iterator, net, network, object, requestcounter, requestcounter, threading, threads, transport, uri, util

The ActiveMQ FanoutTransport.java source code

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.transport.fanout;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.state.ConnectionStateTracker;
import org.apache.activemq.thread.DefaultThreadPools;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.transport.CompositeTransport;
import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.FutureResponse;
import org.apache.activemq.transport.ResponseCallback;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A Transport that fans out a connection to multiple brokers.
 * 
 * 
 */
public class FanoutTransport implements CompositeTransport {

    private static final Logger LOG = LoggerFactory.getLogger(FanoutTransport.class);

    private TransportListener transportListener;
    private boolean disposed;
    private boolean connected;

    private final Object reconnectMutex = new Object();
    private final ConnectionStateTracker stateTracker = new ConnectionStateTracker();
    private final ConcurrentHashMap<Integer, RequestCounter> requestMap = new ConcurrentHashMap();

    private final TaskRunner reconnectTask;
    private boolean started;

    private final ArrayList<FanoutTransportHandler> transports = new ArrayList();
    private int connectedCount;

    private int minAckCount = 2;

    private long initialReconnectDelay = 10;
    private long maxReconnectDelay = 1000 * 30;
    private long backOffMultiplier = 2;
    private final boolean useExponentialBackOff = true;
    private int maxReconnectAttempts;
    private Exception connectionFailure;
    private FanoutTransportHandler primary;
    private boolean fanOutQueues = false;

    static class RequestCounter {

        final Command command;
        final AtomicInteger ackCount;

        RequestCounter(Command command, int count) {
            this.command = command;
            this.ackCount = new AtomicInteger(count);
        }

        @Override
        public String toString() {
            return command.getCommandId() + "=" + ackCount.get();
        }
    }

    class FanoutTransportHandler extends DefaultTransportListener {

        private final URI uri;
        private Transport transport;

        private int connectFailures;
        private long reconnectDelay = initialReconnectDelay;
        private long reconnectDate;

        public FanoutTransportHandler(URI uri) {
            this.uri = uri;
        }

        @Override
        public void onCommand(Object o) {
            Command command = (Command)o;
            if (command.isResponse()) {
                Integer id = new Integer(((Response)command).getCorrelationId());
                RequestCounter rc = requestMap.get(id);
                if (rc != null) {
                    if (rc.ackCount.decrementAndGet() <= 0) {
                        requestMap.remove(id);
                        transportListenerOnCommand(command);
                    }
                } else {
                    transportListenerOnCommand(command);
                }
            } else {
                transportListenerOnCommand(command);
            }
        }

        @Override
        public void onException(IOException error) {
            try {
                synchronized (reconnectMutex) {
                    if (transport == null || !transport.isConnected()) {
                        return;
                    }

                    LOG.debug("Transport failed, starting up reconnect task", error);

                    ServiceSupport.dispose(transport);
                    transport = null;
                    connectedCount--;
                    if (primary == this) {
                        primary = null;
                    }
                    reconnectTask.wakeup();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                if (transportListener != null) {
                    transportListener.onException(new InterruptedIOException());
                }
            }
        }
    }

    public FanoutTransport() throws InterruptedIOException {
        // Setup a task that is used to reconnect the a connection async.
        reconnectTask = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(new Task() {
            public boolean iterate() {
                return doConnect();
            }
        }, "ActiveMQ Fanout Worker: " + System.identityHashCode(this));
    }

    /**
     * @return
     */
    private boolean doConnect() {
        long closestReconnectDate = 0;
        synchronized (reconnectMutex) {

            if (disposed || connectionFailure != null) {
                reconnectMutex.notifyAll();
            }

            if (transports.size() == connectedCount || disposed || connectionFailure != null) {
                return false;
            } else {

                if (transports.isEmpty()) {
                    // connectionFailure = new IOException("No uris available to
                    // connect to.");
                } else {

                    // Try to connect them up.
                    Iterator<FanoutTransportHandler> iter = transports.iterator();
                    for (int i = 0; iter.hasNext() && !disposed; i++) {

                        long now = System.currentTimeMillis();

                        FanoutTransportHandler fanoutHandler = iter.next();
                        if (fanoutHandler.transport != null) {
                            continue;
                        }

                        // Are we waiting a little to try to reconnect this one?
                        if (fanoutHandler.reconnectDate != 0 && fanoutHandler.reconnectDate > now) {
                            if (closestReconnectDate == 0 || fanoutHandler.reconnectDate < closestReconnectDate) {
                                closestReconnectDate = fanoutHandler.reconnectDate;
                            }
                            continue;
                        }

                        URI uri = fanoutHandler.uri;
                        try {
                            LOG.debug("Stopped: " + this);
                            LOG.debug("Attempting connect to: " + uri);
                            Transport t = TransportFactory.compositeConnect(uri);
                            fanoutHandler.transport = t;
                            t.setTransportListener(fanoutHandler);
                            if (started) {
                                restoreTransport(fanoutHandler);
                            }
                            LOG.debug("Connection established");
                            fanoutHandler.reconnectDelay = initialReconnectDelay;
                            fanoutHandler.connectFailures = 0;
                            if (primary == null) {
                                primary = fanoutHandler;
                            }
                            connectedCount++;
                        } catch (Exception e) {
                            LOG.debug("Connect fail to: " + uri + ", reason: " + e);

                            if( fanoutHandler.transport !=null ) {
                                ServiceSupport.dispose(fanoutHandler.transport);
                                fanoutHandler.transport=null;
                            }
                            
                            if (maxReconnectAttempts > 0 && ++fanoutHandler.connectFailures >= maxReconnectAttempts) {
                                LOG.error("Failed to connect to transport after: " + fanoutHandler.connectFailures + " attempt(s)");
                                connectionFailure = e;
                                reconnectMutex.notifyAll();
                                return false;
                            } else {

                                if (useExponentialBackOff) {
                                    // Exponential increment of reconnect delay.
                                    fanoutHandler.reconnectDelay *= backOffMultiplier;
                                    if (fanoutHandler.reconnectDelay > maxReconnectDelay) {
                                        fanoutHandler.reconnectDelay = maxReconnectDelay;
                                    }
                                }

                                fanoutHandler.reconnectDate = now + fanoutHandler.reconnectDelay;

                                if (closestReconnectDate == 0 || fanoutHandler.reconnectDate < closestReconnectDate) {
                                    closestReconnectDate = fanoutHandler.reconnectDate;
                                }
                            }
                        }
                    }
                    if (transports.size() == connectedCount || disposed) {
                        reconnectMutex.notifyAll();
                        return false;
                    }

                }
            }

        }

        try {
            long reconnectDelay = closestReconnectDate - System.currentTimeMillis();
            if (reconnectDelay > 0) {
                LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. ");
                Thread.sleep(reconnectDelay);
            }
        } catch (InterruptedException e1) {
            Thread.currentThread().interrupt();
        }
        return true;
    }

    public void start() throws Exception {
        synchronized (reconnectMutex) {
            LOG.debug("Started.");
            if (started) {
                return;
            }
            started = true;
            for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
                FanoutTransportHandler th = iter.next();
                if (th.transport != null) {
                    restoreTransport(th);
                }
            }
            connected=true;
        }
    }

    public void stop() throws Exception {
        synchronized (reconnectMutex) {
            ServiceStopper ss = new ServiceStopper();

            if (!started) {
                return;
            }
            started = false;
            disposed = true;
            connected=false;

            for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
                FanoutTransportHandler th = iter.next();
                if (th.transport != null) {
                    ss.stop(th.transport);
                }
            }

            LOG.debug("Stopped: " + this);
            ss.throwFirstException();
        }
        reconnectTask.shutdown();
    }

	public int getMinAckCount() {
		return minAckCount;
	}

	public void setMinAckCount(int minAckCount) {
		this.minAckCount = minAckCount;
	}    
    
    public long getInitialReconnectDelay() {
        return initialReconnectDelay;
    }

    public void setInitialReconnectDelay(long initialReconnectDelay) {
        this.initialReconnectDelay = initialReconnectDelay;
    }

    public long getMaxReconnectDelay() {
        return maxReconnectDelay;
    }

    public void setMaxReconnectDelay(long maxReconnectDelay) {
        this.maxReconnectDelay = maxReconnectDelay;
    }

    public long getReconnectDelayExponent() {
        return backOffMultiplier;
    }

    public void setReconnectDelayExponent(long reconnectDelayExponent) {
        this.backOffMultiplier = reconnectDelayExponent;
    }

    public int getMaxReconnectAttempts() {
        return maxReconnectAttempts;
    }

    public void setMaxReconnectAttempts(int maxReconnectAttempts) {
        this.maxReconnectAttempts = maxReconnectAttempts;
    }

    public void oneway(Object o) throws IOException {
        final Command command = (Command)o;
        try {
            synchronized (reconnectMutex) {

                // Wait for transport to be connected.
                while (connectedCount < minAckCount && !disposed && connectionFailure == null) {
                    LOG.debug("Waiting for at least " + minAckCount + " transports to be connected.");
                    reconnectMutex.wait(1000);
                }

                // Still not fully connected.
                if (connectedCount < minAckCount) {

                    Exception error;

                    // Throw the right kind of error..
                    if (disposed) {
                        error = new IOException("Transport disposed.");
                    } else if (connectionFailure != null) {
                        error = connectionFailure;
                    } else {
                        error = new IOException("Unexpected failure.");
                    }

                    if (error instanceof IOException) {
                        throw (IOException)error;
                    }
                    throw IOExceptionSupport.create(error);
                }

                // If it was a request and it was not being tracked by
                // the state tracker,
                // then hold it in the requestMap so that we can replay
                // it later.
                boolean fanout = isFanoutCommand(command);
                if (stateTracker.track(command) == null && command.isResponseRequired()) {
                    int size = fanout ? minAckCount : 1;
                    requestMap.put(new Integer(command.getCommandId()), new RequestCounter(command, size));
                }
                
                // Send the message.
                if (fanout) {
                    for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
                        FanoutTransportHandler th = iter.next();
                        if (th.transport != null) {
                            try {
                                th.transport.oneway(command);
                            } catch (IOException e) {
                                LOG.debug("Send attempt: failed.");
                                th.onException(e);
                            }
                        }
                    }
                } else {
                    try {
                        primary.transport.oneway(command);
                    } catch (IOException e) {
                        LOG.debug("Send attempt: failed.");
                        primary.onException(e);
                    }
                }

            }
        } catch (InterruptedException e) {
            // Some one may be trying to stop our thread.
            Thread.currentThread().interrupt();
            throw new InterruptedIOException();
        }
    }

    /**
     * @param command
     * @return
     */
    private boolean isFanoutCommand(Command command) {
        if (command.isMessage()) {
            if( fanOutQueues ) {
                return true;
            }
            return ((Message)command).getDestination().isTopic();
        }
        if (command.getDataStructureType() == ConsumerInfo.DATA_STRUCTURE_TYPE ||
                command.getDataStructureType() == RemoveInfo.DATA_STRUCTURE_TYPE) {
            return false;
        }
        return true;
    }

    public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
        throw new AssertionError("Unsupported Method");
    }

    public Object request(Object command) throws IOException {
        throw new AssertionError("Unsupported Method");
    }

    public Object request(Object command, int timeout) throws IOException {
        throw new AssertionError("Unsupported Method");
    }

    public void reconnect() {
        LOG.debug("Waking up reconnect task");
        try {
            reconnectTask.wakeup();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public TransportListener getTransportListener() {
        return transportListener;
    }

    public void setTransportListener(TransportListener commandListener) {
        this.transportListener = commandListener;
    }

    public <T> T narrow(Class target) {

        if (target.isAssignableFrom(getClass())) {
            return target.cast(this);
        }

        synchronized (reconnectMutex) {
            for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
                FanoutTransportHandler th = iter.next();
                if (th.transport != null) {
                    T rc = th.transport.narrow(target);
                    if (rc != null) {
                        return rc;
                    }
                }
            }
        }

        return null;

    }

    protected void restoreTransport(FanoutTransportHandler th) throws Exception, IOException {
        th.transport.start();
        stateTracker.setRestoreConsumers(th.transport == primary);
        stateTracker.restore(th.transport);
        for (Iterator<RequestCounter> iter2 = requestMap.values().iterator(); iter2.hasNext();) {
            RequestCounter rc = iter2.next();
            th.transport.oneway(rc.command);
        }
    }

    public void add(boolean reblance,URI uris[]) {

        synchronized (reconnectMutex) {
            for (int i = 0; i < uris.length; i++) {
                URI uri = uris[i];

                boolean match = false;
                for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
                    FanoutTransportHandler th = iter.next();
                    if (th.uri.equals(uri)) {
                        match = true;
                        break;
                    }
                }
                if (!match) {
                    FanoutTransportHandler th = new FanoutTransportHandler(uri);
                    transports.add(th);
                    reconnect();
                }
            }
        }

    }

    public void remove(boolean rebalance,URI uris[]) {

        synchronized (reconnectMutex) {
            for (int i = 0; i < uris.length; i++) {
                URI uri = uris[i];

                for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
                    FanoutTransportHandler th = iter.next();
                    if (th.uri.equals(uri)) {
                        if (th.transport != null) {
                            ServiceSupport.dispose(th.transport);
                            connectedCount--;
                        }
                        iter.remove();
                        break;
                    }
                }
            }
        }

    }
    
    public void reconnect(URI uri) throws IOException {
		add(true,new URI[]{uri});
		
	}
    
    public boolean isReconnectSupported() {
        return true;
    }

    public boolean isUpdateURIsSupported() {
        return true;
    }
    public void updateURIs(boolean reblance,URI[] uris) throws IOException {
        add(reblance,uris);
    }


    public String getRemoteAddress() {
        if (primary != null) {
            if (primary.transport != null) {
                return primary.transport.getRemoteAddress();
            }
        }
        return null;
    }

    protected void transportListenerOnCommand(Command command) {
        if (transportListener != null) {
            transportListener.onCommand(command);
        }
    }

    public boolean isFaultTolerant() {
        return true;
    }

    public boolean isFanOutQueues() {
        return fanOutQueues;
    }

    public void setFanOutQueues(boolean fanOutQueues) {
        this.fanOutQueues = fanOutQueues;
    }

	public boolean isDisposed() {
		return disposed;
	}
	

    public boolean isConnected() {
        return connected;
    }

    public int getReceiveCounter() {
        int rc = 0;
        synchronized (reconnectMutex) {
            for (FanoutTransportHandler th : transports) {
                if (th.transport != null) {
                    rc += th.transport.getReceiveCounter();
                }
            }
        }
        return rc;
    }
}

Other ActiveMQ examples (source code examples)

Here is a short list of links related to this ActiveMQ FanoutTransport.java source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.