|
ActiveMQ example source code file (ConsumeTopicPrefetchTest.java)
The ActiveMQ ConsumeTopicPrefetchTest.java source code/** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.activemq.usecases; import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.broker.BrokerRegistry; import org.apache.activemq.broker.policy.IndividualDeadLetterViaXmlTest; import org.apache.activemq.broker.region.DestinationStatistics; import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.util.Wait; import org.apache.activemq.util.Wait.Condition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * */ public class ConsumeTopicPrefetchTest extends ProducerConsumerTestSupport { private static final Logger LOG = LoggerFactory.getLogger(ConsumeTopicPrefetchTest.class); protected int prefetchSize = 100; protected String[] messageTexts; protected long consumerTimeout = 10000L; public void testSendPrefetchSize() throws JMSException { testWithMessageCount(prefetchSize); } public void testSendDoublePrefetchSize() throws JMSException { testWithMessageCount(prefetchSize * 2); } public void testSendPrefetchSizePlusOne() throws JMSException { testWithMessageCount(prefetchSize + 1); } protected void testWithMessageCount(int messageCount) throws JMSException { makeMessages(messageCount); LOG.info("About to send and receive: " + messageCount + " on destination: " + destination + " of type: " + destination.getClass().getName()); for (int i = 0; i < messageCount; i++) { Message message = session.createTextMessage(messageTexts[i]); producer.send(message); } validateConsumerPrefetch(this.getSubject(), prefetchSize); // lets consume them in two fetch batches for (int i = 0; i < messageCount; i++) { consumeMessge(i); } } protected Connection createConnection() throws Exception { ActiveMQConnection connection = (ActiveMQConnection) super.createConnection(); connection.getPrefetchPolicy().setQueuePrefetch(prefetchSize); connection.getPrefetchPolicy().setTopicPrefetch(prefetchSize); return connection; } protected TextMessage consumeMessge(int i) throws JMSException { Message message = consumer.receive(consumerTimeout); assertTrue("Should have received a message by now for message: " + i, message != null); assertTrue("Should be a TextMessage: " + message, message instanceof TextMessage); TextMessage textMessage = (TextMessage) message; assertEquals("Message content", messageTexts[i], textMessage.getText()); return textMessage; } protected void makeMessages(int messageCount) { messageTexts = new String[messageCount]; for (int i = 0; i < messageCount; i++) { messageTexts[i] = "Message for test: + " + getName() + " = " + i; } } protected void validateConsumerPrefetch(String destination, final long expectedCount) throws JMSException { RegionBroker regionBroker = (RegionBroker) BrokerRegistry.getInstance().lookup("localhost").getRegionBroker(); for (org.apache.activemq.broker.region.Destination dest : regionBroker.getQueueRegion().getDestinationMap().values()) { final org.apache.activemq.broker.region.Destination target = dest; if (dest.getName().equals(destination)) { try { Wait.waitFor(new Condition() { public boolean isSatisified() throws Exception { DestinationStatistics stats = target.getDestinationStatistics(); LOG.info("inflight for : " + target.getName() + ": " + stats.getInflight().getCount()); return stats.getInflight().getCount() == expectedCount; } }); } catch (Exception e) { throw new JMSException(e.toString()); } DestinationStatistics stats = dest.getDestinationStatistics(); LOG.info("inflight for : " + dest.getName() + ": " + stats.getInflight().getCount()); assertEquals("inflight for: " + dest.getName() + ": " + stats.getInflight().getCount() + " matches", expectedCount, stats.getInflight().getCount()); } } } } Other ActiveMQ examples (source code examples)Here is a short list of links related to this ActiveMQ ConsumeTopicPrefetchTest.java source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.