|
ActiveMQ example source code file (AMQDeadlockTest3.java)
The ActiveMQ AMQDeadlockTest3.java source code
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usecases;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.BytesMessage;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import junit.framework.Assert;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
public class AMQDeadlockTest3 extends org.apache.activemq.test.TestSupport {
private static final transient Logger LOG = LoggerFactory.getLogger(AMQDeadlockTest3.class);
private static final String URL1 = "tcp://localhost:61616";
private static final String URL2 = "tcp://localhost:61617";
private static final String QUEUE1_NAME = "test.queue.1";
private static final String QUEUE2_NAME = "test.queue.2";
private static final int MAX_CONSUMERS = 1;
private static final int MAX_PRODUCERS = 1;
private static final int NUM_MESSAGE_TO_SEND = 10;
private AtomicInteger messageCount = new AtomicInteger();
private CountDownLatch doneLatch;
// Intentionally empty: every test method in this class creates and starts
// its own broker(s), connection factories and listener containers inline.
// NOTE(review): presumably this replaces an inherited JUnit fixture hook
// without calling super.setUp() - confirm that is intended.
public void setUp() throws Exception {
}
// Intentionally empty: every test method in this class releases the
// resources it created in its own finally block.
// NOTE(review): presumably this replaces an inherited JUnit fixture hook
// without calling super.tearDown() - confirm that is intended.
public void tearDown() throws Exception {
}
// This should fail with incubator-activemq-fuse-4.1.0.5
/**
 * Single-broker scenario: a listener container consumes only from
 * {@code QUEUE1_NAME} while pooled producer tasks are submitted first for
 * {@code QUEUE2_NAME} (which has no consumer) and then for
 * {@code QUEUE1_NAME}, all sharing one {@link PooledConnectionFactory}.
 * The test passes when {@code doneLatch} reaches zero within 20 seconds and
 * {@code messageCount} equals {@code NUM_MESSAGE_TO_SEND}.
 *
 * <p>NOTE(review): presumably {@code TestMessageListener1} counts down
 * {@code doneLatch} and increments {@code messageCount} per message, and
 * its constructor argument is a per-message delay in milliseconds - confirm
 * against the listener class.
 *
 * @throws Exception if the broker, the container or the producers fail
 */
public void testQueueLimitsWithOneBrokerSameConnection() throws Exception {
    BrokerService brokerService1 = null;
    ActiveMQConnectionFactory acf = null;
    PooledConnectionFactory pcf = null;
    DefaultMessageListenerContainer container1 = null;
    ExecutorService executor = null;
    try {
        brokerService1 = createBrokerService("broker1", URL1, null);
        brokerService1.start();
        acf = createConnectionFactory(URL1);
        pcf = new PooledConnectionFactory(acf);
        // Only listen on the first queue.. let the 2nd queue fill up.
        doneLatch = new CountDownLatch(NUM_MESSAGE_TO_SEND);
        container1 = createDefaultMessageListenerContainer(acf, new TestMessageListener1(500), QUEUE1_NAME);
        container1.afterPropertiesSet();
        Thread.sleep(2000);
        executor = Executors.newCachedThreadPool();
        for (int i = 0; i < MAX_PRODUCERS; i++) {
            executor.submit(new PooledProducerTask(pcf, QUEUE2_NAME));
            Thread.sleep(1000);
            executor.submit(new PooledProducerTask(pcf, QUEUE1_NAME));
        }
        // Wait for all message to arrive.
        assertTrue(doneLatch.await(20, TimeUnit.SECONDS));
        executor.shutdownNow();
        Assert.assertEquals(NUM_MESSAGE_TO_SEND, messageCount.get());
    } finally {
        // Every resource is null-guarded so that a failure during setup is
        // reported as itself instead of being masked by a
        // NullPointerException thrown from this cleanup code.
        if (executor != null) {
            // Also reached when an assertion failed; calling shutdownNow()
            // a second time after the success path is harmless.
            executor.shutdownNow();
        }
        try {
            if (container1 != null) {
                container1.stop();
                container1.destroy();
            }
        } finally {
            try {
                if (pcf != null) {
                    // Release the pooled connections before the broker goes away.
                    pcf.stop();
                }
            } finally {
                // Always stop the broker so its port is free for the next test.
                if (brokerService1 != null) {
                    brokerService1.stop();
                }
            }
        }
    }
}
// This should fail with incubator-activemq-fuse-4.1.0.5
/**
 * Two-broker scenario with a shared producing connection pool: pooled
 * producer tasks send through the factory for {@code URL1}, first to
 * {@code QUEUE2_NAME} (no consumer) and then to {@code QUEUE1_NAME}, while
 * a listener container attached to {@code URL2} consumes
 * {@code QUEUE1_NAME}. The test passes when {@code doneLatch} reaches zero
 * within 20 seconds and {@code messageCount} equals
 * {@code MAX_PRODUCERS * NUM_MESSAGE_TO_SEND}.
 *
 * <p>NOTE(review): presumably the second and third arguments of
 * {@code createBrokerService} are the broker's own transport URI and the
 * URI of the peer it networks to - confirm against that helper.
 *
 * @throws Exception if a broker, the container or the producers fail
 */
public void testQueueLimitsWithTwoBrokerProduceandConsumeonDifferentBrokersWithOneConnectionForProducing() throws Exception {
    BrokerService brokerService1 = null;
    BrokerService brokerService2 = null;
    ActiveMQConnectionFactory acf1 = null;
    ActiveMQConnectionFactory acf2 = null;
    PooledConnectionFactory pcf = null;
    DefaultMessageListenerContainer container1 = null;
    ExecutorService executor = null;
    try {
        brokerService1 = createBrokerService("broker1", URL1, URL2);
        brokerService1.start();
        brokerService2 = createBrokerService("broker2", URL2, URL1);
        brokerService2.start();
        acf1 = createConnectionFactory(URL1);
        acf2 = createConnectionFactory(URL2);
        pcf = new PooledConnectionFactory(acf1);
        Thread.sleep(1000);
        doneLatch = new CountDownLatch(MAX_PRODUCERS * NUM_MESSAGE_TO_SEND);
        container1 = createDefaultMessageListenerContainer(acf2, new TestMessageListener1(500), QUEUE1_NAME);
        container1.afterPropertiesSet();
        executor = Executors.newCachedThreadPool();
        for (int i = 0; i < MAX_PRODUCERS; i++) {
            executor.submit(new PooledProducerTask(pcf, QUEUE2_NAME));
            Thread.sleep(1000);
            executor.submit(new PooledProducerTask(pcf, QUEUE1_NAME));
        }
        assertTrue(doneLatch.await(20, TimeUnit.SECONDS));
        executor.shutdownNow();
        Assert.assertEquals(MAX_PRODUCERS * NUM_MESSAGE_TO_SEND, messageCount.get());
    } finally {
        // Every resource is null-guarded so that a failure during setup is
        // reported as itself instead of being masked by a
        // NullPointerException thrown from this cleanup code.
        if (executor != null) {
            // Also reached when an assertion failed; calling shutdownNow()
            // a second time after the success path is harmless.
            executor.shutdownNow();
        }
        try {
            if (container1 != null) {
                container1.stop();
                container1.destroy();
            }
        } finally {
            try {
                if (pcf != null) {
                    // Release the pooled connections before the brokers go away.
                    pcf.stop();
                }
            } finally {
                // Nested so that a failure stopping broker1 cannot leave
                // broker2 running and holding its port.
                try {
                    if (brokerService1 != null) {
                        brokerService1.stop();
                    }
                } finally {
                    if (brokerService2 != null) {
                        brokerService2.stop();
                    }
                }
            }
        }
    }
}
// This should fail with incubator-activemq-fuse-4.1.0.5
/**
 * Two-broker scenario with separate producing connections: non-pooled
 * producer tasks send through the factory for {@code URL1}, first to
 * {@code QUEUE2_NAME} and then to {@code QUEUE1_NAME}, while two listener
 * containers attached to {@code URL2} consume {@code QUEUE1_NAME}
 * (listener argument 500) and {@code QUEUE2_NAME} (listener argument
 * 30000). The test passes when {@code doneLatch} reaches zero within 20
 * seconds and {@code messageCount} equals
 * {@code MAX_PRODUCERS * NUM_MESSAGE_TO_SEND}.
 *
 * <p>NOTE(review): presumably the listener argument is a per-message delay
 * in milliseconds, making the {@code QUEUE2_NAME} consumer deliberately
 * slow - confirm against {@code TestMessageListener1}.
 *
 * @throws Exception if a broker, a container or the producers fail
 */
public void testQueueLimitsWithTwoBrokerProduceandConsumeonDifferentBrokersWithSeperateConnectionsForProducing() throws Exception {
    BrokerService brokerService1 = null;
    BrokerService brokerService2 = null;
    ActiveMQConnectionFactory acf1 = null;
    ActiveMQConnectionFactory acf2 = null;
    DefaultMessageListenerContainer container1 = null;
    DefaultMessageListenerContainer container2 = null;
    ExecutorService executor = null;
    try {
        brokerService1 = createBrokerService("broker1", URL1, URL2);
        brokerService1.start();
        brokerService2 = createBrokerService("broker2", URL2, URL1);
        brokerService2.start();
        acf1 = createConnectionFactory(URL1);
        acf2 = createConnectionFactory(URL2);
        Thread.sleep(1000);
        doneLatch = new CountDownLatch(NUM_MESSAGE_TO_SEND * MAX_PRODUCERS);
        container1 = createDefaultMessageListenerContainer(acf2, new TestMessageListener1(500), QUEUE1_NAME);
        container1.afterPropertiesSet();
        container2 = createDefaultMessageListenerContainer(acf2, new TestMessageListener1(30000), QUEUE2_NAME);
        container2.afterPropertiesSet();
        executor = Executors.newCachedThreadPool();
        for (int i = 0; i < MAX_PRODUCERS; i++) {
            executor.submit(new NonPooledProducerTask(acf1, QUEUE2_NAME));
            Thread.sleep(1000);
            executor.submit(new NonPooledProducerTask(acf1, QUEUE1_NAME));
        }
        assertTrue(doneLatch.await(20, TimeUnit.SECONDS));
        executor.shutdownNow();
        Assert.assertEquals(MAX_PRODUCERS * NUM_MESSAGE_TO_SEND, messageCount.get());
    } finally {
        // Every resource is null-guarded so that a failure during setup is
        // reported as itself instead of being masked by a
        // NullPointerException thrown from this cleanup code.
        if (executor != null) {
            // Also reached when an assertion failed; calling shutdownNow()
            // a second time after the success path is harmless.
            executor.shutdownNow();
        }
        // Each step sits in its own try/finally so that one failing
        // shutdown cannot prevent the remaining resources from being
        // released.
        try {
            if (container1 != null) {
                container1.stop();
                container1.destroy();
            }
        } finally {
            try {
                if (container2 != null) {
                    container2.stop();
                    container2.destroy();
                }
            } finally {
                try {
                    if (brokerService1 != null) {
                        brokerService1.stop();
                    }
                } finally {
                    if (brokerService2 != null) {
                        brokerService2.stop();
                    }
                }
            }
        }
    }
}
private BrokerService createBrokerService(final String brokerName, final String uri1, final String uri2) throws Exception {
final BrokerService brokerService = new BrokerService();
brokerService.setBrokerName(brokerName);
brokerService.setPersistent(false);
brokerService.setUseJmx(true);
final SystemUsage memoryManager = new SystemUsage();
memoryManager.getMemoryUsage().setLimit(5000000);
brokerService.setSystemUsage(memoryManager);
final List<PolicyEntry> policyEntries = new ArrayList
Other ActiveMQ examples (source code examples): Here is a short list of links related to this ActiveMQ AMQDeadlockTest3.java source code file: |
| ... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.