alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

ActiveMQ example source code file (JmsQueueConnector.java)

This example ActiveMQ source code file (JmsQueueConnector.java) is included in the DevDaily.com "Java Source Code Warehouse" project. The intent of this project is to help you "Learn Java by Example"™.

Java - ActiveMQ tags/keywords

failed, inboundqueuebridge, jmsexception, jmsexception, naming, namingexception, outboundqueuebridge, queue, queue, queueconnection, queueconnectionfactory, queueconnectionfactory, queuesession, string, string

The ActiveMQ JmsQueueConnector.java source code

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.network.jms;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.naming.NamingException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A Bridge to other JMS Queue providers.
 * <p>
 * The connector holds two {@link QueueConnection}s: a "local" one and an
 * "outbound" (foreign) one. Each can be injected directly, created from an
 * injected {@link QueueConnectionFactory}, or created from a factory that is
 * looked up by name through the JNDI templates supplied by the superclass.
 * The configured {@link InboundQueueBridge}s and {@link OutboundQueueBridge}s
 * are wired to those two connections during {@link #init()}.
 *
 * @org.apache.xbean.XBean
 */
public class JmsQueueConnector extends JmsConnector {
    private static final Logger LOG = LoggerFactory.getLogger(JmsQueueConnector.class);
    private String outboundQueueConnectionFactoryName;
    private String localConnectionFactoryName;
    private QueueConnectionFactory outboundQueueConnectionFactory;
    private QueueConnectionFactory localQueueConnectionFactory;
    private QueueConnection outboundQueueConnection;
    private QueueConnection localQueueConnection;
    private InboundQueueBridge[] inboundQueueBridges;
    private OutboundQueueBridge[] outboundQueueBridges;

    /**
     * Initializes the connector: runs the superclass initialization and, if
     * that returns {@code true}, sets up both connections, both message
     * convertors and all configured bridges.
     * <p>
     * Any exception raised during that set-up is logged and swallowed; the
     * value returned is always the result of {@code super.init()}.
     * NOTE(review): callers therefore cannot detect a failed set-up from the
     * return value - confirm whether that is intended before changing it.
     *
     * @return the value returned by {@code super.init()}
     */
    public boolean init() {
        boolean result = super.init();
        if (result) {
            try {
                initializeForeignQueueConnection();
                initializeLocalQueueConnection();
                initializeInboundJmsMessageConvertor();
                initializeOutboundJmsMessageConvertor();
                initializeInboundQueueBridges();
                initializeOutboundQueueBridges();
            } catch (Exception e) {
                LOG.error("Failed to initialize the JMSConnector", e);
            }
        }
        return result;
    }

    /**
     * @return Returns the inboundQueueBridges.
     */
    public InboundQueueBridge[] getInboundQueueBridges() {
        return inboundQueueBridges;
    }

    /**
     * @param inboundQueueBridges The inboundQueueBridges to set.
     */
    public void setInboundQueueBridges(InboundQueueBridge[] inboundQueueBridges) {
        this.inboundQueueBridges = inboundQueueBridges;
    }

    /**
     * @return Returns the outboundQueueBridges.
     */
    public OutboundQueueBridge[] getOutboundQueueBridges() {
        return outboundQueueBridges;
    }

    /**
     * @param outboundQueueBridges The outboundQueueBridges to set.
     */
    public void setOutboundQueueBridges(OutboundQueueBridge[] outboundQueueBridges) {
        this.outboundQueueBridges = outboundQueueBridges;
    }

    /**
     * @return Returns the localQueueConnectionFactory.
     */
    public QueueConnectionFactory getLocalQueueConnectionFactory() {
        return localQueueConnectionFactory;
    }

    /**
     * @param localConnectionFactory The localQueueConnectionFactory to set.
     */
    public void setLocalQueueConnectionFactory(QueueConnectionFactory localConnectionFactory) {
        this.localQueueConnectionFactory = localConnectionFactory;
    }

    /**
     * @return Returns the outboundQueueConnectionFactory.
     */
    public QueueConnectionFactory getOutboundQueueConnectionFactory() {
        return outboundQueueConnectionFactory;
    }

    /**
     * @return Returns the outboundQueueConnectionFactoryName.
     */
    public String getOutboundQueueConnectionFactoryName() {
        return outboundQueueConnectionFactoryName;
    }

    /**
     * @param foreignQueueConnectionFactoryName The
     *                outboundQueueConnectionFactoryName to set.
     */
    public void setOutboundQueueConnectionFactoryName(String foreignQueueConnectionFactoryName) {
        this.outboundQueueConnectionFactoryName = foreignQueueConnectionFactoryName;
    }

    /**
     * @return Returns the localConnectionFactoryName.
     */
    public String getLocalConnectionFactoryName() {
        return localConnectionFactoryName;
    }

    /**
     * @param localConnectionFactoryName The localConnectionFactoryName to set.
     */
    public void setLocalConnectionFactoryName(String localConnectionFactoryName) {
        this.localConnectionFactoryName = localConnectionFactoryName;
    }

    /**
     * @return Returns the localQueueConnection.
     */
    public QueueConnection getLocalQueueConnection() {
        return localQueueConnection;
    }

    /**
     * @param localQueueConnection The localQueueConnection to set.
     */
    public void setLocalQueueConnection(QueueConnection localQueueConnection) {
        this.localQueueConnection = localQueueConnection;
    }

    /**
     * @return Returns the outboundQueueConnection.
     */
    public QueueConnection getOutboundQueueConnection() {
        return outboundQueueConnection;
    }

    /**
     * @param foreignQueueConnection The outboundQueueConnection to set.
     */
    public void setOutboundQueueConnection(QueueConnection foreignQueueConnection) {
        this.outboundQueueConnection = foreignQueueConnection;
    }

    /**
     * @param foreignQueueConnectionFactory The outboundQueueConnectionFactory
     *                to set.
     */
    public void setOutboundQueueConnectionFactory(QueueConnectionFactory foreignQueueConnectionFactory) {
        this.outboundQueueConnectionFactory = foreignQueueConnectionFactory;
    }

    /**
     * Discards the current outbound connection, creates a new one and hands
     * the new connection to every configured bridge (as the consumer
     * connection of inbound bridges and the producer connection of outbound
     * bridges).
     * <p>
     * NOTE(review): the previous connection is dropped without being closed
     * here - confirm that the caller (or the failure that triggered the
     * restart) is responsible for releasing it.
     *
     * @throws NamingException if the connection factory lookup fails
     * @throws JMSException if the new connection cannot be created or started
     */
    public void restartProducerConnection() throws NamingException, JMSException {
        // Clearing the field is what makes initializeForeignQueueConnection() build a fresh connection.
        outboundQueueConnection = null;
        initializeForeignQueueConnection();

        // the outboundQueueConnection was reestablished - publish the new connection to the bridges
        if (inboundQueueBridges != null) {
            for (InboundQueueBridge bridge : inboundQueueBridges) {
                bridge.setConsumerConnection(outboundQueueConnection);
            }
        }
        if (outboundQueueBridges != null) {
            for (OutboundQueueBridge bridge : outboundQueueBridges) {
                bridge.setProducerConnection(outboundQueueConnection);
            }
        }
    }

    /**
     * Ensures the outbound (foreign) connection exists, applies the outbound
     * client ID when one is configured, and starts the connection.
     * <p>
     * If no connection has been set, one is created from the outbound
     * connection factory; if no factory has been set either, the factory is
     * looked up by {@code outboundQueueConnectionFactoryName} and remembered.
     * The outbound username/password are used when a username is configured.
     *
     * @throws NamingException if the factory lookup fails
     * @throws JMSException if neither a connection, a factory nor a factory
     *                 name is available, or if the provider rejects the call
     */
    protected void initializeForeignQueueConnection() throws NamingException, JMSException {
        if (outboundQueueConnection == null) {
            if (outboundQueueConnectionFactory == null) {
                if (outboundQueueConnectionFactoryName == null) {
                    throw new JMSException("Cannot create foreignConnection - no information");
                }
                // look it up from JNDI
                outboundQueueConnectionFactory = (QueueConnectionFactory)jndiOutboundTemplate
                    .lookup(outboundQueueConnectionFactoryName, QueueConnectionFactory.class);
            }
            if (outboundUsername != null) {
                outboundQueueConnection = outboundQueueConnectionFactory
                    .createQueueConnection(outboundUsername, outboundPassword);
            } else {
                outboundQueueConnection = outboundQueueConnectionFactory.createQueueConnection();
            }
        }
        // Guard on the outbound client ID itself: the value being applied is the
        // outbound one, so the local client ID must not decide whether it is set.
        String outboundId = getOutboundClientId();
        if (outboundId != null && outboundId.length() > 0) {
            outboundQueueConnection.setClientID(outboundId);
        }
        outboundQueueConnection.start();
    }

    /**
     * Ensures the local connection exists, applies the local client ID when
     * one is configured, and starts the connection.
     * <p>
     * If no connection has been set, the source is chosen in this order: the
     * configured local connection factory, then the embedded connection
     * factory (used without credentials), then a factory looked up by
     * {@code localConnectionFactoryName}, which is remembered once found.
     *
     * @throws NamingException if the factory lookup fails
     * @throws JMSException if no source for the connection is available, or
     *                 if the provider rejects the call
     */
    protected void initializeLocalQueueConnection() throws NamingException, JMSException {
        if (localQueueConnection == null) {
            if (localQueueConnectionFactory == null && embeddedConnectionFactory != null) {
                localQueueConnection = embeddedConnectionFactory.createQueueConnection();
            } else {
                if (localQueueConnectionFactory == null) {
                    if (localConnectionFactoryName == null) {
                        throw new JMSException("Cannot create localConnection - no information");
                    }
                    // look it up from JNDI
                    localQueueConnectionFactory = (QueueConnectionFactory)jndiLocalTemplate
                        .lookup(localConnectionFactoryName, QueueConnectionFactory.class);
                }
                if (localUsername != null) {
                    localQueueConnection = localQueueConnectionFactory
                        .createQueueConnection(localUsername, localPassword);
                } else {
                    localQueueConnection = localQueueConnectionFactory.createQueueConnection();
                }
            }
        }
        if (localClientId != null && localClientId.length() > 0) {
            localQueueConnection.setClientID(getLocalClientId());
        }
        localQueueConnection.start();
    }

    /**
     * Gives the inbound message convertor the local connection.
     */
    protected void initializeInboundJmsMessageConvertor() {
        inboundMessageConvertor.setConnection(localQueueConnection);
    }

    /**
     * Gives the outbound message convertor the outbound connection.
     */
    protected void initializeOutboundJmsMessageConvertor() {
        outboundMessageConvertor.setConnection(outboundQueueConnection);
    }

    /**
     * Wires every configured inbound bridge: it consumes from the foreign
     * queue on the outbound connection and produces to the local queue on the
     * local connection. Each bridge is then passed to
     * {@code addInboundBridge}.
     * <p>
     * The two sessions opened here are only used to resolve the queues and
     * are closed again even when wiring fails part-way.
     *
     * @throws JMSException if a session or a queue cannot be created
     */
    protected void initializeInboundQueueBridges() throws JMSException {
        if (inboundQueueBridges == null) {
            return;
        }
        QueueSession outboundSession = outboundQueueConnection
            .createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        try {
            QueueSession localSession = localQueueConnection
                .createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            try {
                for (InboundQueueBridge bridge : inboundQueueBridges) {
                    Queue activemqQueue = createActiveMQQueue(localSession, bridge.getLocalQueueName());
                    Queue foreignQueue = createForeignQueue(outboundSession, bridge.getInboundQueueName());
                    bridge.setConsumerQueue(foreignQueue);
                    bridge.setProducerQueue(activemqQueue);
                    bridge.setProducerConnection(localQueueConnection);
                    bridge.setConsumerConnection(outboundQueueConnection);
                    if (bridge.getJmsMessageConvertor() == null) {
                        bridge.setJmsMessageConvertor(getInboundMessageConvertor());
                    }
                    bridge.setJmsConnector(this);
                    addInboundBridge(bridge);
                }
            } finally {
                localSession.close();
            }
        } finally {
            outboundSession.close();
        }
    }

    /**
     * Wires every configured outbound bridge: it consumes from the local
     * queue on the local connection and produces to the foreign queue on the
     * outbound connection. Each bridge is then passed to
     * {@code addOutboundBridge}.
     * <p>
     * The two sessions opened here are only used to resolve the queues and
     * are closed again even when wiring fails part-way.
     *
     * @throws JMSException if a session or a queue cannot be created
     */
    protected void initializeOutboundQueueBridges() throws JMSException {
        if (outboundQueueBridges == null) {
            return;
        }
        QueueSession outboundSession = outboundQueueConnection
            .createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        try {
            QueueSession localSession = localQueueConnection
                .createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            try {
                for (OutboundQueueBridge bridge : outboundQueueBridges) {
                    Queue activemqQueue = createActiveMQQueue(localSession, bridge.getLocalQueueName());
                    Queue foreignQueue = createForeignQueue(outboundSession, bridge.getOutboundQueueName());
                    bridge.setConsumerQueue(activemqQueue);
                    bridge.setProducerQueue(foreignQueue);
                    bridge.setProducerConnection(outboundQueueConnection);
                    bridge.setConsumerConnection(localQueueConnection);
                    if (bridge.getJmsMessageConvertor() == null) {
                        bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
                    }
                    bridge.setJmsConnector(this);
                    addOutboundBridge(bridge);
                }
            } finally {
                localSession.close();
            }
        } finally {
            outboundSession.close();
        }
    }

    /**
     * Returns the queue that replies should be sent to on the consumer side
     * so that they are bridged back to {@code destination}.
     * <p>
     * A bridge is kept per reply-to queue in {@code replyToBridges}; when
     * none exists yet, a new one is created that consumes from a temporary
     * queue on {@code replyToConsumerConnection} and produces to
     * {@code destination} on {@code replyToProducerConnection}. The bridge is
     * inbound when the producer connection equals the local connection and
     * outbound otherwise. Reply-to bridges do not themselves bridge reply-to
     * destinations.
     *
     * @param destination the reply-to queue on the producer side; must be a
     *                {@link Queue}
     * @param replyToProducerConnection connection the bridge produces on
     * @param replyToConsumerConnection connection the bridge consumes on
     * @return the consumer queue of the bridge, or {@code null} if a new
     *         bridge could not be created (the failure is logged)
     */
    protected Destination createReplyToBridge(Destination destination, Connection replyToProducerConnection,
                                              Connection replyToConsumerConnection) {
        Queue replyToProducerQueue = (Queue)destination;
        boolean isInbound = replyToProducerConnection.equals(localQueueConnection);

        if (isInbound) {
            InboundQueueBridge bridge = (InboundQueueBridge)replyToBridges.get(replyToProducerQueue);
            if (bridge == null) {
                bridge = new InboundQueueBridge() {
                    protected Destination processReplyToDestination(Destination destination) {
                        return null;
                    }
                };
                try {
                    bridge.setConsumerQueue(createReplyToConsumerQueue((QueueConnection)replyToConsumerConnection));
                    bridge.setProducerQueue(replyToProducerQueue);
                    bridge.setProducerConnection((QueueConnection)replyToProducerConnection);
                    bridge.setConsumerConnection((QueueConnection)replyToConsumerConnection);
                    bridge.setDoHandleReplyTo(false);
                    if (bridge.getJmsMessageConvertor() == null) {
                        bridge.setJmsMessageConvertor(getInboundMessageConvertor());
                    }
                    bridge.setJmsConnector(this);
                    bridge.start();
                    LOG.info("Created replyTo bridge for " + replyToProducerQueue);
                } catch (Exception e) {
                    LOG.error("Failed to create replyTo bridge for queue: " + replyToProducerQueue, e);
                    return null;
                }
                // Only a fully started bridge is remembered for reuse.
                replyToBridges.put(replyToProducerQueue, bridge);
            }
            return bridge.getConsumerQueue();
        } else {
            OutboundQueueBridge bridge = (OutboundQueueBridge)replyToBridges.get(replyToProducerQueue);
            if (bridge == null) {
                bridge = new OutboundQueueBridge() {
                    protected Destination processReplyToDestination(Destination destination) {
                        return null;
                    }
                };
                try {
                    bridge.setConsumerQueue(createReplyToConsumerQueue((QueueConnection)replyToConsumerConnection));
                    bridge.setProducerQueue(replyToProducerQueue);
                    bridge.setProducerConnection((QueueConnection)replyToProducerConnection);
                    bridge.setConsumerConnection((QueueConnection)replyToConsumerConnection);
                    bridge.setDoHandleReplyTo(false);
                    if (bridge.getJmsMessageConvertor() == null) {
                        bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
                    }
                    bridge.setJmsConnector(this);
                    bridge.start();
                    LOG.info("Created replyTo bridge for " + replyToProducerQueue);
                } catch (Exception e) {
                    LOG.error("Failed to create replyTo bridge for queue: " + replyToProducerQueue, e);
                    return null;
                }
                // Only a fully started bridge is remembered for reuse.
                replyToBridges.put(replyToProducerQueue, bridge);
            }
            return bridge.getConsumerQueue();
        }
    }

    /**
     * Creates a temporary queue on the given connection using a short-lived
     * session, which is closed again whether or not the queue was created.
     *
     * @param connection connection on which the temporary queue is created
     * @return the new temporary queue
     * @throws JMSException if the session or the queue cannot be created
     */
    private Queue createReplyToConsumerQueue(QueueConnection connection) throws JMSException {
        QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        try {
            return session.createTemporaryQueue();
        } finally {
            session.close();
        }
    }

    /**
     * Creates the local queue with the given name through the session.
     *
     * @param session session used to create the queue
     * @param queueName name of the queue
     * @return the queue
     * @throws JMSException if the session cannot create the queue
     */
    protected Queue createActiveMQQueue(QueueSession session, String queueName) throws JMSException {
        return session.createQueue(queueName);
    }

    /**
     * Resolves the foreign queue with the given name: first by asking the
     * session to create it and, if that fails, by looking the name up through
     * the outbound JNDI template.
     *
     * @param session session on the foreign provider
     * @param queueName name of the queue (also used as the JNDI name)
     * @return the queue
     * @throws JMSException if both attempts fail; the JNDI failure is attached
     *                 as linked exception and as cause
     */
    protected Queue createForeignQueue(QueueSession session, String queueName) throws JMSException {
        Queue result = null;
        try {
            result = session.createQueue(queueName);
        } catch (JMSException e) {
            // look-up the Queue
            try {
                result = (Queue)jndiOutboundTemplate.lookup(queueName, Queue.class);
            } catch (NamingException e1) {
                String errStr = "Failed to look-up Queue for name: " + queueName;
                // The log carries the createQueue() failure; the thrown exception carries the JNDI failure.
                LOG.error(errStr, e);
                JMSException jmsEx = new JMSException(errStr);
                jmsEx.setLinkedException(e1);
                // Also chain as the standard cause so ordinary stack traces show the lookup failure.
                jmsEx.initCause(e1);
                throw jmsEx;
            }
        }
        return result;
    }

}

Other ActiveMQ examples (source code examples)

Here is a short list of links related to this ActiveMQ JmsQueueConnector.java source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.