|
ActiveMQ example source code file (MasterConnector.java)
The ActiveMQ MasterConnector.java source code/** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.activemq.broker.ft; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.Service; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerServiceAware; import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.Command; import org.apache.activemq.command.CommandTypes; import org.apache.activemq.command.ConnectionId; import org.apache.activemq.command.ConnectionInfo; import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.Response; import org.apache.activemq.command.SessionInfo; import org.apache.activemq.command.ShutdownInfo; import org.apache.activemq.transport.DefaultTransportListener; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportDisposedIOException; import org.apache.activemq.transport.TransportFactory; import org.apache.activemq.util.IdGenerator; import org.apache.activemq.util.ServiceStopper; import org.apache.activemq.util.ServiceSupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Connects a Slave Broker to a Master when using <a * href="http://activemq.apache.org/masterslave.html">Master Slave</a> for High * Availability of messages. * * @org.apache.xbean.XBean * */ public class MasterConnector implements Service, BrokerServiceAware { private static final Logger LOG = LoggerFactory.getLogger(MasterConnector.class); private BrokerService broker; private URI remoteURI; private URI localURI; private Transport localBroker; private Transport remoteBroker; private TransportConnector connector; private AtomicBoolean started = new AtomicBoolean(false); private AtomicBoolean stoppedBeforeStart = new AtomicBoolean(false); private final IdGenerator idGenerator = new IdGenerator(); private String userName; private String password; private ConnectionInfo connectionInfo; private SessionInfo sessionInfo; private ProducerInfo producerInfo; private final AtomicBoolean masterActive = new AtomicBoolean(); private BrokerInfo brokerInfo; private boolean firstConnection=true; private boolean failedToStart; public MasterConnector() { } public MasterConnector(String remoteUri) throws URISyntaxException { remoteURI = new URI(remoteUri); } public void setBrokerService(BrokerService broker) { this.broker = broker; if (localURI == null) { localURI = broker.getVmConnectorURI(); } if (connector == null) { List transportConnectors = broker.getTransportConnectors(); if (!transportConnectors.isEmpty()) { connector = (TransportConnector)transportConnectors.get(0); } } } public boolean isSlave() { return masterActive.get(); } protected void restartBridge() throws Exception { localBroker.oneway(connectionInfo); remoteBroker.oneway(connectionInfo); localBroker.oneway(sessionInfo); remoteBroker.oneway(sessionInfo); remoteBroker.oneway(producerInfo); remoteBroker.oneway(brokerInfo); } public void start() throws Exception { if (!started.compareAndSet(false, true)) { return; } if (remoteURI == null) { throw new IllegalArgumentException("You must specify a remoteURI"); } localBroker = TransportFactory.connect(localURI); remoteBroker = TransportFactory.connect(remoteURI); LOG.info("Starting a slave connection between " + localBroker + " and " + remoteBroker); localBroker.setTransportListener(new DefaultTransportListener() { public void onCommand(Object command) { } public void onException(IOException error) { if (started.get()) { serviceLocalException(error); } } }); remoteBroker.setTransportListener(new DefaultTransportListener() { public void onCommand(Object o) { Command command = (Command)o; if (started.get()) { serviceRemoteCommand(command); } } public void onException(IOException error) { if (started.get()) { serviceRemoteException(error); } } public void transportResumed() { try{ if(!firstConnection){ localBroker = TransportFactory.connect(localURI); localBroker.setTransportListener(new DefaultTransportListener() { public void onCommand(Object command) { } public void onException(IOException error) { if (started.get()) { serviceLocalException(error); } } }); localBroker.start(); restartBridge(); LOG.info("Slave connection between " + localBroker + " and " + remoteBroker + " has been reestablished."); }else{ firstConnection=false; } }catch(IOException e){ LOG.error("MasterConnector failed to send BrokerInfo in transportResumed:", e); }catch(Exception e){ LOG.error("MasterConnector failed to restart localBroker in transportResumed:", e); } } }); try { localBroker.start(); remoteBroker.start(); startBridge(); masterActive.set(true); } catch (Exception e) { masterActive.set(false); if(!stoppedBeforeStart.get()){ LOG.error("Failed to start network bridge: " + e, e); }else{ LOG.info("Slave stopped before connected to the master."); } setFailedToStart(true); } } protected void startBridge() throws Exception { connectionInfo = new ConnectionInfo(); connectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId())); connectionInfo.setClientId(idGenerator.generateId()); connectionInfo.setUserName(userName); connectionInfo.setPassword(password); connectionInfo.setBrokerMasterConnector(true); sessionInfo = new SessionInfo(connectionInfo, 1); producerInfo = new ProducerInfo(sessionInfo, 1); producerInfo.setResponseRequired(false); if (connector != null) { brokerInfo = connector.getBrokerInfo(); } else { brokerInfo = new BrokerInfo(); } brokerInfo.setBrokerName(broker.getBrokerName()); brokerInfo.setPeerBrokerInfos(broker.getBroker().getPeerBrokerInfos()); brokerInfo.setSlaveBroker(true); brokerInfo.setPassiveSlave(broker.isPassiveSlave()); restartBridge(); LOG.info("Slave connection between " + localBroker + " and " + remoteBroker + " has been established."); } public void stop() throws Exception { if (!started.compareAndSet(true, false)||!masterActive.get()) { return; } masterActive.set(false); try { // if (connectionInfo!=null){ // localBroker.request(connectionInfo.createRemoveCommand()); // } // localBroker.setTransportListener(null); // remoteBroker.setTransportListener(null); remoteBroker.oneway(new ShutdownInfo()); localBroker.oneway(new ShutdownInfo()); } catch (IOException e) { LOG.debug("Caught exception stopping", e); } finally { ServiceStopper ss = new ServiceStopper(); ss.stop(localBroker); ss.stop(remoteBroker); ss.throwFirstException(); } } public void stopBeforeConnected()throws Exception{ masterActive.set(false); started.set(false); stoppedBeforeStart.set(true); ServiceStopper ss = new ServiceStopper(); ss.stop(localBroker); ss.stop(remoteBroker); } protected void serviceRemoteException(IOException error) { LOG.error("Network connection between " + localBroker + " and " + remoteBroker + " shutdown: " + error.getMessage(), error); shutDown(); } protected void serviceRemoteCommand(Command command) { try { if (command.isMessageDispatch()) { MessageDispatch md = (MessageDispatch)command; command = md.getMessage(); } if (command.getDataStructureType() == CommandTypes.SHUTDOWN_INFO) { LOG.warn("The Master has shutdown"); shutDown(); } else { boolean responseRequired = command.isResponseRequired(); int commandId = command.getCommandId(); if (responseRequired) { Response response = (Response)localBroker.request(command); response.setCorrelationId(commandId); remoteBroker.oneway(response); } else { localBroker.oneway(command); } } } catch (IOException e) { serviceRemoteException(e); } } protected void serviceLocalException(Throwable error) { if (!(error instanceof TransportDisposedIOException) || localBroker.isDisposed()){ LOG.info("Network connection between " + localBroker + " and " + remoteBroker + " shutdown: " + error.getMessage(), error); ServiceSupport.dispose(this); }else{ LOG.info(error.getMessage()); } } /** * @return Returns the localURI. */ public URI getLocalURI() { return localURI; } /** * @param localURI The localURI to set. */ public void setLocalURI(URI localURI) { this.localURI = localURI; } /** * @return Returns the remoteURI. */ public URI getRemoteURI() { return remoteURI; } /** * @param remoteURI The remoteURI to set. */ public void setRemoteURI(URI remoteURI) { this.remoteURI = remoteURI; } /** * @return Returns the password. */ public String getPassword() { return password; } /** * @param password The password to set. */ public void setPassword(String password) { this.password = password; } /** * @return Returns the userName. */ public String getUserName() { return userName; } /** * @param userName The userName to set. */ public void setUserName(String userName) { this.userName = userName; } private void shutDown() { masterActive.set(false); broker.masterFailed(); ServiceSupport.dispose(this); } public boolean isStoppedBeforeStart() { return stoppedBeforeStart.get(); } /** * Get the failedToStart * @return the failedToStart */ public boolean isFailedToStart() { return this.failedToStart; } /** * Set the failedToStart * @param failedToStart the failedToStart to set */ public void setFailedToStart(boolean failedToStart) { this.failedToStart = failedToStart; } } Other ActiveMQ examples (source code examples)Here is a short list of links related to this ActiveMQ MasterConnector.java source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.