alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

ActiveMQ example source code file (NetworkLoadTest.java)

This example ActiveMQ source code file (NetworkLoadTest.java) is included in the DevDaily.com "Java Source Code Warehouse" project. The intent of this project is to help you "Learn Java by Example"™.

Java - ActiveMQ tags/keywords

activemqqueue, atomiclong, atomiclong, brokerservice, brokerservice, connection, connection, forwardingclient, jmsexception, jmsexception, net, network, policyentry, q, session, stringbuffer, util

The ActiveMQ NetworkLoadTest.java source code

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.network;

import java.net.URI;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import junit.framework.TestCase;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.NoSubscriptionRecoveryPolicy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStoragePolicy;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.usage.SystemUsage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This test case is used to load test store and forwarding between brokers.  It sets up
 * n brokers, each of which has one queue in a chain of queues that this test produces to
 * and consumes from.
 *
 * If a network bridge gets stuck at any point, subsequent queues will not get messages.  This test
 * samples the production and consumption stats once per sample interval (5 seconds by default) and,
 * if the flow of messages gets stuck, this test fails.  With the default settings the test monitors
 * the flow of messages for 1 min.
 *
 * @author chirino
 */
public class NetworkLoadTest extends TestCase {

	private static final transient Logger LOG = LoggerFactory.getLogger(NetworkLoadTest.class);

	// How many times do we sample?
    private static final long SAMPLES = Integer.parseInt(System.getProperty("SAMPLES", ""+60*1/5)); 
    // Slower machines might need to make this bigger.
    private static final long SAMPLE_DURATION = Integer.parseInt(System.getProperty("SAMPLES_DURATION", "" + 1000 * 5));
	protected static final int BROKER_COUNT = 4;
	protected static final int MESSAGE_SIZE = 2000;
        String groupId;
        
	class ForwardingClient {

		private final AtomicLong forwardCounter = new AtomicLong();
		private final Connection toConnection;
		private final Connection fromConnection;

		public ForwardingClient(int from, int to) throws JMSException {
			toConnection = createConnection(from);
			Session toSession = toConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			final MessageProducer producer = toSession.createProducer(new ActiveMQQueue("Q"+to));
			producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
			producer.setDisableMessageID(true);

			fromConnection = createConnection(from);
			Session fromSession = fromConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			MessageConsumer consumer = fromSession.createConsumer(new ActiveMQQueue("Q"+from));
			
			consumer.setMessageListener(new MessageListener() {
					public void onMessage(Message msg) {
						try {
							producer.send(msg);
							forwardCounter.incrementAndGet();
						} catch (JMSException e) {
							// this is caused by the connection getting closed. 
						}
					}
			});
		}

		public void start() throws JMSException {
			toConnection.start();
			fromConnection.start();
		}
		
		public void stop() throws JMSException {
		        toConnection.stop();
			fromConnection.stop();
		}
		
		public void close() throws JMSException {
			toConnection.close();
			fromConnection.close();
		}
	}

	private BrokerService[] brokers;
	private ForwardingClient[] forwardingClients;

	
	protected void setUp() throws Exception {
	        groupId = "network-load-test-"+System.currentTimeMillis();
		brokers = new BrokerService[BROKER_COUNT];
		for (int i = 0; i < brokers.length; i++) {
		    LOG.info("Starting broker: "+i);
			brokers[i] = createBroker(i);
			brokers[i].start();
		}
		
		// Wait for the network connection to get setup.
		// The wait is exponential since every broker has to connect to every other broker.
		Thread.sleep(BROKER_COUNT*BROKER_COUNT*50);
		
		forwardingClients = new ForwardingClient[BROKER_COUNT-1];		
		for (int i = 0; i < forwardingClients.length; i++) {
		    LOG.info("Starting fowarding client "+i);
			forwardingClients[i] = new ForwardingClient(i, i+1);
			forwardingClients[i].start();
		}
	}

	protected void tearDown() throws Exception {
		for (int i = 0; i < forwardingClients.length; i++) {
		    LOG.info("Stoping fowarding client "+i);
			forwardingClients[i].close();
		}
		for (int i = 0; i < brokers.length; i++) {
		    LOG.info("Stoping broker "+i);
			brokers[i].stop();
		}
	}

	protected Connection createConnection(int brokerId) throws JMSException {
		ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:"+(60000+brokerId));
        connectionFactory.setOptimizedMessageDispatch(true);
        connectionFactory.setCopyMessageOnSend(false);
        connectionFactory.setUseCompression(false);
        connectionFactory.setDispatchAsync(true);
        connectionFactory.setUseAsyncSend(false);
        connectionFactory.setOptimizeAcknowledge(false);
        connectionFactory.setWatchTopicAdvisories(false);
        ActiveMQPrefetchPolicy qPrefetchPolicy= new ActiveMQPrefetchPolicy();
        qPrefetchPolicy.setQueuePrefetch(100);
        qPrefetchPolicy.setTopicPrefetch(1000);
        connectionFactory.setPrefetchPolicy(qPrefetchPolicy);
        connectionFactory.setAlwaysSyncSend(true);
		return connectionFactory.createConnection();
	}

	protected BrokerService createBroker(int brokerId) throws Exception {
		BrokerService broker = new BrokerService();
		broker.setBrokerName("broker-" + brokerId);
		broker.setPersistent(false);
		broker.setUseJmx(true);
		broker.getManagementContext().setCreateConnector(false);

		final SystemUsage memoryManager = new SystemUsage();
		memoryManager.getMemoryUsage().setLimit(1024 * 1024 * 50); // 50 MB
		broker.setSystemUsage(memoryManager);

		final List<PolicyEntry> policyEntries = new ArrayList();
		final PolicyEntry entry = new PolicyEntry();
		entry.setQueue(">");
		entry.setMemoryLimit(1024 * 1024 * 1); // Set to 1 MB
		entry.setPendingSubscriberPolicy(new VMPendingSubscriberMessageStoragePolicy());
		entry.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy());
		policyEntries.add(entry);

		// This is to turn of the default behavior of storing topic messages for retroactive consumption
		final PolicyEntry topicPolicyEntry = new PolicyEntry();
		topicPolicyEntry.setTopic(">");
		final NoSubscriptionRecoveryPolicy noSubscriptionRecoveryPolicy = new NoSubscriptionRecoveryPolicy();
		topicPolicyEntry.setSubscriptionRecoveryPolicy(noSubscriptionRecoveryPolicy);

		final PolicyMap policyMap = new PolicyMap();
		policyMap.setPolicyEntries(policyEntries);
		broker.setDestinationPolicy(policyMap);
		
        TransportConnector transportConnector = new TransportConnector();
        transportConnector.setUri(new URI("tcp://localhost:"+(60000+brokerId)));
        
        transportConnector.setDiscoveryUri(new URI("multicast://default?group="+groupId));        
        broker.addConnector(transportConnector);
                        
        DiscoveryNetworkConnector networkConnector = new DiscoveryNetworkConnector();
        networkConnector.setUri(new URI("multicast://default?group="+groupId));
	    networkConnector.setBridgeTempDestinations(true);
	    networkConnector.setPrefetchSize(1);
	    broker.addNetworkConnector(networkConnector);
        
		return broker;
	}
	
	public void testRequestReply() throws Exception {

		final int to = 0; // Send to the first broker
		int from = brokers.length-1; // consume from the last broker..
				
	    LOG.info("Staring Final Consumer");

	    Connection fromConnection = createConnection(from);
		fromConnection.start();
		Session fromSession = fromConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		MessageConsumer consumer = fromSession.createConsumer(new ActiveMQQueue("Q"+from));
		
		final AtomicReference<ActiveMQTextMessage> lastMessageReceived = new AtomicReference();
		final AtomicLong producedMessages = new AtomicLong();
		final AtomicLong receivedMessages = new AtomicLong();
		final AtomicBoolean done = new AtomicBoolean();

		// Setup the consumer..
		consumer.setMessageListener(new MessageListener() {
			public void onMessage(Message msg) {
				ActiveMQTextMessage m = (ActiveMQTextMessage) msg;
				ActiveMQTextMessage last = lastMessageReceived.get();
				if( last!=null ) {
					// Some order checking...
					if( last.getMessageId().getProducerSequenceId() > m.getMessageId().getProducerSequenceId() ) {
						System.out.println("Received an out of order message. Got "+m.getMessageId()+", expected something after "+last.getMessageId());
					}
				}
				lastMessageReceived.set(m);
				receivedMessages.incrementAndGet();
			}
		});

	    LOG.info("Staring Initial Producer");
		final Connection toConnection = createConnection(to);
		Thread producer = new Thread("Producer") {
			@Override
			public void run() {
				try {
					toConnection.start();
					Session toSession = toConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
					final MessageProducer producer = toSession.createProducer(new ActiveMQQueue("Q"+to));
					producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
					producer.setDisableMessageID(true);

					for (int i = 0; !done.get(); i++) {
						TextMessage msg = toSession.createTextMessage(createMessageText(i));
						producer.send(msg);
						producedMessages.incrementAndGet();
					}
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
			
		    private String createMessageText(int index) {
				StringBuffer buffer = new StringBuffer(MESSAGE_SIZE);
				buffer.append(index + " on " + new Date() + " ...");
				if (buffer.length() > MESSAGE_SIZE) {
					return buffer.substring(0, MESSAGE_SIZE);
				}
				for (int i = buffer.length(); i < MESSAGE_SIZE; i++) {
					buffer.append(' ');
				}

				return buffer.toString();
			}
		};
		producer.start();
	
		
		// Give the forwarding clients a chance to get going and fill the down
		// stream broker queues..
		Thread.sleep(BROKER_COUNT*200);
		
        for (int i = 0; i < SAMPLES; i++) {

            long start = System.currentTimeMillis();
            producedMessages.set(0);
            receivedMessages.set(0);
            for (int j = 0; j < forwardingClients.length; j++) {
    			forwardingClients[j].forwardCounter.set(0);
    		}

            Thread.sleep(SAMPLE_DURATION);

            long end = System.currentTimeMillis();
            long r = receivedMessages.get();
            long p = producedMessages.get();

            LOG.info("published: " + p + " msgs at " + (p * 1000f / (end - start)) + " msgs/sec, " + "consumed: " + r + " msgs at " + (r * 1000f / (end - start)) + " msgs/sec");
            
            StringBuffer fwdingmsg = new StringBuffer(500);
            fwdingmsg.append("  forwarding counters: ");
            for (int j = 0; j < forwardingClients.length; j++) {
            	if( j!= 0 ) {
            		fwdingmsg.append(", ");
            	}
                fwdingmsg.append(forwardingClients[j].forwardCounter.get());
    		}
            LOG.info(fwdingmsg.toString());

            // The test is just checking to make sure thaat the producer and consumer does not hang
            // due to the network hops take to route the message form the producer to the consumer.
            assertTrue("Recieved some messages since last sample", r>0);
            assertTrue("Produced some messages since last sample", p>0);
            
        }
        LOG.info("Sample done.");
        done.set(true);
        // Wait for the producer to finish.
        producer.join(1000*5);
        toConnection.close();
        fromConnection.close();
        
	}


}

Other ActiveMQ examples (source code examples)

Here is a short list of links related to this ActiveMQ NetworkLoadTest.java source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.