alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

ActiveMQ example source code file (JmsConnector.java)

This example ActiveMQ source code file (JmsConnector.java) is included in the DevDaily.com "Java Source Code Warehouse" project. The intent of this project is to help you "Learn Java by Example" TM.

Java - ActiveMQ tags/keywords

atomicboolean, atomicboolean, connection, connector, copyonwritearraylist, destinationbridge, destinationbridge, exception, jmsmesageconvertor, jnditemplate, jnditemplate, lrucache, naming, string, string, threading, threads, util

The ActiveMQ JmsConnector.java source code

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.network.jms;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.naming.NamingException;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.Service;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.util.LRUCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jndi.JndiTemplate;

/**
 * This bridge joins the gap between foreign JMS providers and ActiveMQ As some
 * JMS providers are still only 1.0.1 compliant, this bridge itself aimed to be
 * JMS 1.0.2 compliant.
 * 
 * 
 */
public abstract class JmsConnector implements Service {

    private static int nextId;
    private static final Logger LOG = LoggerFactory.getLogger(JmsConnector.class);
    
    protected JndiTemplate jndiLocalTemplate;
    protected JndiTemplate jndiOutboundTemplate;
    protected JmsMesageConvertor inboundMessageConvertor;
    protected JmsMesageConvertor outboundMessageConvertor;
    protected AtomicBoolean initialized = new AtomicBoolean(false);
    protected AtomicBoolean started = new AtomicBoolean(false);
    protected ActiveMQConnectionFactory embeddedConnectionFactory;
    protected int replyToDestinationCacheSize = 10000;
    protected String outboundUsername;
    protected String outboundPassword;
    protected String localUsername;
    protected String localPassword;
    protected String outboundClientId;
    protected String localClientId;
    protected LRUCache replyToBridges = createLRUCache();

    private List<DestinationBridge> inboundBridges = new CopyOnWriteArrayList();
    private List<DestinationBridge> outboundBridges = new CopyOnWriteArrayList();
    private String name;


    private static LRUCache createLRUCache() {
        return new LRUCache() {
            private static final long serialVersionUID = -7446792754185879286L;

            protected boolean removeEldestEntry(Map.Entry enty) {
                if (size() > maxCacheSize) {
                    Iterator iter = entrySet().iterator();
                    Map.Entry lru = (Map.Entry)iter.next();
                    remove(lru.getKey());
                    DestinationBridge bridge = (DestinationBridge)lru.getValue();
                    try {
                        bridge.stop();
                        LOG.info("Expired bridge: " + bridge);
                    } catch (Exception e) {
                        LOG.warn("stopping expired bridge" + bridge + " caused an exception", e);
                    }
                }
                return false;
            }
        };
    }

    /**
     */
    public boolean init() {
        boolean result = initialized.compareAndSet(false, true);
        if (result) {
            if (jndiLocalTemplate == null) {
                jndiLocalTemplate = new JndiTemplate();
            }
            if (jndiOutboundTemplate == null) {
                jndiOutboundTemplate = new JndiTemplate();
            }
            if (inboundMessageConvertor == null) {
                inboundMessageConvertor = new SimpleJmsMessageConvertor();
            }
            if (outboundMessageConvertor == null) {
                outboundMessageConvertor = new SimpleJmsMessageConvertor();
            }
            replyToBridges.setMaxCacheSize(getReplyToDestinationCacheSize());
        }
        return result;
    }

    public void start() throws Exception {
        init();
        if (started.compareAndSet(false, true)) {
            for (int i = 0; i < inboundBridges.size(); i++) {
                DestinationBridge bridge = inboundBridges.get(i);
                bridge.start();
            }
            for (int i = 0; i < outboundBridges.size(); i++) {
                DestinationBridge bridge = outboundBridges.get(i);
                bridge.start();
            }
            LOG.info("JMS Connector " + getName() + " Started");
        }
    }

    public void stop() throws Exception {
        if (started.compareAndSet(true, false)) {
            for (int i = 0; i < inboundBridges.size(); i++) {
                DestinationBridge bridge = inboundBridges.get(i);
                bridge.stop();
            }
            for (int i = 0; i < outboundBridges.size(); i++) {
                DestinationBridge bridge = outboundBridges.get(i);
                bridge.stop();
            }
            LOG.info("JMS Connector " + getName() + " Stopped");
        }
    }
    
    public void clearBridges() {
        inboundBridges.clear();
        outboundBridges.clear();
    }

    protected abstract Destination createReplyToBridge(Destination destination, Connection consumerConnection, Connection producerConnection);

    /**
     * One way to configure the local connection - this is called by The
     * BrokerService when the Connector is embedded
     * 
     * @param service
     */
    public void setBrokerService(BrokerService service) {
        embeddedConnectionFactory = new ActiveMQConnectionFactory(service.getVmConnectorURI());
    }

    /**
     * @return Returns the jndiTemplate.
     */
    public JndiTemplate getJndiLocalTemplate() {
        return jndiLocalTemplate;
    }

    /**
     * @param jndiTemplate The jndiTemplate to set.
     */
    public void setJndiLocalTemplate(JndiTemplate jndiTemplate) {
        this.jndiLocalTemplate = jndiTemplate;
    }

    /**
     * @return Returns the jndiOutboundTemplate.
     */
    public JndiTemplate getJndiOutboundTemplate() {
        return jndiOutboundTemplate;
    }

    /**
     * @param jndiOutboundTemplate The jndiOutboundTemplate to set.
     */
    public void setJndiOutboundTemplate(JndiTemplate jndiOutboundTemplate) {
        this.jndiOutboundTemplate = jndiOutboundTemplate;
    }

    /**
     * @return Returns the inboundMessageConvertor.
     */
    public JmsMesageConvertor getInboundMessageConvertor() {
        return inboundMessageConvertor;
    }

    /**
     * @param inboundMessageConvertor The inboundMessageConvertor to set.
     */
    public void setInboundMessageConvertor(JmsMesageConvertor jmsMessageConvertor) {
        this.inboundMessageConvertor = jmsMessageConvertor;
    }

    /**
     * @return Returns the outboundMessageConvertor.
     */
    public JmsMesageConvertor getOutboundMessageConvertor() {
        return outboundMessageConvertor;
    }

    /**
     * @param outboundMessageConvertor The outboundMessageConvertor to set.
     */
    public void setOutboundMessageConvertor(JmsMesageConvertor outboundMessageConvertor) {
        this.outboundMessageConvertor = outboundMessageConvertor;
    }

    /**
     * @return Returns the replyToDestinationCacheSize.
     */
    public int getReplyToDestinationCacheSize() {
        return replyToDestinationCacheSize;
    }

    /**
     * @param replyToDestinationCacheSize The replyToDestinationCacheSize to
     *                set.
     */
    public void setReplyToDestinationCacheSize(int replyToDestinationCacheSize) {
        this.replyToDestinationCacheSize = replyToDestinationCacheSize;
    }

    /**
     * @return Returns the localPassword.
     */
    public String getLocalPassword() {
        return localPassword;
    }

    /**
     * @param localPassword The localPassword to set.
     */
    public void setLocalPassword(String localPassword) {
        this.localPassword = localPassword;
    }

    /**
     * @return Returns the localUsername.
     */
    public String getLocalUsername() {
        return localUsername;
    }

    /**
     * @param localUsername The localUsername to set.
     */
    public void setLocalUsername(String localUsername) {
        this.localUsername = localUsername;
    }

    /**
     * @return Returns the outboundPassword.
     */
    public String getOutboundPassword() {
        return outboundPassword;
    }

    /**
     * @param outboundPassword The outboundPassword to set.
     */
    public void setOutboundPassword(String outboundPassword) {
        this.outboundPassword = outboundPassword;
    }

    /**
     * @return Returns the outboundUsername.
     */
    public String getOutboundUsername() {
        return outboundUsername;
    }

    /**
     * @param outboundUsername The outboundUsername to set.
     */
    public void setOutboundUsername(String outboundUsername) {
        this.outboundUsername = outboundUsername;
    }
    
    /**
     * @return the outboundClientId
     */
    public String getOutboundClientId() {
        return outboundClientId;
    }

    /**
     * @param outboundClientId the outboundClientId to set
     */
    public void setOutboundClientId(String outboundClientId) {
        this.outboundClientId = outboundClientId;
    }

    /**
     * @return the localClientId
     */
    public String getLocalClientId() {
        return localClientId;
    }

    /**
     * @param localClientId the localClientId to set
     */
    public void setLocalClientId(String localClientId) {
        this.localClientId = localClientId;
    }
    
    
    protected void addInboundBridge(DestinationBridge bridge) {
        inboundBridges.add(bridge);
    }

    protected void addOutboundBridge(DestinationBridge bridge) {
        outboundBridges.add(bridge);
    }

    protected void removeInboundBridge(DestinationBridge bridge) {
        inboundBridges.remove(bridge);
    }

    protected void removeOutboundBridge(DestinationBridge bridge) {
        outboundBridges.remove(bridge);
    }

    public String getName() {
        if (name == null) {
            name = "Connector:" + getNextId();
        }
        return name;
    }

    private static synchronized int getNextId() {
        return nextId++;
    }

    public void setName(String name) {
        this.name = name;
    }

    public abstract void restartProducerConnection() throws NamingException, JMSException;
}

Other ActiveMQ examples (source code examples)

Here is a short list of links related to this ActiveMQ JmsConnector.java source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.