|
ActiveMQ example source code file (NetworkOfTwentyBrokersTest.java)
The ActiveMQ NetworkOfTwentyBrokersTest.java source code/** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.activemq.usecases; import java.net.URI; import java.util.Collection; import java.util.Iterator; import java.util.List; import org.apache.activemq.JmsMultipleBrokersTestSupport; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.network.NetworkConnector; import org.apache.activemq.util.ThreadTracker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class NetworkOfTwentyBrokersTest extends JmsMultipleBrokersTestSupport { private static final Logger LOG = LoggerFactory.getLogger(NetworkOfTwentyBrokersTest.class); // This will interconnect all brokers using multicast protected void bridgeAllBrokers() throws Exception { bridgeAllBrokers("TwentyBrokersTest", 1, false, false); } protected void bridgeAllBrokers(String groupName, int ttl, boolean suppressduplicateQueueSubs) throws Exception { bridgeAllBrokers(groupName, ttl, suppressduplicateQueueSubs, false); } protected void bridgeAllBrokers(String groupName, int ttl, boolean suppressduplicateQueueSubs, boolean decreasePriority) throws Exception { Collection<BrokerItem> brokerList = brokers.values(); for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext();) { BrokerService broker = i.next().broker; List<TransportConnector> transportConnectors = broker.getTransportConnectors(); if (transportConnectors.isEmpty()) { broker.addConnector(new URI(AUTO_ASSIGN_TRANSPORT)); transportConnectors = broker.getTransportConnectors(); } TransportConnector transport = transportConnectors.get(0); if (transport.getDiscoveryUri() == null) { transport.setDiscoveryUri(new URI("multicast://default?group=" + groupName)); } List<NetworkConnector> networkConnectors = broker.getNetworkConnectors(); if (networkConnectors.isEmpty()) { broker.addNetworkConnector("multicast://default?group=" + groupName); networkConnectors = broker.getNetworkConnectors(); } NetworkConnector nc = networkConnectors.get(0); nc.setNetworkTTL(ttl); nc.setSuppressDuplicateQueueSubscriptions(suppressduplicateQueueSubs); nc.setDecreaseNetworkConsumerPriority(decreasePriority); } // Multicasting may take longer to setup maxSetupTime = 8000; } protected BrokerService createBroker(String brokerName) throws Exception { BrokerService broker = new BrokerService(); broker.setPersistent(false); broker.setUseJmx(false); broker.setBrokerName(brokerName); broker.addConnector(new URI(AUTO_ASSIGN_TRANSPORT)); brokers.put(brokerName, new BrokerItem(broker)); return broker; } /* AMQ-3077 Bug */ public void testBrokers() throws Exception { int X = 20; int i; LOG.info("Creating X Brokers"); for (i = 0; i < X; i++) { createBroker("Broker" + i); } bridgeAllBrokers(); startAllBrokers(); waitForBridgeFormation(X-1); LOG.info("Waiting for complete formation"); try { Thread.sleep(10000); } catch (Exception e) { } verifyPeerBrokerInfos(X-1); LOG.info("Stopping half the brokers"); for (i = 0; i < X/2; i++) { destroyBroker("Broker" + i); } LOG.info("Waiting for complete stop"); try { Thread.sleep(10000); } catch (Exception e) { } verifyPeerBrokerInfos((X/2) -1); LOG.info("Recreating first half"); for (i = 0; i < X/2; i++) { createBroker("Broker" + i); } bridgeAllBrokers(); startAllBrokers(); waitForBridgeFormation(X-1); LOG.info("Waiting for complete reformation"); try { Thread.sleep(10000); } catch (Exception e) { } verifyPeerBrokerInfos(X-1); } public void testPeerBrokerCountHalfPeer() throws Exception { createBroker("A"); createBroker("B"); bridgeBrokers("A", "B"); startAllBrokers(); verifyPeerBrokerInfo(brokers.get("A"), 1); verifyPeerBrokerInfo(brokers.get("B"), 0); } public void testPeerBrokerCountHalfPeerTwice() throws Exception { createBroker("A"); createBroker("B"); bridgeBrokers("A", "B"); bridgeBrokers("A", "B"); startAllBrokers(); verifyPeerBrokerInfo(brokers.get("A"), 1); verifyPeerBrokerInfo(brokers.get("B"), 0); } public void testPeerBrokerCountFullPeer() throws Exception { createBroker("A"); createBroker("B"); bridgeBrokers("A", "B"); bridgeBrokers("B", "A"); startAllBrokers(); verifyPeerBrokerInfo(brokers.get("A"), 1); verifyPeerBrokerInfo(brokers.get("B"), 1); } public void testPeerBrokerCountFullPeerDuplex() throws Exception { createBroker("A"); createBroker("B"); NetworkConnector nc = bridgeBrokers("A", "B"); nc.setDuplex(true); startAllBrokers(); verifyPeerBrokerInfo(brokers.get("A"), 1); verifyPeerBrokerInfo(brokers.get("B"), 1); } private void verifyPeerBrokerInfo(BrokerItem brokerItem, final int max) { BrokerService broker = brokerItem.broker; RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker(); LOG.info("verify infos " + broker.getBrokerName() + ", len: " + regionBroker.getPeerBrokerInfos().length); for (BrokerInfo info : regionBroker.getPeerBrokerInfos()) { LOG.info(info.getBrokerName()); } assertEquals(broker.getBrokerName(), max, regionBroker.getPeerBrokerInfos().length); } private void verifyPeerBrokerInfos(final int max) { Collection<BrokerItem> brokerList = brokers.values(); for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext();) { verifyPeerBrokerInfo(i.next(), max); } } @Override public void setUp() throws Exception { super.setAutoFail(true); super.setUp(); } @Override public void tearDown() throws Exception { super.tearDown(); ThreadTracker.result(); } } Other ActiveMQ examples (source code examples)Here is a short list of links related to this ActiveMQ NetworkOfTwentyBrokersTest.java source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.