|
ActiveMQ example source code file (JmsTopicConnector.java)
The ActiveMQ JmsTopicConnector.java source code/** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.activemq.network.jms; import javax.jms.Connection; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Session; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicSession; import javax.naming.NamingException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * A Bridge to other JMS Topic providers * * @org.apache.xbean.XBean * * */ public class JmsTopicConnector extends JmsConnector { private static final Logger LOG = LoggerFactory.getLogger(JmsTopicConnector.class); private String outboundTopicConnectionFactoryName; private String localConnectionFactoryName; private TopicConnectionFactory outboundTopicConnectionFactory; private TopicConnectionFactory localTopicConnectionFactory; private TopicConnection outboundTopicConnection; private TopicConnection localTopicConnection; private InboundTopicBridge[] inboundTopicBridges; private OutboundTopicBridge[] outboundTopicBridges; public boolean init() { boolean result = super.init(); if (result) { try { initializeForeignTopicConnection(); initializeLocalTopicConnection(); initializeInboundJmsMessageConvertor(); initializeOutboundJmsMessageConvertor(); initializeInboundTopicBridges(); initializeOutboundTopicBridges(); } catch (Exception e) { LOG.error("Failed to initialize the JMSConnector", e); } } return result; } /** * @return Returns the inboundTopicBridges. */ public InboundTopicBridge[] getInboundTopicBridges() { return inboundTopicBridges; } /** * @param inboundTopicBridges The inboundTopicBridges to set. */ public void setInboundTopicBridges(InboundTopicBridge[] inboundTopicBridges) { this.inboundTopicBridges = inboundTopicBridges; } /** * @return Returns the outboundTopicBridges. */ public OutboundTopicBridge[] getOutboundTopicBridges() { return outboundTopicBridges; } /** * @param outboundTopicBridges The outboundTopicBridges to set. */ public void setOutboundTopicBridges(OutboundTopicBridge[] outboundTopicBridges) { this.outboundTopicBridges = outboundTopicBridges; } /** * @return Returns the localTopicConnectionFactory. */ public TopicConnectionFactory getLocalTopicConnectionFactory() { return localTopicConnectionFactory; } /** * @param localTopicConnectionFactory The localTopicConnectionFactory to * set. */ public void setLocalTopicConnectionFactory(TopicConnectionFactory localConnectionFactory) { this.localTopicConnectionFactory = localConnectionFactory; } /** * @return Returns the outboundTopicConnectionFactory. */ public TopicConnectionFactory getOutboundTopicConnectionFactory() { return outboundTopicConnectionFactory; } /** * @return Returns the outboundTopicConnectionFactoryName. */ public String getOutboundTopicConnectionFactoryName() { return outboundTopicConnectionFactoryName; } /** * @param outboundTopicConnectionFactoryName The * outboundTopicConnectionFactoryName to set. */ public void setOutboundTopicConnectionFactoryName(String foreignTopicConnectionFactoryName) { this.outboundTopicConnectionFactoryName = foreignTopicConnectionFactoryName; } /** * @return Returns the localConnectionFactoryName. */ public String getLocalConnectionFactoryName() { return localConnectionFactoryName; } /** * @param localConnectionFactoryName The localConnectionFactoryName to set. */ public void setLocalConnectionFactoryName(String localConnectionFactoryName) { this.localConnectionFactoryName = localConnectionFactoryName; } /** * @return Returns the localTopicConnection. */ public TopicConnection getLocalTopicConnection() { return localTopicConnection; } /** * @param localTopicConnection The localTopicConnection to set. */ public void setLocalTopicConnection(TopicConnection localTopicConnection) { this.localTopicConnection = localTopicConnection; } /** * @return Returns the outboundTopicConnection. */ public TopicConnection getOutboundTopicConnection() { return outboundTopicConnection; } /** * @param outboundTopicConnection The outboundTopicConnection to set. */ public void setOutboundTopicConnection(TopicConnection foreignTopicConnection) { this.outboundTopicConnection = foreignTopicConnection; } /** * @param outboundTopicConnectionFactory The outboundTopicConnectionFactory * to set. */ public void setOutboundTopicConnectionFactory(TopicConnectionFactory foreignTopicConnectionFactory) { this.outboundTopicConnectionFactory = foreignTopicConnectionFactory; } public void restartProducerConnection() throws NamingException, JMSException { outboundTopicConnection = null; initializeForeignTopicConnection(); } protected void initializeForeignTopicConnection() throws NamingException, JMSException { if (outboundTopicConnection == null) { // get the connection factories if (outboundTopicConnectionFactory == null) { // look it up from JNDI if (outboundTopicConnectionFactoryName != null) { outboundTopicConnectionFactory = (TopicConnectionFactory)jndiOutboundTemplate .lookup(outboundTopicConnectionFactoryName, TopicConnectionFactory.class); if (outboundUsername != null) { outboundTopicConnection = outboundTopicConnectionFactory .createTopicConnection(outboundUsername, outboundPassword); } else { outboundTopicConnection = outboundTopicConnectionFactory.createTopicConnection(); } } else { throw new JMSException("Cannot create localConnection - no information"); } } else { if (outboundUsername != null) { outboundTopicConnection = outboundTopicConnectionFactory .createTopicConnection(outboundUsername, outboundPassword); } else { outboundTopicConnection = outboundTopicConnectionFactory.createTopicConnection(); } } } if (localClientId != null && localClientId.length() > 0) { outboundTopicConnection.setClientID(getOutboundClientId()); } outboundTopicConnection.start(); } protected void initializeLocalTopicConnection() throws NamingException, JMSException { if (localTopicConnection == null) { // get the connection factories if (localTopicConnectionFactory == null) { if (embeddedConnectionFactory == null) { // look it up from JNDI if (localConnectionFactoryName != null) { localTopicConnectionFactory = (TopicConnectionFactory)jndiLocalTemplate .lookup(localConnectionFactoryName, TopicConnectionFactory.class); if (localUsername != null) { localTopicConnection = localTopicConnectionFactory .createTopicConnection(localUsername, localPassword); } else { localTopicConnection = localTopicConnectionFactory.createTopicConnection(); } } else { throw new JMSException("Cannot create localConnection - no information"); } } else { localTopicConnection = embeddedConnectionFactory.createTopicConnection(); } } else { if (localUsername != null) { localTopicConnection = localTopicConnectionFactory.createTopicConnection(localUsername, localPassword); } else { localTopicConnection = localTopicConnectionFactory.createTopicConnection(); } } } if (localClientId != null && localClientId.length() > 0) { localTopicConnection.setClientID(getLocalClientId()); } localTopicConnection.start(); } protected void initializeInboundJmsMessageConvertor() { inboundMessageConvertor.setConnection(localTopicConnection); } protected void initializeOutboundJmsMessageConvertor() { outboundMessageConvertor.setConnection(outboundTopicConnection); } protected void initializeInboundTopicBridges() throws JMSException { if (inboundTopicBridges != null) { TopicSession outboundSession = outboundTopicConnection .createTopicSession(false, Session.AUTO_ACKNOWLEDGE); TopicSession localSession = localTopicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); for (int i = 0; i < inboundTopicBridges.length; i++) { InboundTopicBridge bridge = inboundTopicBridges[i]; String localTopicName = bridge.getLocalTopicName(); Topic activemqTopic = createActiveMQTopic(localSession, localTopicName); String topicName = bridge.getInboundTopicName(); Topic foreignTopic = createForeignTopic(outboundSession, topicName); bridge.setConsumerTopic(foreignTopic); bridge.setProducerTopic(activemqTopic); bridge.setProducerConnection(localTopicConnection); bridge.setConsumerConnection(outboundTopicConnection); if (bridge.getJmsMessageConvertor() == null) { bridge.setJmsMessageConvertor(getInboundMessageConvertor()); } bridge.setJmsConnector(this); addInboundBridge(bridge); } outboundSession.close(); localSession.close(); } } protected void initializeOutboundTopicBridges() throws JMSException { if (outboundTopicBridges != null) { TopicSession outboundSession = outboundTopicConnection .createTopicSession(false, Session.AUTO_ACKNOWLEDGE); TopicSession localSession = localTopicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); for (int i = 0; i < outboundTopicBridges.length; i++) { OutboundTopicBridge bridge = outboundTopicBridges[i]; String localTopicName = bridge.getLocalTopicName(); Topic activemqTopic = createActiveMQTopic(localSession, localTopicName); String topicName = bridge.getOutboundTopicName(); Topic foreignTopic = createForeignTopic(outboundSession, topicName); bridge.setConsumerTopic(activemqTopic); bridge.setProducerTopic(foreignTopic); bridge.setProducerConnection(outboundTopicConnection); bridge.setConsumerConnection(localTopicConnection); if (bridge.getJmsMessageConvertor() == null) { bridge.setJmsMessageConvertor(getOutboundMessageConvertor()); } bridge.setJmsConnector(this); addOutboundBridge(bridge); } outboundSession.close(); localSession.close(); } } protected Destination createReplyToBridge(Destination destination, Connection replyToProducerConnection, Connection replyToConsumerConnection) { Topic replyToProducerTopic = (Topic)destination; boolean isInbound = replyToProducerConnection.equals(localTopicConnection); if (isInbound) { InboundTopicBridge bridge = (InboundTopicBridge)replyToBridges.get(replyToProducerTopic); if (bridge == null) { bridge = new InboundTopicBridge() { protected Destination processReplyToDestination(Destination destination) { return null; } }; try { TopicSession replyToConsumerSession = ((TopicConnection)replyToConsumerConnection) .createTopicSession(false, Session.AUTO_ACKNOWLEDGE); Topic replyToConsumerTopic = replyToConsumerSession.createTemporaryTopic(); replyToConsumerSession.close(); bridge.setConsumerTopic(replyToConsumerTopic); bridge.setProducerTopic(replyToProducerTopic); bridge.setProducerConnection((TopicConnection)replyToProducerConnection); bridge.setConsumerConnection((TopicConnection)replyToConsumerConnection); bridge.setDoHandleReplyTo(false); if (bridge.getJmsMessageConvertor() == null) { bridge.setJmsMessageConvertor(getInboundMessageConvertor()); } bridge.setJmsConnector(this); bridge.start(); LOG.info("Created replyTo bridge for " + replyToProducerTopic); } catch (Exception e) { LOG.error("Failed to create replyTo bridge for topic: " + replyToProducerTopic, e); return null; } replyToBridges.put(replyToProducerTopic, bridge); } return bridge.getConsumerTopic(); } else { OutboundTopicBridge bridge = (OutboundTopicBridge)replyToBridges.get(replyToProducerTopic); if (bridge == null) { bridge = new OutboundTopicBridge() { protected Destination processReplyToDestination(Destination destination) { return null; } }; try { TopicSession replyToConsumerSession = ((TopicConnection)replyToConsumerConnection) .createTopicSession(false, Session.AUTO_ACKNOWLEDGE); Topic replyToConsumerTopic = replyToConsumerSession.createTemporaryTopic(); replyToConsumerSession.close(); bridge.setConsumerTopic(replyToConsumerTopic); bridge.setProducerTopic(replyToProducerTopic); bridge.setProducerConnection((TopicConnection)replyToProducerConnection); bridge.setConsumerConnection((TopicConnection)replyToConsumerConnection); bridge.setDoHandleReplyTo(false); if (bridge.getJmsMessageConvertor() == null) { bridge.setJmsMessageConvertor(getOutboundMessageConvertor()); } bridge.setJmsConnector(this); bridge.start(); LOG.info("Created replyTo bridge for " + replyToProducerTopic); } catch (Exception e) { LOG.error("Failed to create replyTo bridge for topic: " + replyToProducerTopic, e); return null; } replyToBridges.put(replyToProducerTopic, bridge); } return bridge.getConsumerTopic(); } } protected Topic createActiveMQTopic(TopicSession session, String topicName) throws JMSException { return session.createTopic(topicName); } protected Topic createForeignTopic(TopicSession session, String topicName) throws JMSException { Topic result = null; try { result = session.createTopic(topicName); } catch (JMSException e) { // look-up the Topic try { result = (Topic)jndiOutboundTemplate.lookup(topicName, Topic.class); } catch (NamingException e1) { String errStr = "Failed to look-up Topic for name: " + topicName; LOG.error(errStr, e); JMSException jmsEx = new JMSException(errStr); jmsEx.setLinkedException(e1); throw jmsEx; } } return result; } } Other ActiveMQ examples (source code examples)Here is a short list of links related to this ActiveMQ JmsTopicConnector.java source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.