alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

ActiveMQ example source code file (DiscoveryNetworkConnector.java)

This example ActiveMQ source code file (DiscoveryNetworkConnector.java) is included in the DevDaily.com "Java Source Code Warehouse" project. The intent of this project is to help you "Learn Java by Example" TM.

Java - ActiveMQ tags/keywords

could, could, discovernetworkbridgelistener, discoverynetworkconnector, exception, exception, io, ioexception, ioexception, management, net, network, networkbridge, string, transport, uri, uri, urisyntaxexception, util

The ActiveMQ DiscoveryNetworkConnector.java source code

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.network;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.SslContext;
import org.apache.activemq.command.DiscoveryEvent;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportDisposedIOException;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.discovery.DiscoveryAgent;
import org.apache.activemq.transport.discovery.DiscoveryAgentFactory;
import org.apache.activemq.transport.discovery.DiscoveryListener;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
import org.apache.activemq.util.URISupport;
import org.apache.activemq.util.URISupport.CompositeData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.management.ObjectName;

/**
 * A network connector which uses a discovery agent to detect the remote brokers
 * available and setup a connection to each available remote broker
 * 
 * @org.apache.xbean.XBean element="networkConnector"
 * 
 */
public class DiscoveryNetworkConnector extends NetworkConnector implements DiscoveryListener {
    private static final Logger LOG = LoggerFactory.getLogger(DiscoveryNetworkConnector.class);

    private DiscoveryAgent discoveryAgent;
    
    private Map<String, String> parameters;
    
    public DiscoveryNetworkConnector() {
    }

    public DiscoveryNetworkConnector(URI discoveryURI) throws IOException {
        setUri(discoveryURI);
    }

    public void setUri(URI discoveryURI) throws IOException {
        setDiscoveryAgent(DiscoveryAgentFactory.createDiscoveryAgent(discoveryURI));
        try {
            parameters = URISupport.parseParameters(discoveryURI);
            // allow discovery agent to grab it's parameters
            IntrospectionSupport.setProperties(getDiscoveryAgent(), parameters);
        } catch (URISyntaxException e) {
            LOG.warn("failed to parse query parameters from discoveryURI: " + discoveryURI, e);
        }  
        
    }

    public void onServiceAdd(DiscoveryEvent event) {
        // Ignore events once we start stopping.
        if (serviceSupport.isStopped() || serviceSupport.isStopping()) {
            return;
        }
        String url = event.getServiceName();
        if (url != null) {
            URI uri;
            try {
                uri = new URI(url);
            } catch (URISyntaxException e) {
                LOG.warn("Could not connect to remote URI: " + url + " due to bad URI syntax: " + e, e);
                return;
            }
            // Should we try to connect to that URI?
            if( bridges.containsKey(uri) ) {
                LOG.debug("Discovery agent generated a duplicate onServiceAdd event for: "+uri );
                return;
            }
            if ( localURI.equals(uri) || (connectionFilter != null && !connectionFilter.connectTo(uri))) {
                LOG.debug("not connecting loopback: " + uri);
                return;
            }
            URI connectUri = uri;
            try {
                connectUri = URISupport.applyParameters(connectUri, parameters, DISCOVERED_OPTION_PREFIX);
            } catch (URISyntaxException e) {
                LOG.warn("could not apply query parameters: " + parameters + " to: " + connectUri, e);
            }
            LOG.info("Establishing network connection from " + localURI + " to " + connectUri);

            Transport remoteTransport;
            Transport localTransport;
            try {
                // Allows the transport to access the broker's ssl configuration.
                SslContext.setCurrentSslContext(getBrokerService().getSslContext());
                try {
                    remoteTransport = TransportFactory.connect(connectUri);
                } catch (Exception e) {
                    LOG.warn("Could not connect to remote URI: " + connectUri + ": " + e.getMessage());
                    LOG.debug("Connection failure exception: " + e, e);
                    return;
                }
                try {
                    localTransport = createLocalTransport();
                } catch (Exception e) {
                    ServiceSupport.dispose(remoteTransport);
                    LOG.warn("Could not connect to local URI: " + localURI + ": " + e.getMessage());
                    LOG.debug("Connection failure exception: " + e, e);
                    return;
                }
            } finally {
                SslContext.setCurrentSslContext(null);
            }
            NetworkBridge bridge = createBridge(localTransport, remoteTransport, event);
            try {
                bridge.start();
                bridges.put(uri, bridge);
    	    } catch (TransportDisposedIOException e) {
                LOG.warn("Network bridge between: " + localURI + " and: " + uri + " was correctly stopped before it was correctly started.");
            } catch (Exception e) {
                ServiceSupport.dispose(localTransport);
                ServiceSupport.dispose(remoteTransport);
                LOG.warn("Could not start network bridge between: " + localURI + " and: " + uri + " due to: " + e);
                LOG.debug("Start failure exception: " + e, e);
                try {
                    discoveryAgent.serviceFailed(event);
                } catch (IOException e1) {
                    LOG.debug("Discovery agent failure while handling failure event: " + e1.getMessage(), e1);
                }
                return;
            }
        }
    }

    public void onServiceRemove(DiscoveryEvent event) {
        String url = event.getServiceName();
        if (url != null) {
            URI uri;
            try {
                uri = new URI(url);
            } catch (URISyntaxException e) {
                LOG.warn("Could not connect to remote URI: " + url + " due to bad URI syntax: " + e, e);
                return;
            }

            NetworkBridge bridge = bridges.remove(uri);
            if (bridge == null) {
                return;
            }

            ServiceSupport.dispose(bridge);
        }
    }

    public DiscoveryAgent getDiscoveryAgent() {
        return discoveryAgent;
    }

    public void setDiscoveryAgent(DiscoveryAgent discoveryAgent) {
        this.discoveryAgent = discoveryAgent;
        if (discoveryAgent != null) {
            this.discoveryAgent.setDiscoveryListener(this);
        }
    }

    protected void handleStart() throws Exception {
        if (discoveryAgent == null) {
            throw new IllegalStateException("You must configure the 'discoveryAgent' property");
        }
        this.discoveryAgent.start();
        super.handleStart();
    }

    protected void handleStop(ServiceStopper stopper) throws Exception {
        for (Iterator<NetworkBridge> i = bridges.values().iterator(); i.hasNext();) {
            NetworkBridge bridge = i.next();
            try {
                bridge.stop();
            } catch (Exception e) {
                stopper.onException(this, e);
            }
        }
        try {
            this.discoveryAgent.stop();
        } catch (Exception e) {
            stopper.onException(this, e);
        }

        super.handleStop(stopper);
    }

    protected NetworkBridge createBridge(Transport localTransport, Transport remoteTransport, final DiscoveryEvent event) {
        class DiscoverNetworkBridgeListener extends MBeanNetworkListener {

            public DiscoverNetworkBridgeListener(BrokerService brokerService, ObjectName connectorName) {
                super(brokerService, connectorName);
            }

            public void bridgeFailed() {
                if (!serviceSupport.isStopped()) {
                    try {
                        discoveryAgent.serviceFailed(event);
                    } catch (IOException e) {
                    }
                }

            }
        }
        NetworkBridgeListener listener = new DiscoverNetworkBridgeListener(getBrokerService(), getObjectName());

        DemandForwardingBridge result = NetworkBridgeFactory.createBridge(this, localTransport, remoteTransport, listener);
        result.setBrokerService(getBrokerService());
        return configureBridge(result);
    }

    @Override
    public String toString() {
        return "DiscoveryNetworkConnector:" + getName() + ":" + getBrokerService();
    }
}

Other ActiveMQ examples (source code examples)

Here is a short list of links related to this ActiveMQ DiscoveryNetworkConnector.java source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.