|
ActiveMQ example source code file (NetworkLoadTest.java)
The ActiveMQ NetworkLoadTest.java source code/** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.activemq.network; import java.net.URI; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import junit.framework.TestCase; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQPrefetchPolicy; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.broker.region.policy.NoSubscriptionRecoveryPolicy; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy; import org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStoragePolicy; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTextMessage; import org.apache.activemq.usage.SystemUsage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * This test case is used to load test store and forwarding between brokers. It sets up * n brokers to which have a chain of queues which this test consumes and produces to. * * If the network bridges gets stuck at any point subsequent queues will not get messages. This test * samples the production and consumption stats every second and if the flow of messages * get stuck then this tast fails. The test monitors the flow of messages for 1 min. * * @author chirino */ public class NetworkLoadTest extends TestCase { private static final transient Logger LOG = LoggerFactory.getLogger(NetworkLoadTest.class); // How many times do we sample? private static final long SAMPLES = Integer.parseInt(System.getProperty("SAMPLES", ""+60*1/5)); // Slower machines might need to make this bigger. private static final long SAMPLE_DURATION = Integer.parseInt(System.getProperty("SAMPLES_DURATION", "" + 1000 * 5)); protected static final int BROKER_COUNT = 4; protected static final int MESSAGE_SIZE = 2000; String groupId; class ForwardingClient { private final AtomicLong forwardCounter = new AtomicLong(); private final Connection toConnection; private final Connection fromConnection; public ForwardingClient(int from, int to) throws JMSException { toConnection = createConnection(from); Session toSession = toConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); final MessageProducer producer = toSession.createProducer(new ActiveMQQueue("Q"+to)); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); producer.setDisableMessageID(true); fromConnection = createConnection(from); Session fromSession = fromConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer consumer = fromSession.createConsumer(new ActiveMQQueue("Q"+from)); consumer.setMessageListener(new MessageListener() { public void onMessage(Message msg) { try { producer.send(msg); forwardCounter.incrementAndGet(); } catch (JMSException e) { // this is caused by the connection getting closed. } } }); } public void start() throws JMSException { toConnection.start(); fromConnection.start(); } public void stop() throws JMSException { toConnection.stop(); fromConnection.stop(); } public void close() throws JMSException { toConnection.close(); fromConnection.close(); } } private BrokerService[] brokers; private ForwardingClient[] forwardingClients; protected void setUp() throws Exception { groupId = "network-load-test-"+System.currentTimeMillis(); brokers = new BrokerService[BROKER_COUNT]; for (int i = 0; i < brokers.length; i++) { LOG.info("Starting broker: "+i); brokers[i] = createBroker(i); brokers[i].start(); } // Wait for the network connection to get setup. // The wait is exponential since every broker has to connect to every other broker. Thread.sleep(BROKER_COUNT*BROKER_COUNT*50); forwardingClients = new ForwardingClient[BROKER_COUNT-1]; for (int i = 0; i < forwardingClients.length; i++) { LOG.info("Starting fowarding client "+i); forwardingClients[i] = new ForwardingClient(i, i+1); forwardingClients[i].start(); } } protected void tearDown() throws Exception { for (int i = 0; i < forwardingClients.length; i++) { LOG.info("Stoping fowarding client "+i); forwardingClients[i].close(); } for (int i = 0; i < brokers.length; i++) { LOG.info("Stoping broker "+i); brokers[i].stop(); } } protected Connection createConnection(int brokerId) throws JMSException { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:"+(60000+brokerId)); connectionFactory.setOptimizedMessageDispatch(true); connectionFactory.setCopyMessageOnSend(false); connectionFactory.setUseCompression(false); connectionFactory.setDispatchAsync(true); connectionFactory.setUseAsyncSend(false); connectionFactory.setOptimizeAcknowledge(false); connectionFactory.setWatchTopicAdvisories(false); ActiveMQPrefetchPolicy qPrefetchPolicy= new ActiveMQPrefetchPolicy(); qPrefetchPolicy.setQueuePrefetch(100); qPrefetchPolicy.setTopicPrefetch(1000); connectionFactory.setPrefetchPolicy(qPrefetchPolicy); connectionFactory.setAlwaysSyncSend(true); return connectionFactory.createConnection(); } protected BrokerService createBroker(int brokerId) throws Exception { BrokerService broker = new BrokerService(); broker.setBrokerName("broker-" + brokerId); broker.setPersistent(false); broker.setUseJmx(true); broker.getManagementContext().setCreateConnector(false); final SystemUsage memoryManager = new SystemUsage(); memoryManager.getMemoryUsage().setLimit(1024 * 1024 * 50); // 50 MB broker.setSystemUsage(memoryManager); final List<PolicyEntry> policyEntries = new ArrayList Other ActiveMQ examples (source code examples)Here is a short list of links related to this ActiveMQ NetworkLoadTest.java source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.