alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

ActiveMQ example source code file (NetworkBrokerDetachTest.java)

This example ActiveMQ source code file (NetworkBrokerDetachTest.java) is included in the DevDaily.com "Java Source Code Warehouse" project. The intent of this project is to help you "Learn Java by Example" TM.

Java - ActiveMQ tags/keywords

activemqdestination, activemqtopic, brokerservice, brokerservice, connectionfactory, connectionfactory, exception, exception, got, management, mbeanserverconnection, messagelistener, net, network, objectname, string, string, threading, threads, util

The ActiveMQ NetworkBrokerDetachTest.java source code

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.network;

import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeNotNull;

import java.io.File;
import java.net.MalformedURLException;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TopicSubscriber;
import javax.management.InstanceNotFoundException;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class NetworkBrokerDetachTest {

	private final static String BROKER_NAME = "broker";
	private final static String REM_BROKER_NAME = "networkedBroker";
	private final static String DESTINATION_NAME = "testQ";
	private final static int    NUM_CONSUMERS = 1;
	
    protected static final Logger LOG = LoggerFactory.getLogger(NetworkBrokerDetachTest.class);
    protected final int numRestarts = 3;
    protected final int networkTTL = 2;
    protected final boolean dynamicOnly = false;
    
    protected BrokerService broker;
    protected BrokerService networkedBroker;

    protected BrokerService createBroker() throws Exception {
        BrokerService broker = new BrokerService();
        broker.setBrokerName(BROKER_NAME);
        configureBroker(broker);
        broker.addConnector("tcp://localhost:61617");
        NetworkConnector networkConnector = broker.addNetworkConnector("static:(tcp://localhost:62617?wireFormat.maxInactivityDuration=500)?useExponentialBackOff=false");
        configureNetworkConnector(networkConnector);
        return broker;
    }

    protected BrokerService createNetworkedBroker() throws Exception {
        BrokerService broker = new BrokerService();
        broker.setBrokerName(REM_BROKER_NAME);
        configureBroker(broker);
        broker.getManagementContext().setCreateConnector(false);
        broker.addConnector("tcp://localhost:62617");
        NetworkConnector networkConnector = broker.addNetworkConnector("static:(tcp://localhost:61617?wireFormat.maxInactivityDuration=500)?useExponentialBackOff=false");
        configureNetworkConnector(networkConnector);
        return broker;
    }
    
    private void configureNetworkConnector(NetworkConnector networkConnector) {
        networkConnector.setDuplex(false);
        networkConnector.setNetworkTTL(networkTTL);
        networkConnector.setDynamicOnly(dynamicOnly);
    }
    
    // variants for each store....
    protected void configureBroker(BrokerService broker) throws Exception {
        //KahaPersistenceAdapter persistenceAdapter = new KahaPersistenceAdapter();
        //persistenceAdapter.setDirectory(new File("target/activemq-data/kaha/" + broker.getBrokerName() + "/NetworBrokerDetatchTest"));
        //broker.setPersistenceAdapter(persistenceAdapter);        
        
        KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter();
        persistenceAdapter.setDirectory(new File("target/activemq-data/kahadb/" + broker.getBrokerName() + "NetworBrokerDetatchTest"));
        broker.setPersistenceAdapter(persistenceAdapter);
        
        // default AMQ
    }
    
    @Before
    public void init() throws Exception {
        broker = createBroker();
        broker.setDeleteAllMessagesOnStartup(true);
        broker.start();
        
        networkedBroker = createNetworkedBroker();
        networkedBroker.setDeleteAllMessagesOnStartup(true);
        networkedBroker.start();
    }
    
    @After
    public void cleanup() throws Exception {
        networkedBroker.stop();
        networkedBroker.waitUntilStopped();
        
        broker.stop();
        broker.waitUntilStopped();
    }

    @Test
    public void testNetworkedBrokerDetach() throws Exception {
        LOG.info("Creating Consumer on the networked broker ...");
        // Create a consumer on the networked broker 
        ConnectionFactory consFactory = createConnectionFactory(networkedBroker);
        Connection consConn = consFactory.createConnection();
        Session consSession = consConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        ActiveMQDestination destination = (ActiveMQDestination) consSession.createQueue(DESTINATION_NAME);
        for(int i=0; i<NUM_CONSUMERS; i++) {
            consSession.createConsumer(destination);
        }
        
        assertTrue("got expected consumer count from mbean within time limit", 
                verifyConsumerCount(1, destination, broker));
        
        
        LOG.info("Stopping Consumer on the networked broker ...");
        // Closing the connection will also close the consumer 
        consConn.close();
        
        // We should have 0 consumer for the queue on the local broker
        assertTrue("got expected 0 count from mbean within time limit", verifyConsumerCount(0, destination, broker));
    }

    
    @Test
    public void testNetworkedBrokerDurableSubAfterRestart() throws Exception {
        
        final AtomicInteger count = new AtomicInteger(0);
        MessageListener counter = new MessageListener() {
            public void onMessage(Message message) {
                count.incrementAndGet();
            }
        };
        
        LOG.info("Creating durable consumer on each broker ...");
        ActiveMQTopic destination = registerDurableConsumer(networkedBroker, counter);
        registerDurableConsumer(broker, counter);
        
        assertTrue("got expected consumer count from local broker mbean within time limit",
                verifyConsumerCount(2, destination, broker));
        
        assertTrue("got expected consumer count from network broker mbean within time limit",
                verifyConsumerCount(2, destination, networkedBroker));
        
        sendMessageTo(destination, broker);
        
        assertTrue("Got one message on each", verifyMessageCount(2, count));
        
        LOG.info("Stopping brokerTwo...");
        networkedBroker.stop();
        networkedBroker.waitUntilStopped();           
        
        LOG.info("restarting  broker Two...");
        networkedBroker = createNetworkedBroker();
        networkedBroker.start();
   
        LOG.info("Recreating durable Consumer on the broker after restart...");
        registerDurableConsumer(networkedBroker, counter);
        
        // give advisories a chance to percolate
        TimeUnit.SECONDS.sleep(5);
        
        sendMessageTo(destination, broker);
        
        // expect similar after restart
        assertTrue("got expected consumer count from local broker mbean within time limit",
                verifyConsumerCount(2, destination, broker));
 
        // a durable sub is auto bridged on restart unless dynamicOnly=true
        assertTrue("got expected consumer count from network broker mbean within time limit",
                verifyConsumerCount(2, destination, networkedBroker));

        assertTrue("got no inactive subs on broker", verifyDurableConsumerCount(0, broker));
        assertTrue("got no inactive subs on other broker", verifyDurableConsumerCount(0, networkedBroker));

        assertTrue("Got two more messages after restart", verifyMessageCount(4, count));
        TimeUnit.SECONDS.sleep(1);
        assertTrue("still Got just two more messages", verifyMessageCount(4, count));
    }

    private boolean verifyMessageCount(final int i, final AtomicInteger count) throws Exception {
        return Wait.waitFor(new Wait.Condition() {
            public boolean isSatisified() throws Exception {
                return i == count.get();
            }      
        });
    }

    private ActiveMQTopic registerDurableConsumer(
            BrokerService brokerService, MessageListener listener) throws Exception {
        ConnectionFactory factory = createConnectionFactory(brokerService);
        Connection connection = factory.createConnection();
        connection.setClientID("DurableOne");
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        ActiveMQTopic destination = (ActiveMQTopic) session.createTopic(DESTINATION_NAME);
        // unique to a broker
        TopicSubscriber sub = session.createDurableSubscriber(destination, "SubOne" + brokerService.getBrokerName());
        sub.setMessageListener(listener);
        return destination;
    }

    private void sendMessageTo(ActiveMQTopic destination, BrokerService brokerService) throws Exception {
        ConnectionFactory factory = createConnectionFactory(brokerService);
        Connection conn = factory.createConnection();
        conn.start();
        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createProducer(destination).send(session.createTextMessage("Hi"));
        conn.close();
    }
    
    protected ConnectionFactory createConnectionFactory(final BrokerService broker) throws Exception {
        
        String url = ((TransportConnector) broker.getTransportConnectors().get(0)).getServer().getConnectURI().toString();
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        connectionFactory.setOptimizedMessageDispatch(true);
        connectionFactory.setCopyMessageOnSend(false);
        connectionFactory.setUseCompression(false);
        connectionFactory.setDispatchAsync(false);
        connectionFactory.setUseAsyncSend(false);
        connectionFactory.setOptimizeAcknowledge(false);
        connectionFactory.setWatchTopicAdvisories(true);
        ActiveMQPrefetchPolicy qPrefetchPolicy= new ActiveMQPrefetchPolicy();
        qPrefetchPolicy.setQueuePrefetch(100);
        qPrefetchPolicy.setTopicPrefetch(1000);
        connectionFactory.setPrefetchPolicy(qPrefetchPolicy);
        connectionFactory.setAlwaysSyncSend(true);
        return connectionFactory;
    }
    
    // JMX Helper Methods 
    private boolean verifyConsumerCount(final long expectedCount, final ActiveMQDestination destination, final BrokerService broker) throws Exception {
        return Wait.waitFor(new Wait.Condition() {
            public boolean isSatisified() throws Exception {
                boolean result = false;
                try {
                    // We should have 1 consumer for the queue on the local broker
                    Object consumers = broker.getManagementContext().getAttribute(getObjectName(broker.getBrokerName(), destination.isQueue() ? "Queue" : "Topic", "Destination=" + destination.getPhysicalName()), "ConsumerCount");
                    if (consumers != null) {
                        LOG.info("Consumers for " + destination.getPhysicalName() + " on " + broker + " : " + consumers);
                        if (expectedCount == ((Long)consumers).longValue()) {
                            result = true;
                        }
                    }
                } catch (Exception ignoreAndRetry) {
                }
                return result;
            }      
        });
    }
    
    
    private boolean verifyDurableConsumerCount(final long expectedCount, final BrokerService broker) throws Exception {
        return Wait.waitFor(new Wait.Condition() {
            public boolean isSatisified() throws Exception {
                boolean result = false;
                MBeanServerConnection mbsc = getMBeanServerConnection();
                if (mbsc != null) {
                    Set subs = broker.getManagementContext().queryNames(getObjectName(broker.getBrokerName(), "Subscription", "active=false,*"), null);
                    if (subs != null) {
                        LOG.info("inactive durable subs on " + broker + " : " + subs);
                        if (expectedCount == subs.size()) {
                            result = true;
                        }
                    }
                }
                return result;
            }      
        });
    }

    private MBeanServerConnection getMBeanServerConnection() throws MalformedURLException {
        final JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi");
        MBeanServerConnection mbsc = null;
        try {
            JMXConnector jmxc = JMXConnectorFactory.connect(url, null);
            mbsc = jmxc.getMBeanServerConnection();
        } catch (Exception ignored) {
            LOG.warn("getMBeanServer ex: " + ignored);
        }
        // If port 1099 is in use when the Broker starts, starting the jmx
        // connector will fail.  So, if we have no mbsc to query, skip the
        // test.
        assumeNotNull(mbsc);
        return mbsc;
    }
    
    
    private ObjectName getObjectName(String brokerName, String type, String pattern) throws Exception {
      ObjectName beanName = new ObjectName(
        "org.apache.activemq:BrokerName=" + brokerName + ",Type=" + type +"," + pattern
      );
      
      return beanName;
    }
}

Other ActiveMQ examples (source code examples)

Here is a short list of links related to this ActiveMQ NetworkBrokerDetachTest.java source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.