alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

ActiveMQ example source code file (DurableSubscriptionOfflineTest.java)

This example ActiveMQ source code file (DurableSubscriptionOfflineTest.java) is included in the DevDaily.com "Java Source Code Warehouse" project. The intent of this project is to help you "Learn Java by Example" TM.

Java - ActiveMQ tags/keywords

connection, connection, exception, exception, id, listener, listener, message, messageconsumer, messageproducer, object, session, subsid, subsid, util

The ActiveMQ DurableSubscriptionOfflineTest.java source code

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.usecases;

import java.util.Vector;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import junit.framework.Test;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.util.Wait;
import org.apache.kahadb.journal.Journal;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupport {

    private static final Logger LOG = LoggerFactory.getLogger(DurableSubscriptionOfflineTest.class);
    public boolean usePrioritySupport = Boolean.TRUE;
    public int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
    private BrokerService broker;
    private ActiveMQTopic topic;
    private Vector<Exception> exceptions = new Vector();

    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
        return new ActiveMQConnectionFactory("vm://" + getName(true));
    }

    @Override
    protected Connection createConnection() throws Exception {
        return createConnection("cliName");
    }

    protected Connection createConnection(String name) throws Exception {
        Connection con = super.createConnection();
        con.setClientID(name);
        con.start();
        return con;
    }

    public static Test suite() {
        return suite(DurableSubscriptionOfflineTest.class);
    }
    
    protected void setUp() throws Exception {
        exceptions.clear();
        topic = (ActiveMQTopic) createDestination();
        createBroker();
        super.setUp();
    }

    protected void tearDown() throws Exception {
        super.tearDown();
        destroyBroker();
    }

    private void createBroker() throws Exception {
        createBroker(true);
    }
    
    private void createBroker(boolean deleteAllMessages) throws Exception {
        broker = BrokerFactory.createBroker("broker:(vm://" + getName(true) +")");
        broker.setBrokerName(getName(true));
        broker.setDeleteAllMessagesOnStartup(deleteAllMessages);
        broker.getManagementContext().setCreateConnector(false);

        if (usePrioritySupport) {
            PolicyEntry policy = new PolicyEntry();
            policy.setPrioritizedMessages(true);
            PolicyMap policyMap = new PolicyMap();
            policyMap.setDefaultEntry(policy);
            broker.setDestinationPolicy(policyMap);
        }
        
        setDefaultPersistenceAdapter(broker);
        if (broker.getPersistenceAdapter() instanceof JDBCPersistenceAdapter) {
            // ensure it kicks in during tests
            ((JDBCPersistenceAdapter)broker.getPersistenceAdapter()).setCleanupPeriod(2*1000);
        } else if (broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) {
            // have lots of journal files
            ((KahaDBPersistenceAdapter)broker.getPersistenceAdapter()).setJournalMaxFileLength(journalMaxFileLength);
        }
        broker.start();
    }

    private void destroyBroker() throws Exception {
        if (broker != null)
            broker.stop();
    }

    public void initCombosForTestConsumeOnlyMatchedMessages() throws Exception {
        this.addCombinationValues("defaultPersistenceAdapter",
                new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
        this.addCombinationValues("usePrioritySupport",
                new Object[]{ Boolean.TRUE, Boolean.FALSE});
    }

    public void testConsumeOnlyMatchedMessages() throws Exception {
        // create durable subscription
        Connection con = createConnection();
        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
        session.close();
        con.close();

        // send messages
        con = createConnection();
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = session.createProducer(null);

        int sent = 0;
        for (int i = 0; i < 10; i++) {
            boolean filter = i % 2 == 1;
            if (filter)
                sent++;

            Message message = session.createMessage();
            message.setStringProperty("filter", filter ? "true" : "false");
            producer.send(topic, message);
        }

        session.close();
        con.close();

        // consume messages
        con = createConnection();
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
        Listener listener = new Listener();
        consumer.setMessageListener(listener);

        Thread.sleep(3 * 1000);

        session.close();
        con.close();

        assertEquals(sent, listener.count);
    }

     public void testConsumeAllMatchedMessages() throws Exception {
         // create durable subscription
         Connection con = createConnection();
         Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
         session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
         session.close();
         con.close();

         // send messages
         con = createConnection();
         session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
         MessageProducer producer = session.createProducer(null);

         int sent = 0;
         for (int i = 0; i < 10; i++) {
             sent++;
             Message message = session.createMessage();
             message.setStringProperty("filter", "true");
             producer.send(topic, message);
         }

         Thread.sleep(1 * 1000);

         session.close();
         con.close();

         // consume messages
         con = createConnection();
         session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
         MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
         Listener listener = new Listener();
         consumer.setMessageListener(listener);

         Thread.sleep(3 * 1000);

         session.close();
         con.close();

         assertEquals(sent, listener.count);
     }

    
    public void initCombosForTestVerifyAllConsumedAreAcked() throws Exception {
        this.addCombinationValues("defaultPersistenceAdapter",
               new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
        this.addCombinationValues("usePrioritySupport",
                new Object[]{ Boolean.TRUE, Boolean.FALSE});
    }

     public void testVerifyAllConsumedAreAcked() throws Exception {
         // create durable subscription
         Connection con = createConnection();
         Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
         session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
         session.close();
         con.close();

         // send messages
         con = createConnection();
         session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
         MessageProducer producer = session.createProducer(null);

         int sent = 0;
         for (int i = 0; i < 10; i++) {
             sent++;
             Message message = session.createMessage();
             message.setStringProperty("filter", "true");
             producer.send(topic, message);
         }

         Thread.sleep(1 * 1000);

         session.close();
         con.close();

         // consume messages
         con = createConnection();
         session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
         MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
         Listener listener = new Listener();
         consumer.setMessageListener(listener);

         Thread.sleep(3 * 1000);

         session.close();
         con.close();

         LOG.info("Consumed: " + listener.count);
         assertEquals(sent, listener.count);

         // consume messages again, should not get any
         con = createConnection();
         session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
         consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
         listener = new Listener();
         consumer.setMessageListener(listener);

         Thread.sleep(3 * 1000);

         session.close();
         con.close();

         assertEquals(0, listener.count);
     }

    public void testTwoOfflineSubscriptionCanConsume() throws Exception {
        // create durable subscription 1
        Connection con = createConnection("cliId1");
        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
        session.close();
        con.close();

        // create durable subscription 2
        Connection con2 = createConnection("cliId2");
        Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer consumer2 = session2.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
        Listener listener2 = new Listener();
        consumer2.setMessageListener(listener2);

        // send messages
        con = createConnection();
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = session.createProducer(null);

        int sent = 0;
        for (int i = 0; i < 10; i++) {
            sent++;
            Message message = session.createMessage();
            message.setStringProperty("filter", "true");
            producer.send(topic, message);
        }

        Thread.sleep(1 * 1000);
        session.close();
        con.close();

        // test online subs
        Thread.sleep(3 * 1000);
        session2.close();
        con2.close();

        assertEquals(sent, listener2.count);

        // consume messages
        con = createConnection("cliId1");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
        Listener listener = new Listener();
        consumer.setMessageListener(listener);

        Thread.sleep(3 * 1000);

        session.close();
        con.close();

        assertEquals("offline consumer got all", sent, listener.count);
    }

    public void initCombosForTestOfflineSubscriptionCanConsumeAfterOnlineSubs() throws Exception {
        this.addCombinationValues("defaultPersistenceAdapter",
                new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
        this.addCombinationValues("usePrioritySupport",
                new Object[]{ Boolean.TRUE, Boolean.FALSE});
    }

    public void testOfflineSubscriptionCanConsumeAfterOnlineSubs() throws Exception {
        Connection con = createConnection("offCli1");
        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
        session.close();
        con.close();

        con = createConnection("offCli2");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
        session.close();
        con.close();

        Connection con2 = createConnection("onlineCli1");
        Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer consumer2 = session2.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
        Listener listener2 = new Listener();
        consumer2.setMessageListener(listener2);

        // send messages
        con = createConnection();
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = session.createProducer(null);

        int sent = 0;
        for (int i = 0; i < 10; i++) {
            sent++;
            Message message = session.createMessage();
            message.setStringProperty("filter", "true");
            producer.send(topic, message);
        }

        Thread.sleep(1 * 1000);
        session.close();
        con.close();

        // test online subs
        Thread.sleep(3 * 1000);
        session2.close();
        con2.close();
        assertEquals(sent, listener2.count);

        // restart broker
        broker.stop();
        createBroker(false /*deleteAllMessages*/);

        // test offline
        con = createConnection("offCli1");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);

        Connection con3 = createConnection("offCli2");
        Session session3 = con3.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);

        Listener listener = new Listener();
        consumer.setMessageListener(listener);
        Listener listener3 = new Listener();
        consumer3.setMessageListener(listener3);

        Thread.sleep(3 * 1000);

        session.close();
        con.close();
        session3.close();
        con3.close();

        assertEquals(sent, listener.count);
        assertEquals(sent, listener3.count);
    }


    public void initCombosForTestInterleavedOfflineSubscriptionCanConsume() throws Exception {
        this.addCombinationValues("defaultPersistenceAdapter",
                new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
    }

    public void testInterleavedOfflineSubscriptionCanConsume() throws Exception {
        // create durable subscription 1
        Connection con = createConnection("cliId1");
        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
        session.close();
        con.close();

        // send messages
        con = createConnection();
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = session.createProducer(null);

        int sent = 0;
        for (int i = 0; i < 10; i++) {
            sent++;
            Message message = session.createMessage();
            message.setStringProperty("filter", "true");
            producer.send(topic, message);
        }

        Thread.sleep(1 * 1000);

        // create durable subscription 2
        Connection con2 = createConnection("cliId2");
        Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer consumer2 = session2.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
        Listener listener2 = new Listener();
        consumer2.setMessageListener(listener2);

        assertEquals(0, listener2.count);
        session2.close();
        con2.close();

        // send some more
        for (int i = 0; i < 10; i++) {
            sent++;
            Message message = session.createMessage();
            message.setStringProperty("filter", "true");
            producer.send(topic, message);
        }

        Thread.sleep(1 * 1000);
        session.close();
        con.close();

        con2 = createConnection("cliId2");
        session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
        consumer2 = session2.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
        listener2 = new Listener("cliId2");
        consumer2.setMessageListener(listener2);
        // test online subs
        Thread.sleep(3 * 1000);

        assertEquals(10, listener2.count);

        // consume all messages
        con = createConnection("cliId1");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
        Listener listener = new Listener("cliId1");
        consumer.setMessageListener(listener);

        Thread.sleep(3 * 1000);

        session.close();
        con.close();

        assertEquals("offline consumer got all", sent, listener.count);
    }    

    public void initCombosForTestMixOfOnLineAndOfflineSubsGetAllMatched() throws Exception {
        this.addCombinationValues("defaultPersistenceAdapter",
                new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
    }

    private static String filter = "$a='A1' AND (($b=true AND $c=true) OR ($d='D1' OR $d='D2'))";
    public void testMixOfOnLineAndOfflineSubsGetAllMatched() throws Exception {
        // create offline subs 1
        Connection con = createConnection("offCli1");
        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createDurableSubscriber(topic, "SubsId", filter, true);
        session.close();
        con.close();

        // create offline subs 2
        con = createConnection("offCli2");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createDurableSubscriber(topic, "SubsId", filter, true);
        session.close();
        con.close();

        // create online subs
        Connection con2 = createConnection("onlineCli1");
        Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer consumer2 = session2.createDurableSubscriber(topic, "SubsId", filter, true);
        Listener listener2 = new Listener();
        consumer2.setMessageListener(listener2);

        // create non-durable consumer
        Connection con4 = createConnection("nondurableCli");
        Session session4 = con4.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer consumer4 = session4.createConsumer(topic, filter, true);
        Listener listener4 = new Listener();
        consumer4.setMessageListener(listener4);

        // send messages
        con = createConnection();
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = session.createProducer(null);

        boolean hasRelevant = false;
        int filtered = 0;
        for (int i = 0; i < 100; i++) {
            int postf = (int) (Math.random() * 9) + 1;
            String d = "D" + postf;

            if ("D1".equals(d) || "D2".equals(d)) {
                hasRelevant = true;
                filtered++;
            }

            Message message = session.createMessage();
            message.setStringProperty("$a", "A1");
            message.setStringProperty("$d", d);
            producer.send(topic, message);
        }

        Message message = session.createMessage();
        message.setStringProperty("$a", "A1");
        message.setBooleanProperty("$b", true);
        message.setBooleanProperty("$c", hasRelevant);
        producer.send(topic, message);

        if (hasRelevant)
            filtered++;

        Thread.sleep(1 * 1000);
        session.close();
        con.close();

        Thread.sleep(3 * 1000);

        // test non-durable consumer
        session4.close();
        con4.close();
        assertEquals(filtered, listener4.count); // succeeded!

        // test online subs
        session2.close();
        con2.close();
        assertEquals(filtered, listener2.count); // succeeded!

        // test offline 1
        con = createConnection("offCli1");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", filter, true);
        Listener listener = new FilterCheckListener();
        consumer.setMessageListener(listener);

        Thread.sleep(3 * 1000);
        session.close();
        con.close();

        assertEquals(filtered, listener.count);

        // test offline 2
        Connection con3 = createConnection("offCli2");
        Session session3 = con3.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "SubsId", filter, true);
        Listener listener3 = new FilterCheckListener();
        consumer3.setMessageListener(listener3);

        Thread.sleep(3 * 1000);
        session3.close();
        con3.close();

        assertEquals(filtered, listener3.count);
        assertTrue("no unexpected exceptions: " + exceptions, exceptions.isEmpty());
    }

    public void testRemovedDurableSubDeletes() throws Exception {
        // create durable subscription 1
        Connection con = createConnection("cliId1");
        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
        session.close();
        con.close();

        // send messages
        con = createConnection();
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = session.createProducer(null);

        int sent = 0;
        for (int i = 0; i < 10; i++) {
            sent++;
            Message message = session.createMessage();
            message.setStringProperty("filter", "true");
            producer.send(topic, message);
        }

        Thread.sleep(1 * 1000);

        Connection con2 = createConnection("cliId1");
        Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session2.unsubscribe("SubsId");
        session2.close();
        con2.close();

        // see if retroactive can consumer any
        topic = new ActiveMQTopic(topic.getPhysicalName() + "?consumer.retroactive=true");
        con = createConnection("offCli2");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", filter, true);
        Listener listener = new Listener();
        consumer.setMessageListener(listener);
        session.close();
        con.close();
        assertEquals(0, listener.count);
    }

    public void initCombosForTestOfflineSubscriptionWithSelectorAfterRestart() throws Exception {
        this.addCombinationValues("defaultPersistenceAdapter",
                new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
    }
    
    public void testOfflineSubscriptionWithSelectorAfterRestart() throws Exception {
        // create offline subs 1
        Connection con = createConnection("offCli1");
        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
        session.close();
        con.close();

        // create offline subs 2
        con = createConnection("offCli2");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
        session.close();
        con.close();

        // send messages
        con = createConnection();
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = session.createProducer(null);

        int filtered = 0;
        for (int i = 0; i < 10; i++) {
            boolean filter = (int) (Math.random() * 2) >= 1;
            if (filter)
                filtered++;

            Message message = session.createMessage();
            message.setStringProperty("filter", filter ? "true" : "false");
            producer.send(topic, message);
        }

        LOG.info("sent: " + filtered);
        Thread.sleep(1 * 1000);
        session.close();
        con.close();

        // restart broker
        Thread.sleep(3 * 1000);
        broker.stop();
        createBroker(false /*deleteAllMessages*/);
 
        // send more messages
        con = createConnection();
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        producer = session.createProducer(null);

        for (int i = 0; i < 10; i++) {
            boolean filter = (int) (Math.random() * 2) >= 1;
            if (filter)
                filtered++;

            Message message = session.createMessage();
            message.setStringProperty("filter", filter ? "true" : "false");
            producer.send(topic, message);
        }

        LOG.info("after restart, sent: " + filtered);
        Thread.sleep(1 * 1000);
        session.close();
        con.close();

        // test offline subs
        con = createConnection("offCli1");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
        Listener listener = new Listener();
        consumer.setMessageListener(listener);

        Connection con3 = createConnection("offCli2");
        Session session3 = con3.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
        Listener listener3 = new Listener();
        consumer3.setMessageListener(listener3);

        Thread.sleep(3 * 1000);

        session.close();
        con.close();
        session3.close();
        con3.close();

        assertEquals(filtered, listener.count);
        assertEquals(filtered, listener3.count);
    }

    public void initCombosForTestOfflineAfterRestart() throws Exception {
        this.addCombinationValues("defaultPersistenceAdapter",
                new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
    }

    public void testOfflineSubscriptionAfterRestart() throws Exception {
        // create offline subs 1
        Connection con = createConnection("offCli1");
        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", null, false);
        Listener listener = new Listener();
        consumer.setMessageListener(listener);

        // send messages
        MessageProducer producer = session.createProducer(null);

        int sent = 0;
        for (int i = 0; i < 10; i++) {
            sent++;
            Message message = session.createMessage();
            message.setStringProperty("filter", "false");
            producer.send(topic, message);
        }

        LOG.info("sent: " + sent);
        Thread.sleep(5 * 1000);
        session.close();
        con.close();

        assertEquals(sent, listener.count);

        // restart broker
        Thread.sleep(3 * 1000);
        broker.stop();
        createBroker(false /*deleteAllMessages*/);

        // send more messages
        con = createConnection();
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        producer = session.createProducer(null);

        for (int i = 0; i < 10; i++) {
            sent++;
            Message message = session.createMessage();
            message.setStringProperty("filter", "false");
            producer.send(topic, message);
        }

        LOG.info("after restart, sent: " + sent);
        Thread.sleep(1 * 1000);
        session.close();
        con.close();

        // test offline subs
        con = createConnection("offCli1");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
        consumer.setMessageListener(listener);

        Thread.sleep(3 * 1000);

        session.close();
        con.close();

        assertEquals(sent, listener.count);
    }

    public void testInterleavedOfflineSubscriptionCanConsumeAfterUnsub() throws Exception {
        // create offline subs 1
        Connection con = createConnection("offCli1");
        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
        session.close();
        con.close();

        // create offline subs 2
        con = createConnection("offCli2");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createDurableSubscriber(topic, "SubsId", null, true);
        session.close();
        con.close();


        // send messages
        con = createConnection();
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = session.createProducer(null);

        int sent = 0;
        for (int i = 0; i < 10; i++) {
            boolean filter = (int) (Math.random() * 2) >= 1;

            sent++;

            Message message = session.createMessage();
            message.setStringProperty("filter", filter ? "true" : "false");
            producer.send(topic, message);
        }

        Thread.sleep(1 * 1000);

        Connection con2 = createConnection("offCli1");
        Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session2.unsubscribe("SubsId");
        session2.close();
        con2.close();

        // consume all messages
        con = createConnection("offCli2");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
        Listener listener = new Listener("SubsId");
        consumer.setMessageListener(listener);

        Thread.sleep(3 * 1000);

        session.close();
        con.close();

        assertEquals("offline consumer got all", sent, listener.count);
    }


    public void testUnmatchedSubUnsubscribeDeletesAll() throws Exception {
        // create offline subs 1
        Connection con = createConnection("offCli1");
        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
        session.close();
        con.close();

        // send messages
        con = createConnection();
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = session.createProducer(null);

        int filtered = 0;
        for (int i = 0; i < 10; i++) {
            boolean filter = (i %2 == 0); //(int) (Math.random() * 2) >= 1;
            if (filter)
                filtered++;

            Message message = session.createMessage();
            message.setStringProperty("filter", filter ? "true" : "false");
            producer.send(topic, message);
        }

        LOG.info("sent: " + filtered);
        Thread.sleep(1 * 1000);
        session.close();
        con.close();

        // test offline subs
        con = createConnection("offCli1");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.unsubscribe("SubsId");
        session.close();
        con.close();


        con = createConnection("offCli1");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
        Listener listener = new Listener();
        consumer.setMessageListener(listener);

        Thread.sleep(3 * 1000);

        session.close();
        con.close();

        assertEquals(0, listener.count);
    }


    public void testAllConsumed() throws Exception {
        final String filter = "filter = 'true'";
        Connection con = createConnection("cli1");
        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createDurableSubscriber(topic, "SubsId", filter, true);
        session.close();
        con.close();

        con = createConnection("cli2");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createDurableSubscriber(topic, "SubsId", filter, true);
        session.close();
        con.close();

        // send messages
        con = createConnection();
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = session.createProducer(null);

        int sent = 0;
        for (int i = 0; i < 10; i++) {
            Message message = session.createMessage();
            message.setStringProperty("filter", "true");
            producer.send(topic, message);
            sent++;
        }

        LOG.info("sent: " + sent);
        Thread.sleep(1 * 1000);
        session.close();
        con.close();

        con = createConnection("cli1");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", filter, true);
        Listener listener = new Listener();
        consumer.setMessageListener(listener);
        Thread.sleep(3 * 1000);
        session.close();
        con.close();

        assertEquals(sent, listener.count);

        LOG.info("cli2 pull 2");
        con = createConnection("cli2");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        consumer = session.createDurableSubscriber(topic, "SubsId", filter, true);
        assertNotNull("got message", consumer.receive(2000));
        assertNotNull("got message", consumer.receive(2000));
        session.close();
        con.close();


        // send messages
        con = createConnection();
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        producer = session.createProducer(null);

        sent = 0;
        for (int i = 0; i < 2; i++) {
            Message message = session.createMessage();
            message.setStringProperty("filter", i==1 ? "true" : "false");
            producer.send(topic, message);
            sent++;
        }
        LOG.info("sent: " + sent);
        Thread.sleep(1 * 1000);
        session.close();
        con.close();
 
        LOG.info("cli1 again, should get 1 new ones");
        con = createConnection("cli1");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        consumer = session.createDurableSubscriber(topic, "SubsId", filter, true);
        listener = new Listener();
        consumer.setMessageListener(listener);
        Thread.sleep(3 * 1000);
        session.close();
        con.close();

        assertEquals(1, listener.count);
    }

    // https://issues.apache.org/jira/browse/AMQ-3190
    public void testNoMissOnMatchingSubAfterRestart() throws Exception {

        final String filter = "filter = 'true'";
        Connection con = createConnection("cli1");
        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createDurableSubscriber(topic, "SubsId", filter, true);
        session.close();
        con.close();

        // send unmatched messages
        con = createConnection();
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = session.createProducer(null);

        int sent = 0;
        // message for cli1 to keep it interested
        Message message = session.createMessage();
        message.setStringProperty("filter", "true");
        message.setIntProperty("ID", 0);
        producer.send(topic, message);
        sent++;

        for (int i = sent; i < 10; i++) {
            message = session.createMessage();
            message.setStringProperty("filter", "false");
            message.setIntProperty("ID", i);
            producer.send(topic, message);
            sent++;
        }
        con.close();
        LOG.info("sent: " + sent);


        // new sub at id 10
        con = createConnection("cli2");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createDurableSubscriber(topic, "SubsId", filter, true);
        session.close();
        con.close();

        destroyBroker();
        createBroker(false);

        con = createConnection();
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        producer = session.createProducer(null);

        for (int i = sent; i < 30; i++) {
            message = session.createMessage();
            message.setStringProperty("filter", "true");
            message.setIntProperty("ID", i);
            producer.send(topic, message);
            sent++;
        }
        con.close();
        LOG.info("sent: " + sent);

        // pick up the first of the next twenty messages
        con = createConnection("cli2");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", filter, true);
        Message m = consumer.receive(3000);
        assertEquals("is message 10", 10, m.getIntProperty("ID"));

        session.close();
        con.close();

        // pick up the first few messages for client1
        con = createConnection("cli1");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        consumer = session.createDurableSubscriber(topic, "SubsId", filter, true);
        m = consumer.receive(3000);
        assertEquals("is message 0", 0, m.getIntProperty("ID"));
        m = consumer.receive(3000);
        assertEquals("is message 10", 10, m.getIntProperty("ID"));

        session.close();
        con.close();
    }

    // use very small journal to get lots of files to cleanup
    public void initCombosForTestCleanupDeletedSubAfterRestart() throws Exception {
        this.addCombinationValues("journalMaxFileLength",
                new Object[]{new Integer(64*1024)});
    }

    // https://issues.apache.org/jira/browse/AMQ-3206
    public void testCleanupDeletedSubAfterRestart() throws Exception {
        Connection con = createConnection("cli1");
        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createDurableSubscriber(topic, "SubsId", null, true);
        session.close();
        con.close();

        con = createConnection("cli2");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createDurableSubscriber(topic, "SubsId", null, true);
        session.close();
        con.close();

        con = createConnection();
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = session.createProducer(null);

        final int toSend = 500;
        final String payload = new byte[40*1024].toString();
        int sent = 0;
        for (int i = sent; i < toSend; i++) {
            Message message = session.createTextMessage(payload);
            message.setStringProperty("filter", "false");
            message.setIntProperty("ID", i);
            producer.send(topic, message);
            sent++;
        }
        con.close();
        LOG.info("sent: " + sent);

        // kill off cli1
        con = createConnection("cli1");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.unsubscribe("SubsId");

        destroyBroker();
        createBroker(false);

        con = createConnection("cli2");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
        final Listener listener = new Listener();
        consumer.setMessageListener(listener);
        assertTrue("got all sent", Wait.waitFor(new Wait.Condition() {
            public boolean isSatisified() throws Exception {
                LOG.info("Want: " + toSend  + ", current: " + listener.count);
                return listener.count == toSend;
            }
        }));
        session.close();
        con.close();

        destroyBroker();
        createBroker(false);
        KahaDBPersistenceAdapter pa = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
        assertEquals("only one journal file left after restart", 1, pa.getStore().getJournal().getFileMap().size());
    }

    public static class Listener implements MessageListener {
        int count = 0;
        String id = null;

        Listener() {
        }
        Listener(String id) {
            this.id = id;
        }
        public void onMessage(Message message) {
            count++;
            if (id != null) {
                try {
                    LOG.info(id + ", " + message.getJMSMessageID());
                } catch (Exception ignored) {}
            }
        }
    }

    public class FilterCheckListener extends Listener  {

        public void onMessage(Message message) {
            count++;

            try {
                Object b = message.getObjectProperty("$b");
                if (b != null) {
                    boolean c = message.getBooleanProperty("$c");
                    assertTrue("", c);
                }
                else {
                    String d = message.getStringProperty("$d");
                    assertTrue("", "D1".equals(d) || "D2".equals(d));
                }
            }
            catch (JMSException e) {
                exceptions.add(e);
            }
        }
    }
}

Other ActiveMQ examples (source code examples)

Here is a short list of links related to this ActiveMQ DurableSubscriptionOfflineTest.java source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.