|
ActiveMQ example source code file (MBeanTest.java)
The ActiveMQ MBeanTest.java source code/** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.activemq.broker.jmx; import java.io.BufferedReader; import java.io.InputStreamReader; import java.net.URI; import java.net.URL; import java.util.HashMap; import java.util.Map; import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.management.MBeanServer; import javax.management.MBeanServerInvocationHandler; import javax.management.MalformedObjectNameException; import javax.management.ObjectName; import javax.management.openmbean.CompositeData; import javax.management.openmbean.TabularData; import junit.textui.TestRunner; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQSession; import org.apache.activemq.BlobMessage; import org.apache.activemq.EmbeddedBrokerTestSupport; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.region.BaseDestination; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy; import org.apache.activemq.command.ActiveMQBlobMessage; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTempQueue; import org.apache.activemq.util.JMXSupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * A test case of the various MBeans in ActiveMQ. If you want to look at the * various MBeans after the test has been run then run this test case as a * command line application. * * */ public class MBeanTest extends EmbeddedBrokerTestSupport { private static final Logger LOG = LoggerFactory.getLogger(MBeanTest.class); private static boolean waitForKeyPress; protected MBeanServer mbeanServer; protected String domain = "org.apache.activemq"; protected String clientID = "foo"; protected Connection connection; protected boolean transacted; protected int authMode = Session.AUTO_ACKNOWLEDGE; protected static final int MESSAGE_COUNT = 2*BaseDestination.MAX_PAGE_SIZE; /** * When you run this test case from the command line it will pause before * terminating so that you can look at the MBeans state for debugging * purposes. */ public static void main(String[] args) { waitForKeyPress = true; TestRunner.run(MBeanTest.class); } public void testConnectors() throws Exception{ ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost"); BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true); assertEquals("openwire URL port doesn't equal bind Address", new URI(broker.getOpenWireURL()).getPort(), new URI(this.bindAddress).getPort()); } public void testMBeans() throws Exception { connection = connectionFactory.createConnection(); useConnection(connection); // test all the various MBeans now we have a producer, consumer and // messages on a queue assertSendViaMBean(); assertQueueBrowseWorks(); assertCreateAndDestroyDurableSubscriptions(); assertConsumerCounts(); } public void testMoveMessages() throws Exception { connection = connectionFactory.createConnection(); useConnection(connection); ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost"); QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); CompositeData[] compdatalist = queue.browse(); int initialQueueSize = compdatalist.length; if (initialQueueSize == 0) { fail("There is no message in the queue:"); } else { echo("Current queue size: " + initialQueueSize); } int messageCount = initialQueueSize; String[] messageIDs = new String[messageCount]; for (int i = 0; i < messageCount; i++) { CompositeData cdata = compdatalist[i]; String messageID = (String) cdata.get("JMSMessageID"); assertNotNull("Should have a message ID for message " + i, messageID); messageIDs[i] = messageID; } assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0); echo("About to move " + messageCount + " messages"); String newDestination = getSecondDestinationString(); for (String messageID : messageIDs) { echo("Moving message: " + messageID); queue.moveMessageTo(messageID, newDestination); } echo("Now browsing the queue"); compdatalist = queue.browse(); int actualCount = compdatalist.length; echo("Current queue size: " + actualCount); assertEquals("Should now have empty queue but was", initialQueueSize - messageCount, actualCount); echo("Now browsing the second queue"); queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost"); QueueViewMBean queueNew = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); long newQueuesize = queueNew.getQueueSize(); echo("Second queue size: " + newQueuesize); assertEquals("Unexpected number of messages ",messageCount, newQueuesize); // check memory usage migration assertTrue("new dest has some memory usage", queueNew.getMemoryPercentUsage() > 0); assertEquals("old dest has no memory usage", 0, queue.getMemoryPercentUsage()); assertTrue("use cache", queueNew.isUseCache()); assertTrue("cache enabled", queueNew.isCacheEnabled()); } public void testRemoveMessages() throws Exception { ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost"); BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true); broker.addQueue(getDestinationString()); ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost"); QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); String msg1 = queue.sendTextMessage("message 1"); String msg2 = queue.sendTextMessage("message 2"); assertTrue(queue.removeMessage(msg2)); connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); ActiveMQDestination dest = createDestination(); MessageConsumer consumer = session.createConsumer(dest); Message message = consumer.receive(1000); assertNotNull(message); assertEquals(msg1, message.getJMSMessageID()); String msg3 = queue.sendTextMessage("message 3"); message = consumer.receive(1000); assertNotNull(message); assertEquals(msg3, message.getJMSMessageID()); message = consumer.receive(1000); assertNull(message); } public void testRetryMessages() throws Exception { // lets speed up redelivery ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) connectionFactory; factory.getRedeliveryPolicy().setCollisionAvoidancePercent((short) 0); factory.getRedeliveryPolicy().setMaximumRedeliveries(1); factory.getRedeliveryPolicy().setInitialRedeliveryDelay(0); factory.getRedeliveryPolicy().setUseCollisionAvoidance(false); factory.getRedeliveryPolicy().setUseExponentialBackOff(false); factory.getRedeliveryPolicy().setBackOffMultiplier((short) 0); connection = connectionFactory.createConnection(); useConnection(connection); ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost"); QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); long initialQueueSize = queue.getQueueSize(); echo("current queue size: " + initialQueueSize); assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0); // lets create a duff consumer which keeps rolling back... Session session = connection.createSession(true, Session.SESSION_TRANSACTED); MessageConsumer consumer = session.createConsumer(new ActiveMQQueue(getDestinationString())); Message message = consumer.receive(5000); while (message != null) { echo("Message: " + message.getJMSMessageID() + " redelivered " + message.getJMSRedelivered() + " counter " + message.getObjectProperty("JMSXDeliveryCount")); session.rollback(); message = consumer.receive(2000); } consumer.close(); session.close(); // now lets get the dead letter queue Thread.sleep(1000); ObjectName dlqQueueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME + ",BrokerName=localhost"); QueueViewMBean dlq = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, dlqQueueViewMBeanName, QueueViewMBean.class, true); long initialDlqSize = dlq.getQueueSize(); CompositeData[] compdatalist = dlq.browse(); int dlqQueueSize = compdatalist.length; if (dlqQueueSize == 0) { fail("There are no messages in the queue:"); } else { echo("Current DLQ queue size: " + dlqQueueSize); } int messageCount = dlqQueueSize; String[] messageIDs = new String[messageCount]; for (int i = 0; i < messageCount; i++) { CompositeData cdata = compdatalist[i]; String messageID = (String) cdata.get("JMSMessageID"); assertNotNull("Should have a message ID for message " + i, messageID); messageIDs[i] = messageID; } int dlqMemUsage = dlq.getMemoryPercentUsage(); assertTrue("dlq has some memory usage", dlqMemUsage > 0); assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage()); echo("About to retry " + messageCount + " messages"); for (String messageID : messageIDs) { echo("Retrying message: " + messageID); dlq.retryMessage(messageID); } long queueSize = queue.getQueueSize(); compdatalist = queue.browse(); int actualCount = compdatalist.length; echo("Orginal queue size is now " + queueSize); echo("Original browse queue size: " + actualCount); long dlqSize = dlq.getQueueSize(); echo("DLQ size: " + dlqSize); assertEquals("DLQ size", initialDlqSize - messageCount, dlqSize); assertEquals("queue size", initialQueueSize, queueSize); assertEquals("browse queue size", initialQueueSize, actualCount); assertEquals("dest has some memory usage", dlqMemUsage, queue.getMemoryPercentUsage()); } public void testMoveMessagesBySelector() throws Exception { connection = connectionFactory.createConnection(); useConnection(connection); ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost"); QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); String newDestination = getSecondDestinationString(); queue.moveMatchingMessagesTo("counter > 2", newDestination); queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost"); queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); int movedSize = MESSAGE_COUNT-3; assertEquals("Unexpected number of messages ",movedSize,queue.getQueueSize()); // now lets remove them by selector queue.removeMatchingMessages("counter > 2"); assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize()); assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage()); } public void testCopyMessagesBySelector() throws Exception { connection = connectionFactory.createConnection(); useConnection(connection); ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost"); QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); String newDestination = getSecondDestinationString(); long queueSize = queue.getQueueSize(); queue.copyMatchingMessagesTo("counter > 2", newDestination); queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost"); queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); LOG.info("Queue: " + queueViewMBeanName + " now has: " + queue.getQueueSize() + " message(s)"); assertEquals("Expected messages in a queue: " + queueViewMBeanName, MESSAGE_COUNT-3, queue.getQueueSize()); // now lets remove them by selector queue.removeMatchingMessages("counter > 2"); assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize()); assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage()); } protected void assertSendViaMBean() throws Exception { String queueName = getDestinationString() + ".SendMBBean"; ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost"); echo("Create QueueView MBean..."); BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true); broker.addQueue(queueName); ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + queueName + ",BrokerName=localhost"); echo("Create QueueView MBean..."); QueueViewMBean proxy = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); proxy.purge(); int count = 5; for (int i = 0; i < count; i++) { String body = "message:" + i; Map headers = new HashMap(); headers.put("JMSCorrelationID", "MyCorrId"); headers.put("JMSDeliveryMode", Boolean.FALSE); headers.put("JMSXGroupID", "MyGroupID"); headers.put("JMSXGroupSeq", 1234); headers.put("JMSPriority", i + 1); headers.put("JMSType", "MyType"); headers.put("MyHeader", i); headers.put("MyStringHeader", "StringHeader" + i); proxy.sendTextMessage(headers, body); } CompositeData[] compdatalist = proxy.browse(); if (compdatalist.length == 0) { fail("There is no message in the queue:"); } String[] messageIDs = new String[compdatalist.length]; for (int i = 0; i < compdatalist.length; i++) { CompositeData cdata = compdatalist[i]; if (i == 0) { echo("Columns: " + cdata.getCompositeType().keySet()); } assertComplexData(i, cdata, "JMSCorrelationID", "MyCorrId"); assertComplexData(i, cdata, "JMSPriority", i + 1); assertComplexData(i, cdata, "JMSType", "MyType"); assertComplexData(i, cdata, "JMSCorrelationID", "MyCorrId"); assertComplexData(i, cdata, "JMSDeliveryMode", "NON-PERSISTENT"); String expected = "{MyStringHeader=StringHeader" + i + ", MyHeader=" + i + "}"; // The order of the properties is different when using the ibm jdk. if (System.getProperty("java.vendor").equals("IBM Corporation")) { expected = "{MyHeader=" + i + ", MyStringHeader=StringHeader" + i + "}"; } assertComplexData(i, cdata, "PropertiesText", expected); Map intProperties = CompositeDataHelper.getTabularMap(cdata, CompositeDataConstants.INT_PROPERTIES); assertEquals("intProperties size()", 1, intProperties.size()); assertEquals("intProperties.MyHeader", i, intProperties.get("MyHeader")); Map stringProperties = CompositeDataHelper.getTabularMap(cdata, CompositeDataConstants.STRING_PROPERTIES); assertEquals("stringProperties size()", 1, stringProperties.size()); assertEquals("stringProperties.MyHeader", "StringHeader" + i, stringProperties.get("MyStringHeader")); Map properties = CompositeDataHelper.getMessageUserProperties(cdata); assertEquals("properties size()", 2, properties.size()); assertEquals("properties.MyHeader", i, properties.get("MyHeader")); assertEquals("properties.MyHeader", "StringHeader" + i, properties.get("MyStringHeader")); assertComplexData(i, cdata, "JMSXGroupSeq", 1234); assertComplexData(i, cdata, "JMSXGroupID", "MyGroupID"); assertComplexData(i, cdata, "Text", "message:" + i); } } protected void assertComplexData(int messageIndex, CompositeData cdata, String name, Object expected) { Object value = cdata.get(name); assertEquals("Message " + messageIndex + " CData field: " + name, expected, value); } protected void assertQueueBrowseWorks() throws Exception { Integer mbeancnt = mbeanServer.getMBeanCount(); echo("Mbean count :" + mbeancnt); ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost"); echo("Create QueueView MBean..."); QueueViewMBean proxy = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); long concount = proxy.getConsumerCount(); echo("Consumer Count :" + concount); long messcount = proxy.getQueueSize(); echo("current number of messages in the queue :" + messcount); // lets browse CompositeData[] compdatalist = proxy.browse(); if (compdatalist.length == 0) { fail("There is no message in the queue:"); } String[] messageIDs = new String[compdatalist.length]; for (int i = 0; i < compdatalist.length; i++) { CompositeData cdata = compdatalist[i]; if (i == 0) { echo("Columns: " + cdata.getCompositeType().keySet()); } messageIDs[i] = (String)cdata.get("JMSMessageID"); echo("message " + i + " : " + cdata.values()); } TabularData table = proxy.browseAsTable(); echo("Found tabular data: " + table); assertTrue("Table should not be empty!", table.size() > 0); assertEquals("Queue size", MESSAGE_COUNT, proxy.getQueueSize()); String messageID = messageIDs[0]; String newDestinationName = "queue://dummy.test.cheese"; echo("Attempting to copy: " + messageID + " to destination: " + newDestinationName); proxy.copyMessageTo(messageID, newDestinationName); assertEquals("Queue size", MESSAGE_COUNT, proxy.getQueueSize()); messageID = messageIDs[1]; echo("Attempting to remove: " + messageID); proxy.removeMessage(messageID); assertEquals("Queue size", MESSAGE_COUNT-1, proxy.getQueueSize()); echo("Worked!"); } protected void assertCreateAndDestroyDurableSubscriptions() throws Exception { // lets create a new topic ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost"); echo("Create QueueView MBean..."); BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true); broker.addTopic(getDestinationString()); assertEquals("Durable subscriber count", 0, broker.getDurableTopicSubscribers().length); String topicName = getDestinationString(); String selector = null; ObjectName name1 = broker.createDurableSubscriber(clientID, "subscriber1", topicName, selector); broker.createDurableSubscriber(clientID, "subscriber2", topicName, selector); assertEquals("Durable subscriber count", 2, broker.getInactiveDurableTopicSubscribers().length); assertNotNull("Should have created an mbean name for the durable subscriber!", name1); LOG.info("Created durable subscriber with name: " + name1); // now lets try destroy it broker.destroyDurableSubscriber(clientID, "subscriber1"); assertEquals("Durable subscriber count", 1, broker.getInactiveDurableTopicSubscribers().length); } protected void assertConsumerCounts() throws Exception { ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost"); BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true); assertTrue("broker is not a slave", !broker.isSlave()); // create 2 topics broker.addTopic(getDestinationString() + "1"); broker.addTopic(getDestinationString() + "2"); ObjectName topicObjName1 = assertRegisteredObjectName(domain + ":Type=Topic,BrokerName=localhost,Destination=" + getDestinationString() + "1"); ObjectName topicObjName2 = assertRegisteredObjectName(domain + ":Type=Topic,BrokerName=localhost,Destination=" + getDestinationString() + "2"); TopicViewMBean topic1 = (TopicViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, topicObjName1, TopicViewMBean.class, true); TopicViewMBean topic2 = (TopicViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, topicObjName2, TopicViewMBean.class, true); assertEquals("topic1 Durable subscriber count", 0, topic1.getConsumerCount()); assertEquals("topic2 Durable subscriber count", 0, topic2.getConsumerCount()); String topicName = getDestinationString(); String selector = null; // create 1 subscriber for each topic broker.createDurableSubscriber(clientID, "topic1.subscriber1", topicName + "1", selector); broker.createDurableSubscriber(clientID, "topic2.subscriber1", topicName + "2", selector); assertEquals("topic1 Durable subscriber count", 1, topic1.getConsumerCount()); assertEquals("topic2 Durable subscriber count", 1, topic2.getConsumerCount()); // create 1 more subscriber for topic1 broker.createDurableSubscriber(clientID, "topic1.subscriber2", topicName + "1", selector); assertEquals("topic1 Durable subscriber count", 2, topic1.getConsumerCount()); assertEquals("topic2 Durable subscriber count", 1, topic2.getConsumerCount()); // destroy topic1 subscriber broker.destroyDurableSubscriber(clientID, "topic1.subscriber1"); assertEquals("topic1 Durable subscriber count", 1, topic1.getConsumerCount()); assertEquals("topic2 Durable subscriber count", 1, topic2.getConsumerCount()); // destroy topic2 subscriber broker.destroyDurableSubscriber(clientID, "topic2.subscriber1"); assertEquals("topic1 Durable subscriber count", 1, topic1.getConsumerCount()); assertEquals("topic2 Durable subscriber count", 0, topic2.getConsumerCount()); // destroy remaining topic1 subscriber broker.destroyDurableSubscriber(clientID, "topic1.subscriber2"); assertEquals("topic1 Durable subscriber count", 0, topic1.getConsumerCount()); assertEquals("topic2 Durable subscriber count", 0, topic2.getConsumerCount()); } protected ObjectName assertRegisteredObjectName(String name) throws MalformedObjectNameException, NullPointerException { ObjectName objectName = new ObjectName(name); if (mbeanServer.isRegistered(objectName)) { echo("Bean Registered: " + objectName); } else { fail("Could not find MBean!: " + objectName); } return objectName; } protected void setUp() throws Exception { bindAddress = "tcp://localhost:61616"; useTopic = false; super.setUp(); mbeanServer = broker.getManagementContext().getMBeanServer(); } protected void tearDown() throws Exception { if (waitForKeyPress) { // We are running from the command line so let folks browse the // mbeans... System.out.println(); System.out.println("Press enter to terminate the program."); System.out.println("In the meantime you can use your JMX console to view the current MBeans"); BufferedReader reader = new BufferedReader(new InputStreamReader(System.in)); reader.readLine(); } if (connection != null) { connection.close(); connection = null; } super.tearDown(); } protected BrokerService createBroker() throws Exception { BrokerService answer = new BrokerService(); answer.setPersistent(false); answer.setDeleteAllMessagesOnStartup(true); answer.setUseJmx(true); // apply memory limit so that %usage is visible PolicyMap policyMap = new PolicyMap(); PolicyEntry defaultEntry = new PolicyEntry(); defaultEntry.setMemoryLimit(1024*1024*4); policyMap.setDefaultEntry(defaultEntry); answer.setDestinationPolicy(policyMap); answer.addConnector(bindAddress); return answer; } protected void useConnection(Connection connection) throws Exception { connection.setClientID(clientID); connection.start(); Session session = connection.createSession(transacted, authMode); destination = createDestination(); MessageProducer producer = session.createProducer(destination); for (int i = 0; i < MESSAGE_COUNT; i++) { Message message = session.createTextMessage("Message: " + i); message.setIntProperty("counter", i); message.setJMSCorrelationID("MyCorrelationID"); message.setJMSReplyTo(new ActiveMQQueue("MyReplyTo")); message.setJMSType("MyType"); message.setJMSPriority(5); producer.send(message); } Thread.sleep(1000); } protected void useConnectionWithBlobMessage(Connection connection) throws Exception { connection.setClientID(clientID); connection.start(); ActiveMQSession session = (ActiveMQSession) connection.createSession(transacted, authMode); destination = createDestination(); MessageProducer producer = session.createProducer(destination); for (int i = 0; i < MESSAGE_COUNT; i++) { BlobMessage message = session.createBlobMessage(new URL("http://foo.bar/test")); message.setIntProperty("counter", i); message.setJMSCorrelationID("MyCorrelationID"); message.setJMSReplyTo(new ActiveMQQueue("MyReplyTo")); message.setJMSType("MyType"); message.setJMSPriority(5); producer.send(message); } Thread.sleep(1000); } protected void useConnectionWithByteMessage(Connection connection) throws Exception { connection.setClientID(clientID); connection.start(); ActiveMQSession session = (ActiveMQSession) connection.createSession(transacted, authMode); destination = createDestination(); MessageProducer producer = session.createProducer(destination); for (int i = 0; i < MESSAGE_COUNT; i++) { BytesMessage message = session.createBytesMessage(); message.writeBytes(("Message: " + i).getBytes()); message.setIntProperty("counter", i); message.setJMSCorrelationID("MyCorrelationID"); message.setJMSReplyTo(new ActiveMQQueue("MyReplyTo")); message.setJMSType("MyType"); message.setJMSPriority(5); producer.send(message); } Thread.sleep(1000); } protected void echo(String text) { LOG.info(text); } protected String getSecondDestinationString() { return "test.new.destination." + getClass() + "." + getName(); } public void testTempQueueJMXDelete() throws Exception { connection = connectionFactory.createConnection(); connection.setClientID(clientID); connection.start(); Session session = connection.createSession(transacted, authMode); ActiveMQTempQueue tQueue = (ActiveMQTempQueue) session.createTemporaryQueue(); Thread.sleep(1000); ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type="+ JMXSupport.encodeObjectNamePart(tQueue.getDestinationTypeAsString())+",Destination=" + JMXSupport.encodeObjectNamePart(tQueue.getPhysicalName()) + ",BrokerName=localhost"); // should not throw an exception mbeanServer.getObjectInstance(queueViewMBeanName); tQueue.delete(); Thread.sleep(1000); try { // should throw an exception mbeanServer.getObjectInstance(queueViewMBeanName); fail("should be deleted already!"); } catch (Exception e) { // expected! } } // Test for AMQ-3029 public void testBrowseBlobMessages() throws Exception { connection = connectionFactory.createConnection(); useConnectionWithBlobMessage(connection); ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost"); QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); CompositeData[] compdatalist = queue.browse(); int initialQueueSize = compdatalist.length; if (initialQueueSize == 0) { fail("There is no message in the queue:"); } else { echo("Current queue size: " + initialQueueSize); } int messageCount = initialQueueSize; String[] messageIDs = new String[messageCount]; for (int i = 0; i < messageCount; i++) { CompositeData cdata = compdatalist[i]; String messageID = (String) cdata.get("JMSMessageID"); assertNotNull("Should have a message ID for message " + i, messageID); messageIDs[i] = messageID; } assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0); } public void testBrowseBytesMessages() throws Exception { connection = connectionFactory.createConnection(); useConnectionWithByteMessage(connection); ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost"); QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); CompositeData[] compdatalist = queue.browse(); int initialQueueSize = compdatalist.length; if (initialQueueSize == 0) { fail("There is no message in the queue:"); } else { echo("Current queue size: " + initialQueueSize); } int messageCount = initialQueueSize; String[] messageIDs = new String[messageCount]; for (int i = 0; i < messageCount; i++) { CompositeData cdata = compdatalist[i]; String messageID = (String) cdata.get("JMSMessageID"); assertNotNull("Should have a message ID for message " + i, messageID); messageIDs[i] = messageID; Byte[] preview = (Byte[]) cdata.get(CompositeDataConstants.BODY_PREVIEW); assertNotNull("should be a preview", preview); assertTrue("not empty", preview.length > 0); } assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0); // consume all the messages echo("Attempting to consume all bytes messages from: " + destination); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer consumer = session.createConsumer(destination); for (int i=0; i<MESSAGE_COUNT; i++) { Message message = consumer.receive(5000); assertNotNull(message); assertTrue(message instanceof BytesMessage); } consumer.close(); session.close(); } } Other ActiveMQ examples (source code examples)Here is a short list of links related to this ActiveMQ MBeanTest.java source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.