alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

ActiveMQ example source code file (MBeanTest.java)

This example ActiveMQ source code file (MBeanTest.java) is included in the DevDaily.com "Java Source Code Warehouse" project. The intent of this project is to help you "Learn Java by Example"™.

Java - ActiveMQ tags/keywords

brokername=localhost, compositedata, durable, durable, exception, exception, io, management, message, net, network, objectname, objectname, queueviewmbean, should, string, string, type=queue,destination, util

The ActiveMQ MBeanTest.java source code

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.broker.jmx;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URL;
import java.util.HashMap;
import java.util.Map;

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.management.InstanceNotFoundException;
import javax.management.MBeanServer;
import javax.management.MBeanServerInvocationHandler;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.TabularData;

import junit.textui.TestRunner;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.BlobMessage;
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.BaseDestination;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
import org.apache.activemq.command.ActiveMQBlobMessage;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTempQueue;
import org.apache.activemq.util.JMXSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A test case of the various MBeans in ActiveMQ. If you want to look at the
 * various MBeans after the test has been run then run this test case as a
 * command line application.
 * 
 * 
 */
public class MBeanTest extends EmbeddedBrokerTestSupport {
    // Logger that echo() and the tests write their progress output to.
    private static final Logger LOG = LoggerFactory.getLogger(MBeanTest.class);

    // Set to true by main(); when true, tearDown() blocks on a line of console input.
    private static boolean waitForKeyPress;

    // Assigned in setUp() from the broker's management context.
    protected MBeanServer mbeanServer;
    // JMX domain prefix used when building the ObjectNames the tests look up.
    protected String domain = "org.apache.activemq";
    // Client ID set on test connections and passed to the durable subscriber MBean operations.
    protected String clientID = "foo";

    // Connection opened by the running test; closed and cleared in tearDown().
    protected Connection connection;
    // Passed as the "transacted" flag when the helper methods create their sessions.
    protected boolean transacted;
    // Passed as the acknowledge mode when the helper methods create their sessions.
    protected int authMode = Session.AUTO_ACKNOWLEDGE;
    // Number of messages each useConnection* helper sends: twice BaseDestination.MAX_PAGE_SIZE.
    protected static final int MESSAGE_COUNT = 2*BaseDestination.MAX_PAGE_SIZE;

    /**
     * Runs this test case with the JUnit text runner.
     * <p>
     * Sets {@code waitForKeyPress} first, so that {@code tearDown()} waits for
     * a line of console input before cleaning up. That gives you time to look
     * at the MBeans state with a JMX console for debugging purposes.
     *
     * @param args command line arguments; not used
     */
    public static void main(String[] args) {
        waitForKeyPress = true;
        TestRunner.run(MBeanTest.class);
    }
    
    /**
     * Verifies that the OpenWire URL reported by the broker MBean uses the
     * same port as the address the broker was configured to bind to.
     */
    public void testConnectors() throws Exception{
        ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost");
        BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);

        // JUnit takes (message, expected, actual): the configured bind address
        // is the expectation, the value reported over JMX is what is under test.
        int expectedPort = new URI(this.bindAddress).getPort();
        int actualPort = new URI(broker.getOpenWireURL()).getPort();
        assertEquals("openwire URL port doesn't equal bind Address", expectedPort, actualPort);
    }

    /**
     * Opens a connection, sends the test messages via {@code useConnection}
     * and then runs the MBean assertion helpers one after another against the
     * same broker.
     */
    public void testMBeans() throws Exception {
        connection = connectionFactory.createConnection();
        useConnection(connection);

        // test all the various MBeans now we have a producer, consumer and
        // messages on a queue
        assertSendViaMBean();
        assertQueueBrowseWorks();
        assertCreateAndDestroyDurableSubscriptions();
        assertConsumerCounts();
    }

    /**
     * Moves every message on the test queue to a second queue, one message at
     * a time through the QueueView MBean, then checks the sizes and the memory
     * usage reported for both queues.
     */
    public void testMoveMessages() throws Exception {
        connection = connectionFactory.createConnection();
        useConnection(connection);

        ObjectName sourceName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
        QueueViewMBean source = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, sourceName, QueueViewMBean.class, true);

        CompositeData[] browsed = source.browse();
        final int initialQueueSize = browsed.length;
        if (initialQueueSize == 0) {
            fail("There is no message in the queue:");
        }
        echo("Current queue size: " + initialQueueSize);

        // collect the message IDs from the browse result
        final int messageCount = initialQueueSize;
        String[] ids = new String[messageCount];
        int index = 0;
        for (CompositeData row : browsed) {
            String id = (String) row.get("JMSMessageID");
            assertNotNull("Should have a message ID for message " + index, id);
            ids[index] = id;
            index++;
        }

        assertTrue("dest has some memory usage", source.getMemoryPercentUsage() > 0);

        echo("About to move " + messageCount + " messages");

        String targetQueueName = getSecondDestinationString();
        for (int i = 0; i < ids.length; i++) {
            echo("Moving message: " + ids[i]);
            source.moveMessageTo(ids[i], targetQueueName);
        }

        echo("Now browsing the queue");
        int remaining = source.browse().length;
        echo("Current queue size: " + remaining);
        assertEquals("Should now have empty queue but was", initialQueueSize - messageCount, remaining);

        echo("Now browsing the second queue");

        ObjectName targetName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + targetQueueName + ",BrokerName=localhost");
        QueueViewMBean target = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, targetName, QueueViewMBean.class, true);

        long targetSize = target.getQueueSize();
        echo("Second queue size: " + targetSize);
        assertEquals("Unexpected number of messages ", messageCount, targetSize);

        // the memory accounting should have followed the messages to the new queue
        assertTrue("new dest has some memory usage", target.getMemoryPercentUsage() > 0);
        assertEquals("old dest has no memory usage", 0, source.getMemoryPercentUsage());
        assertTrue("use cache", target.isUseCache());
        assertTrue("cache enabled", target.isCacheEnabled());
    }

    /**
     * Sends messages through the QueueView MBean, removes one of them by ID
     * and checks that a consumer only receives the ones that were left.
     */
    public void testRemoveMessages() throws Exception {
        ObjectName brokerObjectName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost");
        BrokerViewMBean brokerView = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerObjectName, BrokerViewMBean.class, true);
        brokerView.addQueue(getDestinationString());

        ObjectName queueObjectName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
        QueueViewMBean queueView = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueObjectName, QueueViewMBean.class, true);

        String firstId = queueView.sendTextMessage("message 1");
        String secondId = queueView.sendTextMessage("message 2");

        // drop the second message before any consumer exists
        assertTrue(queueView.removeMessage(secondId));

        connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        ActiveMQDestination queueDestination = createDestination();
        MessageConsumer consumer = session.createConsumer(queueDestination);

        Message first = consumer.receive(1000);
        assertNotNull(first);
        assertEquals(firstId, first.getJMSMessageID());

        String thirdId = queueView.sendTextMessage("message 3");
        Message third = consumer.receive(1000);
        assertNotNull(third);
        assertEquals(thirdId, third.getJMSMessageID());

        // nothing else may be delivered: the removed message must stay gone
        Message leftover = consumer.receive(1000);
        assertNull(leftover);
    }

    /**
     * Exercises {@code retryMessage} on the dead letter queue MBean.
     * <p>
     * A transacted consumer rolls back every message it receives until nothing
     * more is delivered. The test then expects the messages on the shared dead
     * letter queue, retries each of them by ID and checks queue sizes and
     * memory usage on both the original queue and the DLQ.
     */
    public void testRetryMessages() throws Exception {
        // lets speed up redelivery
        ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) connectionFactory;
        factory.getRedeliveryPolicy().setCollisionAvoidancePercent((short) 0);
        factory.getRedeliveryPolicy().setMaximumRedeliveries(1);
        factory.getRedeliveryPolicy().setInitialRedeliveryDelay(0);
        factory.getRedeliveryPolicy().setUseCollisionAvoidance(false);
        factory.getRedeliveryPolicy().setUseExponentialBackOff(false);
        factory.getRedeliveryPolicy().setBackOffMultiplier((short) 0);

        connection = connectionFactory.createConnection();
        useConnection(connection);


        ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
        QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);

        // remember the starting size; the retry at the end is expected to restore it
        long initialQueueSize = queue.getQueueSize();
        echo("current queue size: " + initialQueueSize);
        assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0);

        // lets create a duff consumer which keeps rolling back...
        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
        MessageConsumer consumer = session.createConsumer(new ActiveMQQueue(getDestinationString()));
        Message message = consumer.receive(5000);
        // keep going until a receive times out, i.e. nothing is delivered any more
        while (message != null) {
            echo("Message: " + message.getJMSMessageID() + " redelivered " + message.getJMSRedelivered() + " counter " + message.getObjectProperty("JMSXDeliveryCount"));
            session.rollback();
            message = consumer.receive(2000);
        }
        consumer.close();
        session.close();


        // now lets get the dead letter queue
        // NOTE(review): the pause presumably lets dead-lettering finish before the lookup - confirm
        Thread.sleep(1000);

        ObjectName dlqQueueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME + ",BrokerName=localhost");
        QueueViewMBean dlq = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, dlqQueueViewMBeanName, QueueViewMBean.class, true);

        long initialDlqSize = dlq.getQueueSize();
        CompositeData[] compdatalist = dlq.browse();
        int dlqQueueSize = compdatalist.length;
        if (dlqQueueSize == 0) {
            fail("There are no messages in the queue:");
        }
        else {
            echo("Current DLQ queue size: " + dlqQueueSize);
        }
        // collect the IDs of everything the DLQ browse returned
        int messageCount = dlqQueueSize;
        String[] messageIDs = new String[messageCount];
        for (int i = 0; i < messageCount; i++) {
            CompositeData cdata = compdatalist[i];
            String messageID = (String) cdata.get("JMSMessageID");
            assertNotNull("Should have a message ID for message " + i, messageID);
            messageIDs[i] = messageID;
        }

        // memory usage is expected to sit with the DLQ now, not the original queue
        int dlqMemUsage = dlq.getMemoryPercentUsage();
        assertTrue("dlq has some memory usage", dlqMemUsage > 0);
        assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage());
        

        echo("About to retry " + messageCount + " messages");

        for (String messageID : messageIDs) {
            echo("Retrying message: " + messageID);
            dlq.retryMessage(messageID);
        }

        long queueSize = queue.getQueueSize();
        compdatalist = queue.browse();
        int actualCount = compdatalist.length;
        echo("Orginal queue size is now " + queueSize);
        echo("Original browse queue size: " + actualCount);

        long dlqSize = dlq.getQueueSize();
        echo("DLQ size: " + dlqSize);

        // every retried message must have left the DLQ and be back on the original queue
        assertEquals("DLQ size", initialDlqSize - messageCount, dlqSize);
        assertEquals("queue size", initialQueueSize, queueSize);
        assertEquals("browse queue size", initialQueueSize, actualCount);
        
        // the original queue should now report the usage the DLQ had before the retry
        assertEquals("dest has some memory usage", dlqMemUsage, queue.getMemoryPercentUsage());
    }

    /**
     * Moves the messages matching a selector to a second queue through the
     * QueueView MBean, then removes them from that queue with the same
     * selector.
     */
    public void testMoveMessagesBySelector() throws Exception {
        connection = connectionFactory.createConnection();
        useConnection(connection);

        ObjectName sourceName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
        QueueViewMBean source = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, sourceName, QueueViewMBean.class, true);

        String targetQueueName = getSecondDestinationString();
        source.moveMatchingMessagesTo("counter > 2", targetQueueName);

        ObjectName targetName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + targetQueueName + ",BrokerName=localhost");
        QueueViewMBean target = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, targetName, QueueViewMBean.class, true);

        // counters 0, 1 and 2 do not match the selector and stay behind
        int expectedMoved = MESSAGE_COUNT - 3;
        assertEquals("Unexpected number of messages ", expectedMoved, target.getQueueSize());

        // now lets remove them by selector
        target.removeMatchingMessages("counter > 2");

        assertEquals("Should have no more messages in the queue: " + targetName, 0, target.getQueueSize());
        assertEquals("dest has no memory usage", 0, target.getMemoryPercentUsage());
    }

    /**
     * Copies the messages matching a selector to a second queue through the
     * QueueView MBean, checks that the source queue keeps its size, then
     * removes the copies from the second queue with the same selector.
     */
    public void testCopyMessagesBySelector() throws Exception {
        connection = connectionFactory.createConnection();
        useConnection(connection);

        ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");

        QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);

        String newDestination = getSecondDestinationString();
        long queueSize = queue.getQueueSize();
        queue.copyMatchingMessagesTo("counter > 2", newDestination);

        // unlike a move, a copy must leave the source queue as it was
        assertEquals("Source queue size should be unchanged after copy", queueSize, queue.getQueueSize());

        queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost");

        queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);

        LOG.info("Queue: " + queueViewMBeanName + " now has: " + queue.getQueueSize() + " message(s)");
        // counters 0, 1 and 2 do not match the selector, so three messages are not copied
        assertEquals("Expected messages in a queue: " + queueViewMBeanName, MESSAGE_COUNT-3, queue.getQueueSize());
        // now lets remove them by selector
        queue.removeMatchingMessages("counter > 2");

        assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize());
        assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage());
    }


    /**
     * Sends text messages carrying JMS headers and user properties through the
     * QueueView MBean, then browses the queue and verifies that every header
     * and property is reported back for each message.
     */
    protected void assertSendViaMBean() throws Exception {
        String queueName = getDestinationString() + ".SendMBBean";

        ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost");
        echo("Create QueueView MBean...");
        BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
        broker.addQueue(queueName);

        ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + queueName + ",BrokerName=localhost");

        echo("Create QueueView MBean...");
        QueueViewMBean proxy = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);

        // start from an empty queue so that browse index i lines up with the i-th send
        proxy.purge();

        int count = 5;
        for (int i = 0; i < count; i++) {
            String body = "message:" + i;

            Map<String, Object> headers = new HashMap<String, Object>();
            headers.put("JMSCorrelationID", "MyCorrId");
            headers.put("JMSDeliveryMode", Boolean.FALSE);
            headers.put("JMSXGroupID", "MyGroupID");
            headers.put("JMSXGroupSeq", 1234);
            headers.put("JMSPriority", i + 1);
            headers.put("JMSType", "MyType");
            headers.put("MyHeader", i);
            headers.put("MyStringHeader", "StringHeader" + i);

            proxy.sendTextMessage(headers, body);
        }

        CompositeData[] compdatalist = proxy.browse();
        if (compdatalist.length == 0) {
            fail("There is no message in the queue:");
        }

        for (int i = 0; i < compdatalist.length; i++) {
            CompositeData cdata = compdatalist[i];

            if (i == 0) {
                echo("Columns: " + cdata.getCompositeType().keySet());
            }

            assertComplexData(i, cdata, "JMSCorrelationID", "MyCorrId");
            assertComplexData(i, cdata, "JMSPriority", i + 1);
            assertComplexData(i, cdata, "JMSType", "MyType");
            assertComplexData(i, cdata, "JMSDeliveryMode", "NON-PERSISTENT");
            String expected = "{MyStringHeader=StringHeader" + i + ", MyHeader=" + i + "}";
            // The order of the properties is different when using the ibm jdk.
            if (System.getProperty("java.vendor").equals("IBM Corporation")) {
                expected = "{MyHeader=" + i + ", MyStringHeader=StringHeader" + i + "}";
            }
            assertComplexData(i, cdata, "PropertiesText", expected);

            // the int-typed user property must show up in the int table only
            Map<?, ?> intProperties = CompositeDataHelper.getTabularMap(cdata, CompositeDataConstants.INT_PROPERTIES);
            assertEquals("intProperties size()", 1, intProperties.size());
            assertEquals("intProperties.MyHeader", i, intProperties.get("MyHeader"));

            // and the string-typed one in the string table only
            Map<?, ?> stringProperties = CompositeDataHelper.getTabularMap(cdata, CompositeDataConstants.STRING_PROPERTIES);
            assertEquals("stringProperties size()", 1, stringProperties.size());
            assertEquals("stringProperties.MyStringHeader", "StringHeader" + i, stringProperties.get("MyStringHeader"));

            // the combined view must contain both
            Map<?, ?> properties = CompositeDataHelper.getMessageUserProperties(cdata);
            assertEquals("properties size()", 2, properties.size());
            assertEquals("properties.MyHeader", i, properties.get("MyHeader"));
            assertEquals("properties.MyStringHeader", "StringHeader" + i, properties.get("MyStringHeader"));

            assertComplexData(i, cdata, "JMSXGroupSeq", 1234);
            assertComplexData(i, cdata, "JMSXGroupID", "MyGroupID");
            assertComplexData(i, cdata, "Text", "message:" + i);

        }
    }

    /**
     * Asserts that one field of a browsed message has the expected value.
     *
     * @param messageIndex index of the message, used only in the failure message
     * @param cdata the composite data describing the message
     * @param name the composite data field to read
     * @param expected the value the field must equal
     */
    protected void assertComplexData(int messageIndex, CompositeData cdata, String name, Object expected) {
        final String label = "Message " + messageIndex + " CData field: " + name;
        assertEquals(label, expected, cdata.get(name));
    }


    /**
     * Browses the test queue through the QueueView MBean, both as composite
     * data and as a table, then copies one message to another destination and
     * removes another, checking the reported queue size after each step.
     */
    protected void assertQueueBrowseWorks() throws Exception {
        echo("Mbean count :" + mbeanServer.getMBeanCount());

        ObjectName queueName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");

        echo("Create QueueView MBean...");
        QueueViewMBean queueView = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueName, QueueViewMBean.class, true);

        long consumerCount = queueView.getConsumerCount();
        echo("Consumer Count :" + consumerCount);
        long queuedMessages = queueView.getQueueSize();
        echo("current number of messages in the queue :" + queuedMessages);

        // lets browse
        CompositeData[] rows = queueView.browse();
        if (rows.length == 0) {
            fail("There is no message in the queue:");
        }

        // at least one row exists past this point, so the column names can be logged up front
        echo("Columns: " + rows[0].getCompositeType().keySet());

        String[] ids = new String[rows.length];
        int rowIndex = 0;
        for (CompositeData row : rows) {
            ids[rowIndex] = (String)row.get("JMSMessageID");
            echo("message " + rowIndex + " : " + row.values());
            rowIndex++;
        }

        TabularData table = queueView.browseAsTable();
        echo("Found tabular data: " + table);
        assertTrue("Table should not be empty!", table.size() > 0);

        assertEquals("Queue size", MESSAGE_COUNT, queueView.getQueueSize());

        // a copy leaves the source queue size untouched
        String copiedId = ids[0];
        String copyTarget = "queue://dummy.test.cheese";
        echo("Attempting to copy: " + copiedId + " to destination: " + copyTarget);
        queueView.copyMessageTo(copiedId, copyTarget);

        assertEquals("Queue size", MESSAGE_COUNT, queueView.getQueueSize());

        // a remove shrinks it by one
        String removedId = ids[1];
        echo("Attempting to remove: " + removedId);
        queueView.removeMessage(removedId);

        assertEquals("Queue size", MESSAGE_COUNT-1, queueView.getQueueSize());

        echo("Worked!");
    }

    /**
     * Creates two durable subscribers on a new topic through the broker MBean,
     * destroys one of them again and checks the subscriber counts the broker
     * reports along the way.
     */
    protected void assertCreateAndDestroyDurableSubscriptions() throws Exception {
        // lets create a new topic
        ObjectName brokerObjectName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost");
        echo("Create QueueView MBean...");
        BrokerViewMBean brokerView = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerObjectName, BrokerViewMBean.class, true);

        brokerView.addTopic(getDestinationString());

        int subscribersBefore = brokerView.getDurableTopicSubscribers().length;
        assertEquals("Durable subscriber count", 0, subscribersBefore);

        String topic = getDestinationString();
        String noSelector = null;
        ObjectName firstSubscriber = brokerView.createDurableSubscriber(clientID, "subscriber1", topic, noSelector);
        brokerView.createDurableSubscriber(clientID, "subscriber2", topic, noSelector);

        int inactiveAfterCreate = brokerView.getInactiveDurableTopicSubscribers().length;
        assertEquals("Durable subscriber count", 2, inactiveAfterCreate);

        assertNotNull("Should have created an mbean name for the durable subscriber!", firstSubscriber);

        LOG.info("Created durable subscriber with name: " + firstSubscriber);

        // now lets try destroy it
        brokerView.destroyDurableSubscriber(clientID, "subscriber1");
        int inactiveAfterDestroy = brokerView.getInactiveDurableTopicSubscribers().length;
        assertEquals("Durable subscriber count", 1, inactiveAfterDestroy);
    }

    /**
     * Creates and destroys durable subscribers on two topics through the
     * broker MBean and checks after every step that each topic MBean reports
     * the matching consumer count.
     */
    protected void assertConsumerCounts() throws Exception {
        ObjectName brokerObjectName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost");
        BrokerViewMBean brokerView = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerObjectName, BrokerViewMBean.class, true);

        assertFalse("broker is not a slave", brokerView.isSlave());

        // create 2 topics
        brokerView.addTopic(getDestinationString() + "1");
        brokerView.addTopic(getDestinationString() + "2");

        String firstTopicMBean = domain + ":Type=Topic,BrokerName=localhost,Destination=" + getDestinationString() + "1";
        ObjectName firstTopicName = assertRegisteredObjectName(firstTopicMBean);
        String secondTopicMBean = domain + ":Type=Topic,BrokerName=localhost,Destination=" + getDestinationString() + "2";
        ObjectName secondTopicName = assertRegisteredObjectName(secondTopicMBean);

        TopicViewMBean firstTopic = (TopicViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, firstTopicName, TopicViewMBean.class, true);
        TopicViewMBean secondTopic = (TopicViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, secondTopicName, TopicViewMBean.class, true);

        assertEquals("topic1 Durable subscriber count", 0, firstTopic.getConsumerCount());
        assertEquals("topic2 Durable subscriber count", 0, secondTopic.getConsumerCount());

        String topicPrefix = getDestinationString();
        String noSelector = null;

        // create 1 subscriber for each topic
        brokerView.createDurableSubscriber(clientID, "topic1.subscriber1", topicPrefix + "1", noSelector);
        brokerView.createDurableSubscriber(clientID, "topic2.subscriber1", topicPrefix + "2", noSelector);

        assertEquals("topic1 Durable subscriber count", 1, firstTopic.getConsumerCount());
        assertEquals("topic2 Durable subscriber count", 1, secondTopic.getConsumerCount());

        // create 1 more subscriber for topic1
        brokerView.createDurableSubscriber(clientID, "topic1.subscriber2", topicPrefix + "1", noSelector);

        assertEquals("topic1 Durable subscriber count", 2, firstTopic.getConsumerCount());
        assertEquals("topic2 Durable subscriber count", 1, secondTopic.getConsumerCount());

        // destroy topic1 subscriber
        brokerView.destroyDurableSubscriber(clientID, "topic1.subscriber1");

        assertEquals("topic1 Durable subscriber count", 1, firstTopic.getConsumerCount());
        assertEquals("topic2 Durable subscriber count", 1, secondTopic.getConsumerCount());

        // destroy topic2 subscriber
        brokerView.destroyDurableSubscriber(clientID, "topic2.subscriber1");

        assertEquals("topic1 Durable subscriber count", 1, firstTopic.getConsumerCount());
        assertEquals("topic2 Durable subscriber count", 0, secondTopic.getConsumerCount());

        // destroy remaining topic1 subscriber
        brokerView.destroyDurableSubscriber(clientID, "topic1.subscriber2");

        assertEquals("topic1 Durable subscriber count", 0, firstTopic.getConsumerCount());
        assertEquals("topic2 Durable subscriber count", 0, secondTopic.getConsumerCount());
    }

    /**
     * Parses the given JMX name and fails the test unless an MBean is
     * registered under it.
     *
     * @param name the object name in its string form
     * @return the parsed object name
     * @throws MalformedObjectNameException if the string is not a valid object name
     * @throws NullPointerException if {@code name} is null
     */
    protected ObjectName assertRegisteredObjectName(String name) throws MalformedObjectNameException, NullPointerException {
        final ObjectName objectName = new ObjectName(name);
        if (!mbeanServer.isRegistered(objectName)) {
            fail("Could not find MBean!: " + objectName);
        }
        echo("Bean Registered: " + objectName);
        return objectName;
    }

    /**
     * Configures the fixture, runs the superclass set-up and then captures
     * the MBean server the tests talk to.
     */
    protected void setUp() throws Exception {
        // both settings are assigned before super.setUp() runs;
        // NOTE(review): presumably that is where the broker is created from them - confirm
        bindAddress = "tcp://localhost:61616";
        useTopic = false;
        super.setUp();
        // the tests register their proxies against the broker's own MBean server
        mbeanServer = broker.getManagementContext().getMBeanServer();
    }

    /**
     * Optionally waits for a key press, then closes the test connection and
     * runs the superclass tear-down.
     * <p>
     * The clean-up steps are chained with {@code finally} so that a failure
     * while reading from the console or closing the connection cannot skip
     * the superclass tear-down.
     *
     * @throws Exception if reading the console, closing the connection or the
     *         superclass tear-down fails
     */
    protected void tearDown() throws Exception {
        try {
            if (waitForKeyPress) {
                // We are running from the command line so let folks browse the
                // mbeans...
                System.out.println();
                System.out.println("Press enter to terminate the program.");
                System.out.println("In the meantime you can use your JMX console to view the current MBeans");
                BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
                reader.readLine();
            }
        } finally {
            try {
                if (connection != null) {
                    connection.close();
                }
            } finally {
                // drop the reference even if close() failed, and always run the superclass tear-down
                connection = null;
                super.tearDown();
            }
        }
    }

    /**
     * Builds the broker used by the tests: non-persistent, JMX enabled, with a
     * default destination memory limit and a connector on the bind address.
     *
     * @return the configured broker service
     */
    protected BrokerService createBroker() throws Exception {
        // apply memory limit so that %usage is visible
        PolicyEntry defaultPolicy = new PolicyEntry();
        defaultPolicy.setMemoryLimit(1024*1024*4);
        PolicyMap policies = new PolicyMap();
        policies.setDefaultEntry(defaultPolicy);

        BrokerService service = new BrokerService();
        service.setPersistent(false);
        service.setDeleteAllMessagesOnStartup(true);
        service.setUseJmx(true);
        service.setDestinationPolicy(policies);
        service.addConnector(bindAddress);
        return service;
    }

    /**
     * Starts the connection and sends {@code MESSAGE_COUNT} text messages to
     * the test destination, each carrying an int property named "counter".
     *
     * @param connection the connection to configure, start and send on
     */
    protected void useConnection(Connection connection) throws Exception {
        connection.setClientID(clientID);
        connection.start();

        Session session = connection.createSession(transacted, authMode);
        destination = createDestination();
        MessageProducer producer = session.createProducer(destination);

        int counter = 0;
        while (counter < MESSAGE_COUNT) {
            Message textMessage = session.createTextMessage("Message: " + counter);
            textMessage.setIntProperty("counter", counter);
            textMessage.setJMSCorrelationID("MyCorrelationID");
            textMessage.setJMSReplyTo(new ActiveMQQueue("MyReplyTo"));
            textMessage.setJMSType("MyType");
            textMessage.setJMSPriority(5);
            producer.send(textMessage);
            counter++;
        }

        // pause after sending before the caller starts inspecting the MBeans
        Thread.sleep(1000);
    }

    
    /**
     * Starts the connection and sends {@code MESSAGE_COUNT} blob messages to
     * the test destination, each carrying an int property named "counter".
     *
     * @param connection the connection to configure, start and send on
     */
    protected void useConnectionWithBlobMessage(Connection connection) throws Exception {
        connection.setClientID(clientID);
        connection.start();

        ActiveMQSession blobSession = (ActiveMQSession) connection.createSession(transacted, authMode);
        destination = createDestination();
        MessageProducer producer = blobSession.createProducer(destination);

        int counter = 0;
        while (counter < MESSAGE_COUNT) {
            BlobMessage blob = blobSession.createBlobMessage(new URL("http://foo.bar/test"));
            blob.setIntProperty("counter", counter);
            blob.setJMSCorrelationID("MyCorrelationID");
            blob.setJMSReplyTo(new ActiveMQQueue("MyReplyTo"));
            blob.setJMSType("MyType");
            blob.setJMSPriority(5);
            producer.send(blob);
            counter++;
        }

        // pause after sending before the caller starts inspecting the MBeans
        Thread.sleep(1000);
    }

    /**
     * Starts the connection and sends {@code MESSAGE_COUNT} bytes messages to
     * the test destination, each carrying an int property named "counter".
     * <p>
     * The body text is encoded as UTF-8 explicitly, so the bytes on the wire
     * do not depend on the platform default charset.
     *
     * @param connection the connection to configure, start and send on
     */
    protected void useConnectionWithByteMessage(Connection connection) throws Exception {
        connection.setClientID(clientID);
        connection.start();
        ActiveMQSession session = (ActiveMQSession) connection.createSession(transacted, authMode);
        destination = createDestination();
        MessageProducer producer = session.createProducer(destination);
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            BytesMessage message = session.createBytesMessage();
            message.writeBytes(("Message: " + i).getBytes("UTF-8"));
            message.setIntProperty("counter", i);
            message.setJMSCorrelationID("MyCorrelationID");
            message.setJMSReplyTo(new ActiveMQQueue("MyReplyTo"));
            message.setJMSType("MyType");
            message.setJMSPriority(5);
            producer.send(message);
        }
        // pause after sending before the caller starts inspecting the MBeans
        Thread.sleep(1000);
    }

    /**
     * Writes a progress message to this test's logger at INFO level.
     *
     * @param text the message to log
     */
    protected void echo(String text) {
        LOG.info(text);
    }


    /**
     * Builds the name of the second destination that the move and copy tests
     * send messages to.
     * <p>
     * Uses the class name rather than the {@code Class} object itself:
     * concatenating a {@code Class} yields "class org...MBeanTest", which put
     * the word "class" and a space into the destination name.
     *
     * @return a destination name made of a fixed prefix, the test class name
     *         and the test name
     */
    protected String getSecondDestinationString() {
        return "test.new.destination." + getClass().getName() + "." + getName();
    }


    /**
     * Verifies that the MBean of a temporary queue is registered while the
     * queue exists and is gone once the queue has been deleted.
     */
    public void testTempQueueJMXDelete() throws Exception {
        connection = connectionFactory.createConnection();
        
        connection.setClientID(clientID);
        connection.start();
        Session session = connection.createSession(transacted, authMode);
        ActiveMQTempQueue tQueue = (ActiveMQTempQueue) session.createTemporaryQueue();
        Thread.sleep(1000);
        ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type="+  JMXSupport.encodeObjectNamePart(tQueue.getDestinationTypeAsString())+",Destination=" + JMXSupport.encodeObjectNamePart(tQueue.getPhysicalName()) + ",BrokerName=localhost");
        
        // should not throw an exception
        mbeanServer.getObjectInstance(queueViewMBeanName);

        tQueue.delete();
        Thread.sleep(1000);
        try {
            // should throw an exception
            mbeanServer.getObjectInstance(queueViewMBeanName);

            fail("should be deleted already!");
        } catch (InstanceNotFoundException expected) {
            // expected! Only "no such MBean" counts as success; any other
            // failure now propagates instead of silently passing the test.
        }

    }

    // Test for AMQ-3029
    /**
     * Sends blob messages and verifies that the QueueView MBean can browse
     * them: the browse must return messages, each with a message ID, and the
     * queue must report some memory usage.
     */
    public void testBrowseBlobMessages() throws Exception {
        connection = connectionFactory.createConnection();
        useConnectionWithBlobMessage(connection);

        ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");

        QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);

        CompositeData[] compdatalist = queue.browse();
        int initialQueueSize = compdatalist.length;
        if (initialQueueSize == 0) {
            fail("There is no message in the queue:");
        }
        else {
            echo("Current queue size: " + initialQueueSize);
        }

        // every browsed message must expose its ID; the IDs themselves are not needed afterwards
        for (int i = 0; i < initialQueueSize; i++) {
            CompositeData cdata = compdatalist[i];
            String messageID = (String) cdata.get("JMSMessageID");
            assertNotNull("Should have a message ID for message " + i, messageID);
        }

        assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0);
    }

    /**
     * Sends bytes messages and verifies that the QueueView MBean can browse
     * them, including a non-empty body preview, and that a consumer can then
     * receive all of them as {@code BytesMessage}s.
     */
    public void testBrowseBytesMessages() throws Exception {
        connection = connectionFactory.createConnection();
        useConnectionWithByteMessage(connection);

        ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");

        QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);

        CompositeData[] compdatalist = queue.browse();
        int initialQueueSize = compdatalist.length;
        if (initialQueueSize == 0) {
            fail("There is no message in the queue:");
        }
        else {
            echo("Current queue size: " + initialQueueSize);
        }

        // every browsed message must expose its ID and a body preview;
        // the IDs themselves are not needed afterwards
        for (int i = 0; i < initialQueueSize; i++) {
            CompositeData cdata = compdatalist[i];
            String messageID = (String) cdata.get("JMSMessageID");
            assertNotNull("Should have a message ID for message " + i, messageID);

            Byte[] preview = (Byte[]) cdata.get(CompositeDataConstants.BODY_PREVIEW);
            assertNotNull("should be a preview", preview);
            assertTrue("not empty", preview.length > 0);
        }

        assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0);

        // consume all the messages
        echo("Attempting to consume all bytes messages from: " + destination);
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer consumer = session.createConsumer(destination);
        for (int i=0; i<MESSAGE_COUNT; i++) {
            Message message = consumer.receive(5000);
            assertNotNull(message);
            assertTrue(message instanceof BytesMessage);
        }
        consumer.close();
        session.close();
    }
}

Other ActiveMQ examples (source code examples)

Here is a short list of links related to this ActiveMQ MBeanTest.java source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.