|
ActiveMQ example source code file (AMQ1936Test.java)
The ActiveMQ AMQ1936Test.java source code/** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.activemq.bugs; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.AutoFailTestSupport; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.util.Wait; import org.apache.log4j.Logger; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import javax.jms.DeliveryMode; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Queue; import javax.jms.QueueConnection; import javax.jms.QueueConnectionFactory; import javax.jms.QueueReceiver; import javax.jms.QueueSender; import javax.jms.QueueSession; import javax.jms.TextMessage; import javax.naming.NamingException; import junit.framework.TestCase; /** * A AMQ1936Test * */ public class AMQ1936Test extends TestCase{ private final static Logger logger = Logger.getLogger( AMQ1936Test.class ); private final static 
String TEST_QUEUE_NAME = "dynamicQueues/duplicate.message.test.queue"; ////-- // private final static long TEST_MESSAGE_COUNT = 6000; // The number of test messages to use // ////-- private final static int CONSUMER_COUNT = 2; // The number of message receiver instances private final static boolean TRANSACTED_RECEIVE = true; // Flag used by receiver which indicates messages should be processed within a JMS transaction private ThreadPoolExecutor threadPool = new ThreadPoolExecutor( CONSUMER_COUNT,CONSUMER_COUNT, Long.MAX_VALUE,TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>() ); private ThreadedMessageReceiver[] receivers = new ThreadedMessageReceiver[ CONSUMER_COUNT ]; private BrokerService broker = null; static QueueConnectionFactory connectionFactory = null; @Override protected void setUp() throws Exception { super.setUp(); broker = new BrokerService(); broker.getSystemUsage().getMemoryUsage().setLimit(5*1024*1024); broker.setBrokerName("test"); broker.setDeleteAllMessagesOnStartup(true); broker.start(); connectionFactory = new ActiveMQConnectionFactory("vm://test");; } @Override protected void tearDown() throws Exception { super.tearDown(); if( threadPool!=null ) { // signal receivers to stop for( ThreadedMessageReceiver receiver: receivers) { receiver.setShouldStop( true ); } logger.info("Waiting for receivers to shutdown.."); if( ! 
threadPool.awaitTermination( 10, TimeUnit.SECONDS ) ) { logger.warn("Not all receivers completed shutdown."); } else { logger.info("All receivers shutdown successfully.."); } } logger.debug("Stoping the broker."); if( broker!=null ) { broker.stop(); } } private void sendTextMessage( String queueName, int i ) throws JMSException, NamingException { QueueConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://test"); QueueConnection queueConnection = null; QueueSession session = null; QueueSender sender = null; Queue queue = null; TextMessage message = null; try { // Create the queue connection queueConnection = connectionFactory.createQueueConnection(); session = queueConnection.createQueueSession( false, QueueSession.AUTO_ACKNOWLEDGE ); queue = session.createQueue(TEST_QUEUE_NAME); sender = session.createSender( queue ); sender.setDeliveryMode( DeliveryMode.PERSISTENT ); message = session.createTextMessage( String.valueOf(i) ); // send the message sender.send( message ); if( session.getTransacted()) { session.commit(); } if (i%1000 == 0) { logger.info( "Message successfully sent to : " + queue.getQueueName( ) + " messageid: " + message.getJMSMessageID( ) + " content:" + message.getText()); } } finally { if( sender!=null ) { sender.close(); } if( session!=null ) { session.close(); } if( queueConnection!=null ) { queueConnection.close(); } } } public void testForDuplicateMessages( ) throws Exception { final ConcurrentHashMap<String,String> messages = new ConcurrentHashMap Other ActiveMQ examples (source code examples)Here is a short list of links related to this ActiveMQ AMQ1936Test.java source code file: |
... this post is sponsored by my books ...
#1 New Release!
FP Best Seller
Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from pages under the /java/jwarehouse URI on this website is paid back to open source projects.