|
ActiveMQ example source code file (TwoBrokerQueueClientsReconnectTest.java)
The ActiveMQ TwoBrokerQueueClientsReconnectTest.java source code/** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.activemq.usecases; import java.net.URI; import javax.jms.Connection; import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQPrefetchPolicy; import org.apache.activemq.JmsMultipleBrokersTestSupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * */ public class TwoBrokerQueueClientsReconnectTest extends JmsMultipleBrokersTestSupport { protected static final int MESSAGE_COUNT = 100; // Best if a factor of 100 protected static final int PREFETCH_COUNT = 1; private static final Logger LOG = LoggerFactory.getLogger(TwoBrokerQueueClientsReconnectTest.class); protected int msgsClient1; protected int msgsClient2; protected String broker1; protected String broker2; public void testClientAReceivesOnly() throws Exception { broker1 = "BrokerA"; broker2 = "BrokerB"; doOneClientReceivesOnly(); } public void testClientBReceivesOnly() throws Exception { broker1 = "BrokerB"; broker2 = "BrokerA"; doOneClientReceivesOnly(); } public void doOneClientReceivesOnly() throws Exception { // Bridge brokers bridgeBrokers(broker1, broker2); bridgeBrokers(broker2, broker1); // Run brokers startAllBrokers(); // Create queue Destination dest = createDestination("TEST.FOO", false); // Create consumers MessageConsumer client1 = createConsumer(broker1, dest); MessageConsumer client2 = createConsumer(broker2, dest); // Give clients time to register with broker Thread.sleep(500); // Always send messages to broker A sendMessages("BrokerA", dest, MESSAGE_COUNT); // Close the second client, messages should be sent to the first client client2.close(); // Let the first client receive all messages msgsClient1 += receiveAllMessages(client1); client1.close(); // First client should have received 100 messages assertEquals("Client for " + broker1 + " should have receive all messages.", MESSAGE_COUNT, msgsClient1); } public void testClientAReceivesOnlyAfterReconnect() throws Exception { broker1 = "BrokerA"; broker2 = "BrokerB"; doOneClientReceivesOnlyAfterReconnect(); } public void testClientBReceivesOnlyAfterReconnect() throws Exception { broker1 = "BrokerB"; broker2 = "BrokerA"; doOneClientReceivesOnlyAfterReconnect(); } public void doOneClientReceivesOnlyAfterReconnect() throws Exception { // Bridge brokers bridgeBrokers(broker1, broker2); bridgeBrokers(broker2, broker1); // Run brokers startAllBrokers(); // Create queue Destination dest = createDestination("TEST.FOO", false); // Create first consumer MessageConsumer client1 = createConsumer(broker1, dest); MessageConsumer client2 = createConsumer(broker2, dest); // Give clients time to register with broker Thread.sleep(500); // Always send message to broker A sendMessages("BrokerA", dest, MESSAGE_COUNT); // Let the first client receive the first 20% of messages msgsClient1 += receiveExactMessages(client1, (int)(MESSAGE_COUNT * 0.20)); // Disconnect the first client client1.close(); // Create another client for the first broker client1 = createConsumer(broker1, dest); Thread.sleep(500); // Close the second client, messages should be sent to the first client client2.close(); // Receive the rest of the messages msgsClient1 += receiveAllMessages(client1); client1.close(); // The first client should have received 100 messages assertEquals("Client for " + broker1 + " should have received all messages.", MESSAGE_COUNT, msgsClient1); } public void testTwoClientsReceiveClientADisconnects() throws Exception { broker1 = "BrokerA"; broker2 = "BrokerB"; doTwoClientsReceiveOneClientDisconnects(); } public void testTwoClientsReceiveClientBDisconnects() throws Exception { broker1 = "BrokerB"; broker2 = "BrokerA"; doTwoClientsReceiveOneClientDisconnects(); } public void doTwoClientsReceiveOneClientDisconnects() throws Exception { // Bridge brokers bridgeBrokers(broker1, broker2); bridgeBrokers(broker2, broker1); // Run brokers startAllBrokers(); // Create queue Destination dest = createDestination("TEST.FOO", false); // Create first client MessageConsumer client1 = createConsumer(broker1, dest); MessageConsumer client2 = createConsumer(broker2, dest); // Give clients time to register with broker Thread.sleep(500); // Always send messages to broker A sendMessages("BrokerA", dest, MESSAGE_COUNT); // Let each client receive 20% of the messages - 40% total msgsClient1 += receiveExactMessages(client1, (int)(MESSAGE_COUNT * 0.20)); msgsClient2 += receiveExactMessages(client2, (int)(MESSAGE_COUNT * 0.20)); // Disconnect the first client client1.close(); // Let the second client receive the rest of the messages msgsClient2 += receiveAllMessages(client2); client2.close(); // First client should have received 20% of the messages assertEquals("Client for " + broker1 + " should have received 20% of the messages.", (int)(MESSAGE_COUNT * 0.20), msgsClient1); // Second client should have received 80% of the messages assertEquals("Client for " + broker2 + " should have received 80% of the messages.", (int)(MESSAGE_COUNT * 0.80), msgsClient2); } public void testTwoClientsReceiveClientAReconnects() throws Exception { broker1 = "BrokerA"; broker2 = "BrokerB"; doTwoClientsReceiveOneClientReconnects(); } public void testTwoClientsReceiveClientBReconnects() throws Exception { broker1 = "BrokerB"; broker2 = "BrokerA"; doTwoClientsReceiveOneClientReconnects(); } public void doTwoClientsReceiveOneClientReconnects() throws Exception { // Bridge brokers bridgeBrokers(broker1, broker2); bridgeBrokers(broker2, broker1); // Run brokers startAllBrokers(); // Create queue Destination dest = createDestination("TEST.FOO", false); // Create the first client MessageConsumer client1 = createConsumer(broker1, dest); MessageConsumer client2 = createConsumer(broker2, dest); // Give clients time to register with broker Thread.sleep(500); // Always send messages to broker A sendMessages("BrokerA", dest, MESSAGE_COUNT); // Let each client receive 20% of the messages - 40% total msgsClient1 += receiveExactMessages(client1, (int)(MESSAGE_COUNT * 0.20)); msgsClient2 += receiveExactMessages(client2, (int)(MESSAGE_COUNT * 0.20)); // Disconnect the first client client1.close(); // Let the second client receive 20% more of the total messages msgsClient2 += receiveExactMessages(client2, (int)(MESSAGE_COUNT * 0.20)); // Create another client for broker 1 client1 = createConsumer(broker1, dest); Thread.sleep(500); // Let each client receive 20% of the messages - 40% total msgsClient1 += receiveExactMessages(client1, (int)(MESSAGE_COUNT * 0.20)); client1.close(); msgsClient2 += receiveExactMessages(client2, (int)(MESSAGE_COUNT * 0.20)); client2.close(); // First client should have received 40 messages assertEquals("Client for " + broker1 + " should have received 40% of the messages.", (int)(MESSAGE_COUNT * 0.40), msgsClient1); // Second client should have received 60 messages assertEquals("Client for " + broker2 + " should have received 60% of the messages.", (int)(MESSAGE_COUNT * 0.60), msgsClient2); } public void testTwoClientsReceiveTwoClientReconnects() throws Exception { broker1 = "BrokerA"; broker2 = "BrokerB"; // Bridge brokers bridgeBrokers(broker1, broker2); bridgeBrokers(broker2, broker1); // Run brokers startAllBrokers(); // Create queue Destination dest = createDestination("TEST.FOO", false); // Create the first client MessageConsumer client1 = createConsumer(broker1, dest); MessageConsumer client2 = createConsumer(broker2, dest); // Give clients time to register with broker Thread.sleep(500); // Always send messages to broker A sendMessages("BrokerA", dest, MESSAGE_COUNT); // Let each client receive 20% of the messages - 40% total msgsClient1 += receiveExactMessages(client1, (int)(MESSAGE_COUNT * 0.20)); msgsClient2 += receiveExactMessages(client2, (int)(MESSAGE_COUNT * 0.20)); // Disconnect both clients client1.close(); client2.close(); // Create another two clients for each broker client1 = createConsumer(broker1, dest); client2 = createConsumer(broker2, dest); Thread.sleep(500); // Let each client receive 30% more of the total messages - 60% total msgsClient1 += receiveExactMessages(client1, (int)(MESSAGE_COUNT * 0.30)); client1.close(); msgsClient2 += receiveExactMessages(client2, (int)(MESSAGE_COUNT * 0.30)); client2.close(); // First client should have received 50% of the messages assertEquals("Client for " + broker1 + " should have received 50% of the messages.", (int)(MESSAGE_COUNT * 0.50), msgsClient1); // Second client should have received 50% of the messages assertEquals("Client for " + broker2 + " should have received 50% of the messages.", (int)(MESSAGE_COUNT * 0.50), msgsClient2); } protected int receiveExactMessages(MessageConsumer consumer, int msgCount) throws Exception { Message msg; int i; for (i = 0; i < msgCount; i++) { msg = consumer.receive(1000); if (msg == null) { LOG.error("Consumer failed to receive exactly " + msgCount + " messages. Actual messages received is: " + i); break; } } return i; } protected int receiveAllMessages(MessageConsumer consumer) throws Exception { int msgsReceived = 0; Message msg; do { msg = consumer.receive(1000); if (msg != null) { msgsReceived++; } } while (msg != null); return msgsReceived; } protected MessageConsumer createConsumer(String brokerName, Destination dest) throws Exception { Connection conn = createConnection(brokerName); conn.start(); Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); return sess.createConsumer(dest); } public void setUp() throws Exception { super.setAutoFail(true); super.setUp(); createBroker(new URI("broker:(tcp://localhost:61616)/BrokerA?persistent=false&useJmx=false")); createBroker(new URI("broker:(tcp://localhost:61617)/BrokerB?persistent=false&useJmx=false")); // Configure broker connection factory ActiveMQConnectionFactory factoryA; ActiveMQConnectionFactory factoryB; factoryA = (ActiveMQConnectionFactory)getConnectionFactory("BrokerA"); factoryB = (ActiveMQConnectionFactory)getConnectionFactory("BrokerB"); // Set prefetch policy ActiveMQPrefetchPolicy policy = new ActiveMQPrefetchPolicy(); policy.setAll(PREFETCH_COUNT); factoryA.setPrefetchPolicy(policy); factoryB.setPrefetchPolicy(policy); msgsClient1 = 0; msgsClient2 = 0; } } Other ActiveMQ examples (source code examples)Here is a short list of links related to this ActiveMQ TwoBrokerQueueClientsReconnectTest.java source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.