alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

ActiveMQ example source code file (MessagePriorityTest.java)

This example ActiveMQ source code file (MessagePriorityTest.java) is included in the DevDaily.com "Java Source Code Warehouse" project. The intent of this project is to help you "Learn Java by Example" TM.

Java - ActiveMQ tags/keywords

activemqprefetchpolicy, activemqtopic, exception, exception, high_pri, low_pri, message, message, msg_num, msg_num, object, producerthread, producerthread, test

The ActiveMQ MessagePriorityTest.java source code

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.activemq.store;

import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.StorePendingDurableSubscriberMessageStoragePolicy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

abstract public class MessagePriorityTest extends CombinationTestSupport {
    
    private static final Logger LOG = LoggerFactory.getLogger(MessagePriorityTest.class);

    BrokerService broker;
    PersistenceAdapter adapter;
    
    protected ActiveMQConnectionFactory factory;
    protected Connection conn;
    protected Session sess;
    
    public boolean useCache = true;
    public boolean dispatchAsync = true;
    public boolean prioritizeMessages = true;
    public boolean immediatePriorityDispatch = true;
    public int prefetchVal = 500;

    public int MSG_NUM = 600;
    public int HIGH_PRI = 7;
    public int LOW_PRI = 3;
    
    abstract protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws Exception;
    
    protected void setUp() throws Exception {
        broker = new BrokerService();
        broker.setBrokerName("priorityTest");
        broker.setAdvisorySupport(false);
        adapter = createPersistenceAdapter(true);
        broker.setPersistenceAdapter(adapter);
        PolicyEntry policy = new PolicyEntry();
        policy.setPrioritizedMessages(prioritizeMessages);
        policy.setUseCache(useCache);
        StorePendingDurableSubscriberMessageStoragePolicy durableSubPending =
                new StorePendingDurableSubscriberMessageStoragePolicy();
        durableSubPending.setImmediatePriorityDispatch(immediatePriorityDispatch);
        policy.setPendingDurableSubscriberPolicy(durableSubPending);
        PolicyMap policyMap = new PolicyMap();
        policyMap.put(new ActiveMQQueue("TEST"), policy);
        policyMap.put(new ActiveMQTopic("TEST"), policy);
        broker.setDestinationPolicy(policyMap);
        broker.start();
        broker.waitUntilStarted();
        
        factory = new ActiveMQConnectionFactory("vm://priorityTest");
        ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy();
        prefetch.setAll(prefetchVal);
        factory.setPrefetchPolicy(prefetch);
        factory.setWatchTopicAdvisories(false);
        factory.setDispatchAsync(dispatchAsync);
        conn = factory.createConnection();
        conn.setClientID("priority");
        conn.start();
        sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
    }
    
    protected void tearDown() throws Exception {
        try {
            sess.close();
            conn.close();
        } catch (Exception ignored) {
        } finally {
            broker.stop();
            broker.waitUntilStopped();
        }
    }
    
    public void testStoreConfigured() throws Exception {
        Queue queue = sess.createQueue("TEST");
        Topic topic = sess.createTopic("TEST");
        
        MessageProducer queueProducer = sess.createProducer(queue);
        MessageProducer topicProducer = sess.createProducer(topic);
        
        
        Thread.sleep(500); // get it all propagated
        
        assertTrue(broker.getRegionBroker().getDestinationMap().get(queue).getMessageStore().isPrioritizedMessages());
        assertTrue(broker.getRegionBroker().getDestinationMap().get(topic).getMessageStore().isPrioritizedMessages());
        
        queueProducer.close();
        topicProducer.close();
        
    }
    
    protected class ProducerThread extends Thread {

        int priority;
        int messageCount;
        ActiveMQDestination dest;
        
        public ProducerThread(ActiveMQDestination dest, int messageCount, int priority) {
            this.messageCount = messageCount;
            this.priority = priority;
            this.dest = dest;
        }
        
        public void run() {
            try {
                MessageProducer producer = sess.createProducer(dest);
                producer.setPriority(priority);
                for (int i = 0; i < messageCount; i++) {
                    producer.send(sess.createTextMessage("message priority: " + priority));
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        public void setMessagePriority(int priority) {
            this.priority = priority;
        }

        public void setMessageCount(int messageCount) {
            this.messageCount = messageCount;    
        }
        
    }
    
    public void initCombosForTestQueues() {
        addCombinationValues("useCache", new Object[] {new Boolean(true), new Boolean(false)});
    }
    
    public void testQueues() throws Exception {
        ActiveMQQueue queue = (ActiveMQQueue)sess.createQueue("TEST");

        ProducerThread lowPri = new ProducerThread(queue, MSG_NUM, LOW_PRI);
        ProducerThread highPri = new ProducerThread(queue, MSG_NUM, HIGH_PRI);
        
        lowPri.start();
        highPri.start();
        
        lowPri.join();
        highPri.join();
        
        MessageConsumer queueConsumer = sess.createConsumer(queue);
        for (int i = 0; i < MSG_NUM * 2; i++) {
            Message msg = queueConsumer.receive(5000);
            LOG.debug("received i=" + i + ", " + (msg!=null? msg.getJMSMessageID() : null));
            assertNotNull("Message " + i + " was null", msg);
            assertEquals("Message " + i + " has wrong priority", i < MSG_NUM ? HIGH_PRI : LOW_PRI, msg.getJMSPriority());
        }
    }
    
    protected Message createMessage(int priority) throws Exception {
        final String text = "priority " + priority;
        Message msg = sess.createTextMessage(text);
        LOG.info("Sending  " + text);
        return msg;
    }

    public void initCombosForTestDurableSubs() {
        addCombinationValues("prefetchVal", new Object[] {new Integer(1000), new Integer(MSG_NUM/4)});
    }
    
    public void testDurableSubs() throws Exception {
        ActiveMQTopic topic = (ActiveMQTopic)sess.createTopic("TEST");
        TopicSubscriber sub = sess.createDurableSubscriber(topic, "priority");
        sub.close();
        
        ProducerThread lowPri = new ProducerThread(topic, MSG_NUM, LOW_PRI);
        ProducerThread highPri = new ProducerThread(topic, MSG_NUM, HIGH_PRI);
        
        lowPri.start();
        highPri.start();
        
        lowPri.join();
        highPri.join();
        
        sub = sess.createDurableSubscriber(topic, "priority");
        for (int i = 0; i < MSG_NUM * 2; i++) {
            Message msg = sub.receive(5000);
            assertNotNull("Message " + i + " was null", msg);
            assertEquals("Message " + i + " has wrong priority", i < MSG_NUM ? HIGH_PRI : LOW_PRI, msg.getJMSPriority());
        }


        // verify that same broker/store can deal with non priority dest also
        topic = (ActiveMQTopic)sess.createTopic("HAS_NO_PRIORITY");
        sub = sess.createDurableSubscriber(topic, "no_priority");
        sub.close();

        lowPri = new ProducerThread(topic, MSG_NUM, LOW_PRI);
        highPri = new ProducerThread(topic, MSG_NUM, HIGH_PRI);

        lowPri.start();
        highPri.start();

        lowPri.join();
        highPri.join();

        sub = sess.createDurableSubscriber(topic, "no_priority");
        // verify we got them all
        for (int i = 0; i < MSG_NUM * 2; i++) {
            Message msg = sub.receive(5000);
            assertNotNull("Message " + i + " was null", msg);
        }

    }

    public void initCombosForTestDurableSubsReconnect() {
        addCombinationValues("prefetchVal", new Object[] {new Integer(1000), new Integer(MSG_NUM/2)});
        // REVISIT = is dispatchAsync = true a problem or is it just the test?
        addCombinationValues("dispatchAsync", new Object[] {Boolean.FALSE});
        addCombinationValues("useCache", new Object[] {Boolean.TRUE, Boolean.FALSE});
    }
    
    public void testDurableSubsReconnect() throws Exception {
        ActiveMQTopic topic = (ActiveMQTopic)sess.createTopic("TEST");
        final String subName = "priorityDisconnect";
        TopicSubscriber sub = sess.createDurableSubscriber(topic, subName);
        sub.close();

        ProducerThread lowPri = new ProducerThread(topic, MSG_NUM, LOW_PRI);
        ProducerThread highPri = new ProducerThread(topic, MSG_NUM, HIGH_PRI);

        lowPri.start();
        highPri.start();

        lowPri.join();
        highPri.join();


        final int closeFrequency = MSG_NUM/4;
        sub = sess.createDurableSubscriber(topic, subName);
        for (int i = 0; i < MSG_NUM * 2; i++) {
            Message msg = sub.receive(15000);
            LOG.debug("received i=" + i + ", " + (msg!=null? msg.getJMSMessageID() : null));
            assertNotNull("Message " + i + " was null", msg);
            assertEquals("Message " + i + " has wrong priority", i < MSG_NUM ? HIGH_PRI : LOW_PRI, msg.getJMSPriority());
            if (i>0 && i%closeFrequency==0) {
                LOG.info("Closing durable sub.. on: " + i);
                sub.close();
                sub = sess.createDurableSubscriber(topic, subName);
            }
        }
    }

    public void testHighPriorityDelivery() throws Exception {

        // get zero prefetch
        ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy();
        prefetch.setAll(0);
        factory.setPrefetchPolicy(prefetch);
        conn.close();
        conn = factory.createConnection();
        conn.setClientID("priority");
        conn.start();
        sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

        ActiveMQTopic topic = (ActiveMQTopic)sess.createTopic("TEST");
        final String subName = "priorityDisconnect";
        TopicSubscriber sub = sess.createDurableSubscriber(topic, subName);
        sub.close();

        final int numToProduce = 2000;
        final int[] dups = new int[numToProduce*2];
        ProducerThread producerThread = new ProducerThread(topic, numToProduce, LOW_PRI+1);
        producerThread.run();
        LOG.info("Low priority messages sent");

        sub = sess.createDurableSubscriber(topic, subName);
        final int batchSize = 250;
        int lowLowCount = 0;
        for (int i=0; i<numToProduce; i++) {
            Message msg = sub.receive(15000);
            LOG.info("received i=" + i + ", " + (msg!=null? msg.getJMSMessageID() + "-" + msg.getJMSPriority() : null));
            assertNotNull("Message " + i + " was null", msg);
            assertEquals("Message " + i + " has wrong priority", LOW_PRI+1, msg.getJMSPriority());
            assertTrue("not duplicate ", dups[i] == 0);
            dups[i] = 1;

            if (i % batchSize == 0) {
                producerThread.setMessagePriority(HIGH_PRI);
                producerThread.setMessageCount(1);
                producerThread.run();
                LOG.info("High priority message sent, should be able to receive immediately");

                if (i % batchSize*2 == 0) {
                    producerThread.setMessagePriority(HIGH_PRI -1);
                    producerThread.setMessageCount(1);
                    producerThread.run();
                    LOG.info("High -1 priority message sent, should be able to receive immediately");
                }

                if (i % batchSize*4 == 0) {
                    producerThread.setMessagePriority(LOW_PRI);
                    producerThread.setMessageCount(1);
                    producerThread.run();
                    lowLowCount++;
                    LOG.info("Low low priority message sent, should not be able to receive immediately");
                }

                msg = sub.receive(15000);
                assertNotNull("Message was null", msg);
                LOG.info("received hi? : " + msg);
                assertEquals("high priority", HIGH_PRI, msg.getJMSPriority());
                            
                if (i % batchSize*2 == 0) {
                    msg = sub.receive(15000);
                    assertNotNull("Message was null", msg);
                    LOG.info("received hi -1 ? i=" + i + ", " + msg);
                    assertEquals("high priority", HIGH_PRI -1, msg.getJMSPriority());
                }
            }
        }
        for (int i=0; i<lowLowCount; i++) {
            Message msg = sub.receive(15000);
            LOG.debug("received i=" + i + ", " + (msg!=null? msg.getJMSMessageID() : null));
            assertNotNull("Message " + i + " was null", msg);
            assertEquals("Message " + i + " has wrong priority", LOW_PRI, msg.getJMSPriority());
        }
    }
    
}

Other ActiveMQ examples (source code examples)

Here is a short list of links related to this ActiveMQ MessagePriorityTest.java source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.