alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

ActiveMQ example source code file (AMQ1936Test.java)

This example ActiveMQ source code file (AMQ1936Test.java) is included in the DevDaily.com "Java Source Code Warehouse" project. The intent of this project is to help you "Learn Java by Example" TM.

Java - ActiveMQ tags/keywords

consumer_count, duplicate, exception, exception, imessagehandler, jmsexception, message, naming, string, string, test_message_count, test_message_count, test_queue_name, threadedmessagereceiver, threadedmessagereceiver, threading, threads, util

The ActiveMQ AMQ1936Test.java source code

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.bugs;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.AutoFailTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.util.Wait;
import org.apache.log4j.Logger;

import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.naming.NamingException;
import junit.framework.TestCase;
/**
 * A AMQ1936Test
 *
 */
public class AMQ1936Test extends TestCase{
    private final static Logger logger = Logger.getLogger( AMQ1936Test.class );
    private final static String TEST_QUEUE_NAME     = "dynamicQueues/duplicate.message.test.queue";
    ////--
    //
    private final static long TEST_MESSAGE_COUNT    = 6000;    // The number of test messages to use
    //
    ////--
    private final static int CONSUMER_COUNT         = 2;        // The number of message receiver instances
    private final static boolean TRANSACTED_RECEIVE = true; // Flag used by receiver which indicates messages should be processed within a JMS transaction

    private ThreadPoolExecutor threadPool           = new ThreadPoolExecutor( CONSUMER_COUNT,CONSUMER_COUNT, Long.MAX_VALUE,TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>() );
    private ThreadedMessageReceiver[] receivers     = new ThreadedMessageReceiver[ CONSUMER_COUNT ];
    private BrokerService broker                    = null;
    static QueueConnectionFactory connectionFactory = null;
   
   

    @Override
    protected void setUp() throws Exception {
        super.setUp();
        
        broker = new BrokerService();
        broker.getSystemUsage().getMemoryUsage().setLimit(5*1024*1024);
        broker.setBrokerName("test");
        broker.setDeleteAllMessagesOnStartup(true);
        broker.start();
        connectionFactory        = new ActiveMQConnectionFactory("vm://test");;
    }

    @Override
    protected void tearDown() throws Exception {
        super.tearDown();
        
        if( threadPool!=null ) {
            // signal receivers to stop
            for( ThreadedMessageReceiver receiver: receivers) {
                receiver.setShouldStop( true );
            }
            
            logger.info("Waiting for receivers to shutdown..");
            if( ! threadPool.awaitTermination( 10, TimeUnit.SECONDS ) ) {
                logger.warn("Not all receivers completed shutdown.");
            } else {
                logger.info("All receivers shutdown successfully..");
            }
        }
        
        logger.debug("Stoping the broker.");
        
        if( broker!=null ) {
            broker.stop();
        }
    }
    
    private void sendTextMessage( String queueName, int i ) throws JMSException, NamingException {
        QueueConnectionFactory connectionFactory        = new ActiveMQConnectionFactory("vm://test");
        QueueConnection queueConnection                 = null;
        QueueSession session                            = null;
        QueueSender sender                              = null;
        Queue queue                                     = null;
        TextMessage message                                 = null;
        
        try {
            
            // Create the queue connection
            queueConnection = connectionFactory.createQueueConnection();
    
            session = queueConnection.createQueueSession( false, QueueSession.AUTO_ACKNOWLEDGE );
            queue = session.createQueue(TEST_QUEUE_NAME);
            sender = session.createSender( queue );
            sender.setDeliveryMode( DeliveryMode.PERSISTENT );

            message = session.createTextMessage( String.valueOf(i) );
            
            // send the message
            sender.send( message );
    
            if( session.getTransacted()) {
                session.commit();
            }
            if (i%1000 == 0) {
                logger.info( "Message successfully sent to : " + queue.getQueueName( ) + " messageid: " + message.getJMSMessageID( )
                        + " content:" + message.getText());
            }
        } finally {
            if( sender!=null ) {
                sender.close();
            }
            if( session!=null ) {
                session.close();
            }
            if( queueConnection!=null ) {
                queueConnection.close();
            }
        }
    }
     
    
    public void testForDuplicateMessages( ) throws Exception {
        final ConcurrentHashMap<String,String> messages = new ConcurrentHashMap( );
        final Object lock                               = new Object( );
        final CountDownLatch duplicateSignal            = new CountDownLatch( 1 );
        final AtomicInteger messageCount                = new AtomicInteger( 0 );
        
        // add 1/2 the number of our total messages
        for( int i = 0; i < TEST_MESSAGE_COUNT/2; i++ ) {
            if( duplicateSignal.getCount()==0 ) {
                fail( "Duplicate message id detected" );
            }
            sendTextMessage( TEST_QUEUE_NAME, i );
        }
        
        // create a number of consumers to read of the messages and start them with a handler which simply stores the message ids
        // in a Map and checks for a duplicate
        for( int i = 0; i < CONSUMER_COUNT; i++ ) {
            receivers[i] = new ThreadedMessageReceiver(TEST_QUEUE_NAME, new IMessageHandler( ) {
            
                public void onMessage( Message message ) throws Exception {
                    synchronized( lock ) {
                        int current = messageCount.incrementAndGet();
                        if (current % 1000 == 0) {
                            logger.info( "Received message:" + message.getJMSMessageID() +  " with content: " + ((TextMessage)message).getText() );
                        }
                        if(  messages.containsKey( message.getJMSMessageID()) ) {
                            duplicateSignal.countDown( );
                            logger.fatal( "duplicate message id detected:" + message.getJMSMessageID() );
                            fail( "Duplicate message id detected:" + message.getJMSMessageID() );
                        } else {
                            messages.put( message.getJMSMessageID(), message.getJMSMessageID() );
                        }
                    }
                }
            });
            threadPool.submit( receivers[i]);
        }
        
        // starting adding the remaining messages
        for(int i = 0; i < TEST_MESSAGE_COUNT/2; i++ ) {
            if( duplicateSignal.getCount()==0) {
                fail( "Duplicate message id detected" );
            }
            sendTextMessage( TEST_QUEUE_NAME, i );
        }
        
        logger.info("sent all " + TEST_MESSAGE_COUNT + " messages");
        
        // allow some time for messages to be delivered to receivers.
        boolean ok = Wait.waitFor(new Wait.Condition() {
            public boolean isSatisified() throws Exception {
                return TEST_MESSAGE_COUNT == messages.size();
            }
        }, 1*60*1000);
        if (!ok) {
            AutoFailTestSupport.dumpAllThreads("--STUCK?--");
        }
        assertEquals( "Number of messages received does not match the number sent", TEST_MESSAGE_COUNT, messages.size( ) );
        assertEquals( TEST_MESSAGE_COUNT,  messageCount.get() );
    }
    
    
    
    private final static class ThreadedMessageReceiver implements Runnable {
       
        private String queueName            = null;
        private IMessageHandler handler     = null;
        private AtomicBoolean shouldStop    = new AtomicBoolean( false );
        
        public ThreadedMessageReceiver(String queueName, IMessageHandler handler ) {
         
            this.queueName      = queueName;
            this.handler        = handler;
        }

        public void run( ) {
           
            QueueConnection queueConnection                 = null;
            QueueSession session                            = null;
            QueueReceiver receiver                          = null;
            Queue queue                                     = null;
            Message message                                 = null;
            try {
                try {
                 
                    queueConnection = connectionFactory.createQueueConnection( );
                    // create a transacted session
                    session = queueConnection.createQueueSession( TRANSACTED_RECEIVE, QueueSession.AUTO_ACKNOWLEDGE );
                    queue = session.createQueue(TEST_QUEUE_NAME);
                    receiver = session.createReceiver( queue );

                    // start the connection
                    queueConnection.start( );
                    
                    logger.info( "Receiver " + Thread.currentThread().getName() + " connected." );
                    
                    // start receive loop
                    while( ! ( shouldStop.get() || Thread.currentThread().isInterrupted()) ) {
                        try {
                            message = receiver.receive( 200 );
                        } catch( Exception e) {
                            //
                            // ignore interrupted exceptions
                            //
                            if( e instanceof InterruptedException || e.getCause() instanceof InterruptedException ) {
                                /* ignore */
                            } else {
                                throw e;
                            }
                        }
                        
                        if( message!=null && this.handler!=null ) {
                            this.handler.onMessage(message);
                        }
                        
                        // commit session on successful handling of message
                        if( session.getTransacted()) {
                            session.commit();
                        }
                    }
                    
                    logger.info( "Receiver " + Thread.currentThread().getName() + " shutting down." );
                    
                } finally {
                    if( receiver!=null ) {
                        try {
                            receiver.close();
                        } catch (JMSException e)  { 
                            logger.warn(e); 
                        }
                    }
                    if( session!=null ) {
                        try {
                            session.close();
                        } catch (JMSException e)  { 
                            logger.warn(e); 
                        }
                    }
                    if( queueConnection!=null ) {
                        queueConnection.close();
                    }
                }
            } catch ( JMSException e ) {
                logger.error(e);
                e.printStackTrace();
            } catch (NamingException e) {
                logger.error(e);
            } catch (Exception e) {
                logger.error(e);
                e.printStackTrace();
            }
        }

        public Boolean getShouldStop() {
            return shouldStop.get();
        }

        public void setShouldStop(Boolean shouldStop) {
            this.shouldStop.set(shouldStop);
        }
    }
    
    public interface IMessageHandler {
        void onMessage( Message message ) throws Exception;
    }
    

}

Other ActiveMQ examples (source code examples)

Here is a short list of links related to this ActiveMQ AMQ1936Test.java source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.