|
ActiveMQ example source code file (StompAdvisoryTest.java)
The ActiveMQ StompAdvisoryTest.java source code/** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.activemq.transport.stomp; import junit.framework.TestCase; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.JmsTestSupport; import org.apache.activemq.broker.BrokerFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.util.URISupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.jms.*; import java.io.File; import java.io.IOException; import java.net.Socket; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; /** * */ public class StompAdvisoryTest extends TestCase { private static final Logger LOG = LoggerFactory.getLogger(StompAdvisoryTest.class); protected ConnectionFactory factory; protected ActiveMQConnection connection; protected BrokerService broker; StompConnection stompConnection; URI tcpBrokerUri; URI stompBrokerUri; private PolicyEntry createPolicyEntry() { PolicyEntry policy = new PolicyEntry(); policy.setAdvisdoryForFastProducers(true); policy.setAdvisoryForConsumed(true); policy.setAdvisoryForDelivery(true); policy.setAdvisoryForDiscardingMessages(true); policy.setAdvisoryForSlowConsumers(true); policy.setAdvisoryWhenFull(true); policy.setProducerFlowControl(false); ConstantPendingMessageLimitStrategy strategy = new ConstantPendingMessageLimitStrategy(); strategy.setLimit(10); policy.setPendingMessageLimitStrategy(strategy); return policy; } protected BrokerService createBroker() throws Exception { BrokerService broker = BrokerFactory.createBroker(new URI("broker://()/localhost?useJmx=false")); broker.setPersistent(false); PolicyEntry policy = new PolicyEntry(); policy.setAdvisdoryForFastProducers(true); policy.setAdvisoryForConsumed(true); policy.setAdvisoryForDelivery(true); policy.setAdvisoryForDiscardingMessages(true); policy.setAdvisoryForSlowConsumers(true); policy.setAdvisoryWhenFull(true); policy.setProducerFlowControl(false); ConstantPendingMessageLimitStrategy strategy = new ConstantPendingMessageLimitStrategy(); strategy.setLimit(10); policy.setPendingMessageLimitStrategy(strategy); PolicyMap pMap = new PolicyMap(); pMap.setDefaultEntry(policy); broker.setDestinationPolicy(pMap); broker.setDeleteAllMessagesOnStartup(true); broker.addConnector("tcp://localhost:0"); broker.addConnector("stomp://localhost:0"); return broker; } protected void setUp() throws Exception { super.setUp(); if (System.getProperty("basedir") == null) { File file = new File("."); System.setProperty("basedir", file.getAbsolutePath()); } broker = createBroker(); broker.start(); tcpBrokerUri = URISupport.removeQuery(broker.getTransportConnectors().get(0).getConnectUri()); stompBrokerUri = URISupport.removeQuery(broker.getTransportConnectors().get(1).getConnectUri()); LOG.info("Producing using TCP uri: " + tcpBrokerUri); LOG.info("consuming using STOMP uri: " + stompBrokerUri); stompConnection = new StompConnection(); stompConnection.open(new Socket("localhost", stompBrokerUri.getPort())); } protected void tearDown() throws Exception { stompConnection.disconnect(); stompConnection.close(); broker.stop(); } public void testConnectionAdvisory() throws Exception { Destination dest = new ActiveMQQueue("testConnectionAdvisory"); stompConnection.connect("system", "manager"); stompConnection.subscribe("/topic/ActiveMQ.Advisory.Connection", Stomp.Headers.Subscribe.AckModeValues.AUTO); // Now connect via openwire and check we get the advisory ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(tcpBrokerUri); Connection c = factory.createConnection(); c.start(); StompFrame f = stompConnection.receive(); LOG.debug(f.toString()); assertEquals(f.getAction(),"MESSAGE"); assertTrue("Should have a body", f.getBody().length() > 0); assertTrue(f.getBody().startsWith("{\"ConnectionInfo\":")); Map<String,String> headers = f.getHeaders(); c.stop(); c.close(); f = stompConnection.receive(); LOG.debug(f.toString()); assertEquals(f.getAction(),"MESSAGE"); assertNotNull("Body is not null", f.getBody()); assertTrue("Body should have content", f.getBody().length() > 0); assertTrue(f.getBody().startsWith("{\"ConnectionInfo\":")); } public void testConnectionAdvisoryJSON() throws Exception { Destination dest = new ActiveMQQueue("testConnectionAdvisory"); HashMap<String, String> subheaders = new HashMap Other ActiveMQ examples (source code examples)Here is a short list of links related to this ActiveMQ StompAdvisoryTest.java source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.