alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

ActiveMQ example source code file (DurableSubscriptionTestSupport.java)

This example ActiveMQ source code file (DurableSubscriptionTestSupport.java) is included in the DevDaily.com "Java Source Code Warehouse" project. The intent of this project is to help you "Learn Java by Example" TM.

Java - ActiveMQ tags/keywords

exception, exception, message, messageproducer, msg:1, msg:1, msg:2, msg:2, msg:3, session, testtopic, textmessage, topic, topic

The ActiveMQ DurableSubscriptionTestSupport.java source code

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.usecases;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.PersistenceAdapter;

/**
 * 
 */
public abstract class DurableSubscriptionTestSupport extends TestSupport {

    private Connection connection;
    private Session session;
    private TopicSubscriber consumer;
    private MessageProducer producer;
    private BrokerService broker;

    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
        return new ActiveMQConnectionFactory("vm://durable-broker");
    }

    protected Connection createConnection() throws Exception {
        Connection rc = super.createConnection();
        rc.setClientID(getName());
        return rc;
    }

    protected void setUp() throws Exception {
        createBroker();
        super.setUp();
    }

    protected void tearDown() throws Exception {
        super.tearDown();
        destroyBroker();
    }

    protected void restartBroker() throws Exception {
        destroyBroker();
        createRestartedBroker(); // retain stored messages
    }

    private void createBroker() throws Exception {
        broker = new BrokerService();
        broker.setBrokerName("durable-broker");
        broker.setDeleteAllMessagesOnStartup(true);
        broker.setPersistenceAdapter(createPersistenceAdapter());
        broker.setPersistent(true);
        broker.start();

        connection = createConnection();
    }

    private void createRestartedBroker() throws Exception {
        broker = new BrokerService();
        broker.setBrokerName("durable-broker");
        broker.setDeleteAllMessagesOnStartup(false);
        broker.setPersistenceAdapter(createPersistenceAdapter());
        broker.setPersistent(true);
        broker.start();

        connection = createConnection();
    }

    private void destroyBroker() throws Exception {
        if (connection != null) {
            connection.close();
        }
        if (broker != null) {
            broker.stop();
        }
    }

    protected abstract PersistenceAdapter createPersistenceAdapter() throws Exception;
    
    public void testMessageExpire() throws Exception {
        session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic("TestTopic");
        consumer = session.createDurableSubscriber(topic, "sub1");
        producer = session.createProducer(topic);
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        producer.setTimeToLive(1000);
        connection.start();

        // Make sure it works when the durable sub is active.
        producer.send(session.createTextMessage("Msg:1"));
        assertTextMessageEquals("Msg:1", consumer.receive(1000));
        
        consumer.close();
        
        producer.send(session.createTextMessage("Msg:2"));
        producer.send(session.createTextMessage("Msg:3"));
        
        consumer = session.createDurableSubscriber(topic, "sub1");

        // Try to get the message.
        assertTextMessageEquals("Msg:2", consumer.receive(1000));
        Thread.sleep(1000);
        assertNull(consumer.receive(1000));
    }

    public void testUnsubscribeSubscription() throws Exception {
        session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic("TestTopic");
        consumer = session.createDurableSubscriber(topic, "sub1");
        producer = session.createProducer(topic);
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        connection.start();

        // Make sure it works when the durable sub is active.
        producer.send(session.createTextMessage("Msg:1"));
        assertTextMessageEquals("Msg:1", consumer.receive(5000));

        // Deactivate the sub.
        consumer.close();
        // Send a new message.
        producer.send(session.createTextMessage("Msg:2"));
        session.unsubscribe("sub1");

        // Reopen the connection.
        connection.close();
        connection = createConnection();
        session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
        producer = session.createProducer(topic);
        connection.start();

        // Activate the sub.
        consumer = session.createDurableSubscriber(topic, "sub1");
        producer.send(session.createTextMessage("Msg:3"));

        // Try to get the message.
        assertTextMessageEquals("Msg:3", consumer.receive(5000));
    }

    public void testInactiveDurableSubscriptionTwoConnections() throws Exception {
        session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic("TestTopic");
        consumer = session.createDurableSubscriber(topic, "sub1");
        producer = session.createProducer(topic);
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        connection.start();

        // Make sure it works when the durable sub is active.
        producer.send(session.createTextMessage("Msg:1"));
        assertTextMessageEquals("Msg:1", consumer.receive(5000));

        // Deactivate the sub.
        consumer.close();

        // Send a new message.
        producer.send(session.createTextMessage("Msg:2"));

        // Reopen the connection.
        connection.close();
        connection = createConnection();
        session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
        connection.start();

        // Activate the sub.
        consumer = session.createDurableSubscriber(topic, "sub1");

        // Try to get the message.
        assertTextMessageEquals("Msg:2", consumer.receive(5000));
    }

    public void testInactiveDurableSubscriptionBrokerRestart() throws Exception {
        session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic("TestTopic");
        consumer = session.createDurableSubscriber(topic, "sub1");
        producer = session.createProducer(topic);
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        connection.start();

        // Make sure it works when the durable sub is active.
        producer.send(session.createTextMessage("Msg:1"));
        assertTextMessageEquals("Msg:1", consumer.receive(5000));

        // Deactivate the sub.
        consumer.close();

        // Send a new message.
        producer.send(session.createTextMessage("Msg:2"));

        // Reopen the connection.
        restartBroker();
        session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
        connection.start();

        // Activate the sub.
        consumer = session.createDurableSubscriber(topic, "sub1");

        // Try to get the message.
        assertTextMessageEquals("Msg:2", consumer.receive(5000));
        assertNull(consumer.receive(5000));
    }
    
    public void testDurableSubscriptionBrokerRestart() throws Exception {

        // Create the durable sub.
        connection.start();
        session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);

        // Ensure that consumer will receive messages sent before it was created
        Topic topic = session.createTopic("TestTopic?consumer.retroactive=true");
        consumer = session.createDurableSubscriber(topic, "sub1");
        
        producer = session.createProducer(topic);
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        producer.send(session.createTextMessage("Msg:1"));
        assertTextMessageEquals("Msg:1", consumer.receive(5000));
        
        // Make sure cleanup kicks in
        Thread.sleep(1000);

        // Restart the broker.
        restartBroker();
    }

    public void testDurableSubscriptionPersistsPastBrokerRestart() throws Exception {

        // Create the durable sub.
        connection.start();
        session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);

        // Ensure that consumer will receive messages sent before it was created
        Topic topic = session.createTopic("TestTopic?consumer.retroactive=true");
        consumer = session.createDurableSubscriber(topic, "sub1");

        // Restart the broker.
        restartBroker();

        // Reconnection
        connection.start();
        session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
        producer = session.createProducer(topic);
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);

        // Make sure it works when the durable sub is active.
        producer.send(session.createTextMessage("Msg:1"));

        // Activate the sub.
        consumer = session.createDurableSubscriber(topic, "sub1");

        // Send a new message.
        producer.send(session.createTextMessage("Msg:2"));

        // Try to get the message.
        assertTextMessageEquals("Msg:1", consumer.receive(5000));
        assertTextMessageEquals("Msg:2", consumer.receive(5000));

        assertNull(consumer.receive(5000));
    }

    public void testDurableSubscriptionRollbackRedeliver() throws Exception {

        // Create the durable sub.
        connection.start();

        session = connection.createSession(true, javax.jms.Session.SESSION_TRANSACTED);
        Topic topic = session.createTopic("TestTopic");
        consumer = session.createDurableSubscriber(topic, "sub1");

        Session producerSession = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
        producer = producerSession.createProducer(topic);
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);

        producer.send(session.createTextMessage("Msg:1"));

        // receive and rollback
        assertTextMessageEquals("Msg:1", consumer.receive(5000));
        session.rollback();
        consumer.close();
        session.close();

        session = connection.createSession(true, javax.jms.Session.SESSION_TRANSACTED);

        // Ensure that consumer will receive messages sent and rolled back
        consumer = session.createDurableSubscriber(topic, "sub1");

        assertTextMessageEquals("Msg:1", consumer.receive(5000));
        session.commit();

        assertNull(consumer.receive(5000));
    }

    public void xtestInactiveDurableSubscriptionOneConnection() throws Exception {
        session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic("TestTopic");
        consumer = session.createDurableSubscriber(topic, "sub1");
        producer = session.createProducer(topic);
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        connection.start();

        // Make sure it works when the durable sub is active.
        producer.send(session.createTextMessage("Msg:1"));
        assertTextMessageEquals("Msg:1", consumer.receive(5000));

        // Deactivate the sub.
        consumer.close();

        // Send a new message.
        producer.send(session.createTextMessage("Msg:2"));

        // Activate the sub.
        consumer = session.createDurableSubscriber(topic, "sub1");

        // Try to get the message.
        assertTextMessageEquals("Msg:2", consumer.receive(5000));
    }

    public void testSelectorChange() throws Exception {
        session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic("TestTopic");
        consumer = session.createDurableSubscriber(topic, "sub1", "color='red'", false);
        producer = session.createProducer(topic);
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        connection.start();

        // Make sure it works when the durable sub is active.
        TextMessage msg = session.createTextMessage();
        msg.setText("Msg:1");
        msg.setStringProperty("color", "blue");
        producer.send(msg);
        msg.setText("Msg:2");
        msg.setStringProperty("color", "red");
        producer.send(msg);

        assertTextMessageEquals("Msg:2", consumer.receive(5000));

        // Change the subscription
        consumer.close();
        consumer = session.createDurableSubscriber(topic, "sub1", "color='blue'", false);

        // Send a new message.
        msg.setText("Msg:3");
        msg.setStringProperty("color", "red");
        producer.send(msg);
        msg.setText("Msg:4");
        msg.setStringProperty("color", "blue");
        producer.send(msg);

        // Try to get the message.
        assertTextMessageEquals("Msg:4", consumer.receive(5000));
    }

    public void testDurableSubWorksInNewSession() throws JMSException {

        // Create the consumer.
        connection.start();
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        Topic topic = session.createTopic("topic-" + getName());
        MessageConsumer consumer = session.createDurableSubscriber(topic, "sub1");
        // Drain any messages that may allready be in the sub
        while (consumer.receive(1000) != null) {
        }

        // See if the durable sub works in a new session.
        session.close();
        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

        // Send a Message that should be added to the durable sub.
        MessageProducer producer = createProducer(session, topic);
        producer.send(session.createTextMessage("Message 1"));

        // Activate the durable sub now. And receive the message.
        consumer = session.createDurableSubscriber(topic, "sub1");
        Message msg = consumer.receive(1000);
        assertNotNull(msg);
        assertEquals("Message 1", ((TextMessage)msg).getText());

    }

    public void testDurableSubWorksInNewConnection() throws Exception {

        // Create the consumer.
        connection.start();
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        Topic topic = session.createTopic("topic-" + getName());
        MessageConsumer consumer = session.createDurableSubscriber(topic, "sub1");
        // Drain any messages that may allready be in the sub
        while (consumer.receive(1000) != null) {
        }

        // See if the durable sub works in a new connection.
        // The embeded broker shutsdown when his connections are closed.
        // So we open the new connection before the old one is closed.
        connection.close();
        connection = createConnection();
        connection.start();
        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

        // Send a Message that should be added to the durable sub.
        MessageProducer producer = createProducer(session, topic);
        producer.send(session.createTextMessage("Message 1"));

        // Activate the durable sub now. And receive the message.
        consumer = session.createDurableSubscriber(topic, "sub1");
        Message msg = consumer.receive(1000);
        assertNotNull(msg);
        assertEquals("Message 1", ((TextMessage)msg).getText());

    }

    private MessageProducer createProducer(Session session, Destination queue) throws JMSException {
        MessageProducer producer = session.createProducer(queue);
        producer.setDeliveryMode(getDeliveryMode());
        return producer;
    }

    protected int getDeliveryMode() {
        return DeliveryMode.PERSISTENT;
    }

    private void assertTextMessageEquals(String string, Message message) throws JMSException {
        assertNotNull("Message was null", message);
        assertTrue("Message is not a TextMessage", message instanceof TextMessage);
        assertEquals(string, ((TextMessage)message).getText());
    }

}

Other ActiveMQ examples (source code examples)

Here is a short list of links related to this ActiveMQ DurableSubscriptionTestSupport.java source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.