|
ActiveMQ example source code file (JmsQueueConnector.java)
The ActiveMQ JmsQueueConnector.java source code/** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.activemq.network.jms; import javax.jms.Connection; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Queue; import javax.jms.QueueConnection; import javax.jms.QueueConnectionFactory; import javax.jms.QueueSession; import javax.jms.Session; import javax.naming.NamingException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * A Bridge to other JMS Queue providers * * @org.apache.xbean.XBean * * */ public class JmsQueueConnector extends JmsConnector { private static final Logger LOG = LoggerFactory.getLogger(JmsQueueConnector.class); private String outboundQueueConnectionFactoryName; private String localConnectionFactoryName; private QueueConnectionFactory outboundQueueConnectionFactory; private QueueConnectionFactory localQueueConnectionFactory; private QueueConnection outboundQueueConnection; private QueueConnection localQueueConnection; private InboundQueueBridge[] inboundQueueBridges; private OutboundQueueBridge[] outboundQueueBridges; public boolean init() { boolean result = super.init(); if (result) { try { initializeForeignQueueConnection(); initializeLocalQueueConnection(); initializeInboundJmsMessageConvertor(); initializeOutboundJmsMessageConvertor(); initializeInboundQueueBridges(); initializeOutboundQueueBridges(); } catch (Exception e) { LOG.error("Failed to initialize the JMSConnector", e); } } return result; } /** * @return Returns the inboundQueueBridges. */ public InboundQueueBridge[] getInboundQueueBridges() { return inboundQueueBridges; } /** * @param inboundQueueBridges The inboundQueueBridges to set. */ public void setInboundQueueBridges(InboundQueueBridge[] inboundQueueBridges) { this.inboundQueueBridges = inboundQueueBridges; } /** * @return Returns the outboundQueueBridges. */ public OutboundQueueBridge[] getOutboundQueueBridges() { return outboundQueueBridges; } /** * @param outboundQueueBridges The outboundQueueBridges to set. */ public void setOutboundQueueBridges(OutboundQueueBridge[] outboundQueueBridges) { this.outboundQueueBridges = outboundQueueBridges; } /** * @return Returns the localQueueConnectionFactory. */ public QueueConnectionFactory getLocalQueueConnectionFactory() { return localQueueConnectionFactory; } /** * @param localQueueConnectionFactory The localQueueConnectionFactory to * set. */ public void setLocalQueueConnectionFactory(QueueConnectionFactory localConnectionFactory) { this.localQueueConnectionFactory = localConnectionFactory; } /** * @return Returns the outboundQueueConnectionFactory. */ public QueueConnectionFactory getOutboundQueueConnectionFactory() { return outboundQueueConnectionFactory; } /** * @return Returns the outboundQueueConnectionFactoryName. */ public String getOutboundQueueConnectionFactoryName() { return outboundQueueConnectionFactoryName; } /** * @param outboundQueueConnectionFactoryName The * outboundQueueConnectionFactoryName to set. */ public void setOutboundQueueConnectionFactoryName(String foreignQueueConnectionFactoryName) { this.outboundQueueConnectionFactoryName = foreignQueueConnectionFactoryName; } /** * @return Returns the localConnectionFactoryName. */ public String getLocalConnectionFactoryName() { return localConnectionFactoryName; } /** * @param localConnectionFactoryName The localConnectionFactoryName to set. */ public void setLocalConnectionFactoryName(String localConnectionFactoryName) { this.localConnectionFactoryName = localConnectionFactoryName; } /** * @return Returns the localQueueConnection. */ public QueueConnection getLocalQueueConnection() { return localQueueConnection; } /** * @param localQueueConnection The localQueueConnection to set. */ public void setLocalQueueConnection(QueueConnection localQueueConnection) { this.localQueueConnection = localQueueConnection; } /** * @return Returns the outboundQueueConnection. */ public QueueConnection getOutboundQueueConnection() { return outboundQueueConnection; } /** * @param outboundQueueConnection The outboundQueueConnection to set. */ public void setOutboundQueueConnection(QueueConnection foreignQueueConnection) { this.outboundQueueConnection = foreignQueueConnection; } /** * @param outboundQueueConnectionFactory The outboundQueueConnectionFactory * to set. */ public void setOutboundQueueConnectionFactory(QueueConnectionFactory foreignQueueConnectionFactory) { this.outboundQueueConnectionFactory = foreignQueueConnectionFactory; } public void restartProducerConnection() throws NamingException, JMSException { outboundQueueConnection = null; initializeForeignQueueConnection(); // the outboundQueueConnection was reestablished - publish the new connection to the bridges if (inboundQueueBridges != null) { for (int i = 0; i < inboundQueueBridges.length; i++) { InboundQueueBridge bridge = inboundQueueBridges[i]; bridge.setConsumerConnection(outboundQueueConnection); } } if (outboundQueueBridges != null) { for (int i = 0; i < outboundQueueBridges.length; i++) { OutboundQueueBridge bridge = outboundQueueBridges[i]; bridge.setProducerConnection(outboundQueueConnection); } } } protected void initializeForeignQueueConnection() throws NamingException, JMSException { if (outboundQueueConnection == null) { // get the connection factories if (outboundQueueConnectionFactory == null) { // look it up from JNDI if (outboundQueueConnectionFactoryName != null) { outboundQueueConnectionFactory = (QueueConnectionFactory)jndiOutboundTemplate .lookup(outboundQueueConnectionFactoryName, QueueConnectionFactory.class); if (outboundUsername != null) { outboundQueueConnection = outboundQueueConnectionFactory .createQueueConnection(outboundUsername, outboundPassword); } else { outboundQueueConnection = outboundQueueConnectionFactory.createQueueConnection(); } } else { throw new JMSException("Cannot create foreignConnection - no information"); } } else { if (outboundUsername != null) { outboundQueueConnection = outboundQueueConnectionFactory .createQueueConnection(outboundUsername, outboundPassword); } else { outboundQueueConnection = outboundQueueConnectionFactory.createQueueConnection(); } } } if (localClientId != null && localClientId.length() > 0) { outboundQueueConnection.setClientID(getOutboundClientId()); } outboundQueueConnection.start(); } protected void initializeLocalQueueConnection() throws NamingException, JMSException { if (localQueueConnection == null) { // get the connection factories if (localQueueConnectionFactory == null) { if (embeddedConnectionFactory == null) { // look it up from JNDI if (localConnectionFactoryName != null) { localQueueConnectionFactory = (QueueConnectionFactory)jndiLocalTemplate .lookup(localConnectionFactoryName, QueueConnectionFactory.class); if (localUsername != null) { localQueueConnection = localQueueConnectionFactory .createQueueConnection(localUsername, localPassword); } else { localQueueConnection = localQueueConnectionFactory.createQueueConnection(); } } else { throw new JMSException("Cannot create localConnection - no information"); } } else { localQueueConnection = embeddedConnectionFactory.createQueueConnection(); } } else { if (localUsername != null) { localQueueConnection = localQueueConnectionFactory.createQueueConnection(localUsername, localPassword); } else { localQueueConnection = localQueueConnectionFactory.createQueueConnection(); } } } if (localClientId != null && localClientId.length() > 0) { localQueueConnection.setClientID(getLocalClientId()); } localQueueConnection.start(); } protected void initializeInboundJmsMessageConvertor() { inboundMessageConvertor.setConnection(localQueueConnection); } protected void initializeOutboundJmsMessageConvertor() { outboundMessageConvertor.setConnection(outboundQueueConnection); } protected void initializeInboundQueueBridges() throws JMSException { if (inboundQueueBridges != null) { QueueSession outboundSession = outboundQueueConnection .createQueueSession(false, Session.AUTO_ACKNOWLEDGE); QueueSession localSession = localQueueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); for (int i = 0; i < inboundQueueBridges.length; i++) { InboundQueueBridge bridge = inboundQueueBridges[i]; String localQueueName = bridge.getLocalQueueName(); Queue activemqQueue = createActiveMQQueue(localSession, localQueueName); String queueName = bridge.getInboundQueueName(); Queue foreignQueue = createForeignQueue(outboundSession, queueName); bridge.setConsumerQueue(foreignQueue); bridge.setProducerQueue(activemqQueue); bridge.setProducerConnection(localQueueConnection); bridge.setConsumerConnection(outboundQueueConnection); if (bridge.getJmsMessageConvertor() == null) { bridge.setJmsMessageConvertor(getInboundMessageConvertor()); } bridge.setJmsConnector(this); addInboundBridge(bridge); } outboundSession.close(); localSession.close(); } } protected void initializeOutboundQueueBridges() throws JMSException { if (outboundQueueBridges != null) { QueueSession outboundSession = outboundQueueConnection .createQueueSession(false, Session.AUTO_ACKNOWLEDGE); QueueSession localSession = localQueueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); for (int i = 0; i < outboundQueueBridges.length; i++) { OutboundQueueBridge bridge = outboundQueueBridges[i]; String localQueueName = bridge.getLocalQueueName(); Queue activemqQueue = createActiveMQQueue(localSession, localQueueName); String queueName = bridge.getOutboundQueueName(); Queue foreignQueue = createForeignQueue(outboundSession, queueName); bridge.setConsumerQueue(activemqQueue); bridge.setProducerQueue(foreignQueue); bridge.setProducerConnection(outboundQueueConnection); bridge.setConsumerConnection(localQueueConnection); if (bridge.getJmsMessageConvertor() == null) { bridge.setJmsMessageConvertor(getOutboundMessageConvertor()); } bridge.setJmsConnector(this); addOutboundBridge(bridge); } outboundSession.close(); localSession.close(); } } protected Destination createReplyToBridge(Destination destination, Connection replyToProducerConnection, Connection replyToConsumerConnection) { Queue replyToProducerQueue = (Queue)destination; boolean isInbound = replyToProducerConnection.equals(localQueueConnection); if (isInbound) { InboundQueueBridge bridge = (InboundQueueBridge)replyToBridges.get(replyToProducerQueue); if (bridge == null) { bridge = new InboundQueueBridge() { protected Destination processReplyToDestination(Destination destination) { return null; } }; try { QueueSession replyToConsumerSession = ((QueueConnection)replyToConsumerConnection) .createQueueSession(false, Session.AUTO_ACKNOWLEDGE); Queue replyToConsumerQueue = replyToConsumerSession.createTemporaryQueue(); replyToConsumerSession.close(); bridge.setConsumerQueue(replyToConsumerQueue); bridge.setProducerQueue(replyToProducerQueue); bridge.setProducerConnection((QueueConnection)replyToProducerConnection); bridge.setConsumerConnection((QueueConnection)replyToConsumerConnection); bridge.setDoHandleReplyTo(false); if (bridge.getJmsMessageConvertor() == null) { bridge.setJmsMessageConvertor(getInboundMessageConvertor()); } bridge.setJmsConnector(this); bridge.start(); LOG.info("Created replyTo bridge for " + replyToProducerQueue); } catch (Exception e) { LOG.error("Failed to create replyTo bridge for queue: " + replyToProducerQueue, e); return null; } replyToBridges.put(replyToProducerQueue, bridge); } return bridge.getConsumerQueue(); } else { OutboundQueueBridge bridge = (OutboundQueueBridge)replyToBridges.get(replyToProducerQueue); if (bridge == null) { bridge = new OutboundQueueBridge() { protected Destination processReplyToDestination(Destination destination) { return null; } }; try { QueueSession replyToConsumerSession = ((QueueConnection)replyToConsumerConnection) .createQueueSession(false, Session.AUTO_ACKNOWLEDGE); Queue replyToConsumerQueue = replyToConsumerSession.createTemporaryQueue(); replyToConsumerSession.close(); bridge.setConsumerQueue(replyToConsumerQueue); bridge.setProducerQueue(replyToProducerQueue); bridge.setProducerConnection((QueueConnection)replyToProducerConnection); bridge.setConsumerConnection((QueueConnection)replyToConsumerConnection); bridge.setDoHandleReplyTo(false); if (bridge.getJmsMessageConvertor() == null) { bridge.setJmsMessageConvertor(getOutboundMessageConvertor()); } bridge.setJmsConnector(this); bridge.start(); LOG.info("Created replyTo bridge for " + replyToProducerQueue); } catch (Exception e) { LOG.error("Failed to create replyTo bridge for queue: " + replyToProducerQueue, e); return null; } replyToBridges.put(replyToProducerQueue, bridge); } return bridge.getConsumerQueue(); } } protected Queue createActiveMQQueue(QueueSession session, String queueName) throws JMSException { return session.createQueue(queueName); } protected Queue createForeignQueue(QueueSession session, String queueName) throws JMSException { Queue result = null; try { result = session.createQueue(queueName); } catch (JMSException e) { // look-up the Queue try { result = (Queue)jndiOutboundTemplate.lookup(queueName, Queue.class); } catch (NamingException e1) { String errStr = "Failed to look-up Queue for name: " + queueName; LOG.error(errStr, e); JMSException jmsEx = new JMSException(errStr); jmsEx.setLinkedException(e1); throw jmsEx; } } return result; } } Other ActiveMQ examples (source code examples)Here is a short list of links related to this ActiveMQ JmsQueueConnector.java source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.