alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

ActiveMQ example source code file (TransactionNotStartedErrorTest.java)

This example ActiveMQ source code file (TransactionNotStartedErrorTest.java) is included in the DevDaily.com "Java Source Code Warehouse" project. The intent of this project is to help you "Learn Java by Example" TM.

Java - ActiveMQ tags/keywords

brokerservice, connection, exception, exception, jmsexception, messagelistener, messagesender, messagesender, receiver, receiver, session, string, string, thread

The ActiveMQ TransactionNotStartedErrorTest.java source code

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.bugs;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Session;

import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/*
 * Simulates a message flow which causes the following exception in the broker
 * (the exception is logged by the client): <p/> 2007-07-24 13:51:23,624
 * com.easynet.halo.Halo ERROR (LoggingErrorHandler.java: 23) JMS failure
 * javax.jms.JMSException: Transaction 'TX:ID:dmt-53625-1185281414694-1:0:344'
 * has not been started. at
 * org.apache.activemq.broker.TransactionBroker.getTransaction(TransactionBroker.java:230)
 * This appears to be consistently reproducible on a MacBook, but it has not
 * been replicated on Windows.
 */
/**
 * Drives a three-way message exchange through an embedded broker.
 *
 * <p>Three clients (hector, xena, troy) each send {@link #MESSAGE_COUNT}
 * messages to a central "halo" client. Halo consumes each inbound queue in a
 * transacted session and, for every message received, forwards one message to
 * another client (hector -> troy, xena -> hector, troy -> xena). The test
 * passes when all six queues have delivered exactly {@link #MESSAGE_COUNT}
 * messages.
 */
public class TransactionNotStartedErrorTest extends TestCase {

    private static final Logger LOG = LoggerFactory.getLogger(TransactionNotStartedErrorTest.class);

    /** Address the embedded broker listens on and every client connects to. */
    private static final String BROKER_URL = "tcp://localhost:61616";

    /** Number of messages each producer sends and each queue must deliver. */
    private static final int MESSAGE_COUNT = 500;

    /** Delivery time budget per expected message, in milliseconds. */
    private static final long MAX_WAIT_PER_MESSAGE_MS = 3000L;

    /** Upper bound on a single wait before the counters are re-checked. */
    private static final long POLL_INTERVAL_MS = 200L;

    // Per-queue receive counters. They are instance fields so that every test
    // instance starts from zero; each one is only incremented by the single
    // listener registered for its queue.
    private int hectorToHaloCtr;
    private int xenaToHaloCtr;
    private int troyToHaloCtr;

    private int haloToHectorCtr;
    private int haloToXenaCtr;
    private int haloToTroyCtr;

    private final String hectorToHalo = "hectorToHalo";
    private final String xenaToHalo = "xenaToHalo";
    private final String troyToHalo = "troyToHalo";

    private final String haloToHector = "haloToHector";
    private final String haloToXena = "haloToXena";
    private final String haloToTroy = "haloToTroy";

    private BrokerService broker;

    private Connection hectorConnection;
    private Connection xenaConnection;
    private Connection troyConnection;
    private Connection haloConnection;

    /** Monitor the listeners notify on, and the test thread waits on, when a queue completes. */
    private final Object lock = new Object();

    /**
     * Creates a new, not yet started, connection to the embedded broker.
     *
     * @return a connection to {@link #BROKER_URL}
     * @throws JMSException if the connection cannot be created
     */
    public Connection createConnection() throws JMSException {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL);
        return factory.createConnection();
    }

    /**
     * Creates a session on the given connection using auto-acknowledge mode.
     *
     * @param connection the connection to create the session on
     * @param transacted whether the session should be transacted
     * @return the new session
     * @throws JMSException if the session cannot be created
     */
    public Session createSession(Connection connection, boolean transacted) throws JMSException {
        return connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
    }

    /**
     * Configures and starts the embedded broker: persistent, JMX enabled, all
     * previously stored messages deleted on startup, one TCP connector on
     * {@link #BROKER_URL}.
     *
     * @throws Exception if the broker cannot be configured or started
     */
    public void startBroker() throws Exception {
        broker = new BrokerService();
        broker.setDeleteAllMessagesOnStartup(true);
        broker.setPersistent(true);
        broker.setUseJmx(true);
        broker.addConnector(BROKER_URL).setName("Default");
        LOG.info("Starting broker..");
        broker.start();
    }

    /**
     * Closes every connection that was opened and stops the broker.
     *
     * <p>Connections that were never created are skipped, and the broker is
     * stopped even when closing a connection fails, so a test that aborts
     * half-way does not leave the broker port bound.
     *
     * @throws Exception the first failure raised while closing a connection,
     *         or any failure raised while stopping the broker
     */
    public void tearDown() throws Exception {
        Exception firstFailure = null;
        Connection[] connections = {hectorConnection, xenaConnection, troyConnection, haloConnection};
        for (Connection connection : connections) {
            if (connection == null) {
                continue;
            }
            try {
                connection.close();
            } catch (Exception e) {
                LOG.warn("Failed to close connection", e);
                if (firstFailure == null) {
                    firstFailure = e;
                }
            }
        }
        if (broker != null) {
            broker.stop();
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * Runs the full exchange and verifies that each of the six queues
     * delivered exactly {@link #MESSAGE_COUNT} messages.
     *
     * @throws Exception if the broker, a connection, or a client cannot be set up
     */
    public void testTransactionNotStartedError() throws Exception {
        startBroker();

        hectorConnection = createConnection();
        Thread hectorThread = buildProducer(hectorConnection, hectorToHalo);
        buildReceiver(hectorConnection, haloToHector, false, new Receiver() {
            public void receive(String s) throws Exception {
                notifyIfComplete(++haloToHectorCtr);
            }
        });

        troyConnection = createConnection();
        Thread troyThread = buildProducer(troyConnection, troyToHalo);
        // Troy's replies are consumed on troy's own connection, mirroring the
        // hector and xena clients (previously this used hectorConnection).
        buildReceiver(troyConnection, haloToTroy, false, new Receiver() {
            public void receive(String s) throws Exception {
                notifyIfComplete(++haloToTroyCtr);
            }
        });

        xenaConnection = createConnection();
        Thread xenaThread = buildProducer(xenaConnection, xenaToHalo);
        buildReceiver(xenaConnection, haloToXena, false, new Receiver() {
            public void receive(String s) throws Exception {
                notifyIfComplete(++haloToXenaCtr);
            }
        });

        haloConnection = createConnection();
        final MessageSender hectorSender = buildTransactionalProducer(haloToHector, haloConnection);
        final MessageSender troySender = buildTransactionalProducer(haloToTroy, haloConnection);
        final MessageSender xenaSender = buildTransactionalProducer(haloToXena, haloConnection);

        // Each halo receiver counts the inbound message first, then forwards a
        // message to another client, and only then checks for completion.
        Receiver hectorReceiver = new Receiver() {
            public void receive(String s) throws Exception {
                hectorToHaloCtr++;
                troySender.send("halo to troy because of hector");
                notifyIfComplete(hectorToHaloCtr);
            }
        };
        Receiver xenaReceiver = new Receiver() {
            public void receive(String s) throws Exception {
                xenaToHaloCtr++;
                hectorSender.send("halo to hector because of xena");
                notifyIfComplete(xenaToHaloCtr);
            }
        };
        Receiver troyReceiver = new Receiver() {
            public void receive(String s) throws Exception {
                troyToHaloCtr++;
                xenaSender.send("halo to xena because of troy");
                notifyIfComplete(troyToHaloCtr);
            }
        };
        buildReceiver(haloConnection, hectorToHalo, true, hectorReceiver);
        buildReceiver(haloConnection, xenaToHalo, true, xenaReceiver);
        buildReceiver(haloConnection, troyToHalo, true, troyReceiver);

        haloConnection.start();

        troyConnection.start();
        troyThread.start();

        xenaConnection.start();
        xenaThread.start();

        hectorConnection.start();
        hectorThread.start();

        waitForMessagesToBeDelivered();

        // number of messages received should match messages sent
        assertAllReceived(hectorToHalo, hectorToHaloCtr);
        assertAllReceived(xenaToHalo, xenaToHaloCtr);
        assertAllReceived(troyToHalo, troyToHaloCtr);
        assertAllReceived(haloToHector, haloToHectorCtr);
        assertAllReceived(haloToXena, haloToXenaCtr);
        assertAllReceived(haloToTroy, haloToTroyCtr);
    }

    /**
     * Blocks until every queue has delivered {@link #MESSAGE_COUNT} messages,
     * the overall time budget ({@code MESSAGE_COUNT * MAX_WAIT_PER_MESSAGE_MS})
     * is used up, or the calling thread is interrupted. In the last case the
     * interrupt flag is restored before returning.
     */
    protected void waitForMessagesToBeDelivered() {
        // let's give the listeners enough time to read all messages
        final long maxWaitTime = MESSAGE_COUNT * MAX_WAIT_PER_MESSAGE_MS;
        final long deadline = System.currentTimeMillis() + maxWaitTime;

        synchronized (lock) {
            while (!allMessagesReceived()) {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    return;
                }
                try {
                    // Never pass 0 here: wait(0) would block indefinitely.
                    lock.wait(Math.min(POLL_INTERVAL_MS, remaining));
                } catch (InterruptedException e) {
                    LOG.error(e.toString());
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    /**
     * Creates a sender for the given queue on the given connection.
     *
     * @param queueName name of the destination queue
     * @param connection connection the sender uses
     * @return the new sender; NOTE(review): the two boolean constructor
     *         arguments are presumably "transacted" and "topic" - confirm
     *         against MessageSender
     * @throws Exception if the sender cannot be created
     */
    public MessageSender buildTransactionalProducer(String queueName, Connection connection) throws Exception {
        return new MessageSender(queueName, connection, true, false);
    }

    /**
     * Builds (but does not start) a thread that sends {@link #MESSAGE_COUNT}
     * messages to the given queue, using the queue name as message payload.
     *
     * @param connection connection the producer uses
     * @param queueName name of the destination queue
     * @return the unstarted producer thread
     * @throws Exception if the session or the sender cannot be created
     */
    public Thread buildProducer(Connection connection, final String queueName) throws Exception {
        // This session is requested as non-transacted, so the commit below is
        // not expected to run; it is kept so the set of sessions opened on the
        // connection stays the same as before.
        final Session session = createSession(connection, false);
        final MessageSender producer = new MessageSender(queueName, connection, false, false);
        return new Thread() {
            public void run() {
                for (int i = 0; i < MESSAGE_COUNT; i++) {
                    try {
                        producer.send(queueName);
                        if (session.getTransacted()) {
                            session.commit();
                        }
                    } catch (Exception e) {
                        throw new RuntimeException("on " + queueName + " send", e);
                    }
                }
            }
        };
    }

    /**
     * Registers an asynchronous consumer on the given queue that hands the
     * String payload of every message to {@code receiver} and, for transacted
     * sessions, commits after each message. Failures while handling a message
     * are logged and do not propagate to the JMS provider.
     *
     * @param connection connection to create the consuming session on
     * @param queueName name of the queue to consume from
     * @param transacted whether to consume in a transacted session
     * @param receiver callback invoked with each message payload
     * @throws Exception if the session or the consumer cannot be created
     */
    public void buildReceiver(Connection connection, final String queueName, boolean transacted, final Receiver receiver) throws Exception {
        final Session session = transacted
            ? connection.createSession(true, Session.SESSION_TRANSACTED)
            : connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer inputMessageConsumer = session.createConsumer(session.createQueue(queueName));
        inputMessageConsumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                try {
                    ObjectMessage objectMessage = (ObjectMessage)message;
                    String s = (String)objectMessage.getObject();
                    receiver.receive(s);
                    if (session.getTransacted()) {
                        session.commit();
                    }
                } catch (Exception e) {
                    LOG.error("Failed to process message from " + queueName, e);
                }
            }
        });
    }

    /** Wakes the waiting test thread once a queue has delivered all expected messages. */
    private void notifyIfComplete(int received) {
        if (received >= MESSAGE_COUNT) {
            synchronized (lock) {
                lock.notifyAll();
            }
        }
    }

    /** Returns true when every one of the six counters has reached {@link #MESSAGE_COUNT}. */
    private boolean allMessagesReceived() {
        return hectorToHaloCtr >= MESSAGE_COUNT
            && xenaToHaloCtr >= MESSAGE_COUNT
            && troyToHaloCtr >= MESSAGE_COUNT
            && haloToHectorCtr >= MESSAGE_COUNT
            && haloToXenaCtr >= MESSAGE_COUNT
            && haloToTroyCtr >= MESSAGE_COUNT;
    }

    /** Logs the number of messages received on a queue, then asserts it equals {@link #MESSAGE_COUNT}. */
    private void assertAllReceived(String queueName, int received) {
        LOG.info(queueName + " received " + received + " messages");
        assertEquals("messages received on " + queueName, MESSAGE_COUNT, received);
    }

}

Other ActiveMQ examples (source code examples)

Here is a short list of links related to this ActiveMQ TransactionNotStartedErrorTest.java source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.