|
ActiveMQ example source code file (InactivityMonitorTest.java)
The ActiveMQ InactivityMonitorTest.java source code/** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.activemq.transport.tcp; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import javax.net.SocketFactory; import org.apache.activemq.CombinationTestSupport; import org.apache.activemq.command.WireFormatInfo; import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportAcceptListener; import org.apache.activemq.transport.TransportFactory; import org.apache.activemq.transport.TransportListener; import org.apache.activemq.transport.TransportServer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class InactivityMonitorTest extends CombinationTestSupport implements TransportAcceptListener { private static final Logger LOG = LoggerFactory.getLogger(InactivityMonitorTest.class); public Runnable serverRunOnCommand; public Runnable clientRunOnCommand; private TransportServer server; private Transport clientTransport; private Transport serverTransport; private final AtomicInteger clientReceiveCount = new AtomicInteger(0); private final AtomicInteger clientErrorCount = new AtomicInteger(0); private final AtomicInteger serverReceiveCount = new AtomicInteger(0); private final AtomicInteger serverErrorCount = new AtomicInteger(0); private final AtomicBoolean ignoreClientError = new AtomicBoolean(false); private final AtomicBoolean ignoreServerError = new AtomicBoolean(false); protected void setUp() throws Exception { super.setUp(); startTransportServer(); } /** * @throws Exception * @throws URISyntaxException */ private void startClient() throws Exception, URISyntaxException { clientTransport = TransportFactory.connect(new URI("tcp://localhost:61616?trace=true&wireFormat.maxInactivityDuration=1000")); clientTransport.setTransportListener(new TransportListener() { public void onCommand(Object command) { clientReceiveCount.incrementAndGet(); if (clientRunOnCommand != null) { clientRunOnCommand.run(); } } public void onException(IOException error) { if (!ignoreClientError.get()) { LOG.info("Client transport error:"); error.printStackTrace(); clientErrorCount.incrementAndGet(); } } public void transportInterupted() { } public void transportResumed() { } }); clientTransport.start(); } /** * @throws IOException * @throws URISyntaxException * @throws Exception */ private void startTransportServer() throws IOException, URISyntaxException, Exception { server = TransportFactory.bind(new URI("tcp://localhost:61616?trace=true&wireFormat.maxInactivityDuration=1000")); server.setAcceptListener(this); server.start(); } protected void tearDown() throws Exception { ignoreClientError.set(true); ignoreServerError.set(true); try { if (clientTransport != null) { clientTransport.stop(); } if (serverTransport != null) { serverTransport.stop(); } if (server != null) { server.stop(); } } catch (Throwable e) { e.printStackTrace(); } super.tearDown(); } public void onAccept(Transport transport) { try { LOG.info("[" + getName() + "] Server Accepted a Connection"); serverTransport = transport; serverTransport.setTransportListener(new TransportListener() { public void onCommand(Object command) { serverReceiveCount.incrementAndGet(); if (serverRunOnCommand != null) { serverRunOnCommand.run(); } } public void onException(IOException error) { if (!ignoreClientError.get()) { LOG.info("Server transport error:", error); serverErrorCount.incrementAndGet(); } } public void transportInterupted() { } public void transportResumed() { } }); serverTransport.start(); } catch (Exception e) { e.printStackTrace(); } } public void onAcceptError(Exception error) { LOG.trace(error.toString()); } public void testClientHang() throws Exception { // // Manually create a client transport so that it does not send KeepAlive // packets. // this should simulate a client hang. clientTransport = new TcpTransport(new OpenWireFormat(), SocketFactory.getDefault(), new URI("tcp://localhost:61616"), null); clientTransport.setTransportListener(new TransportListener() { public void onCommand(Object command) { clientReceiveCount.incrementAndGet(); if (clientRunOnCommand != null) { clientRunOnCommand.run(); } } public void onException(IOException error) { if (!ignoreClientError.get()) { LOG.info("Client transport error:"); error.printStackTrace(); clientErrorCount.incrementAndGet(); } } public void transportInterupted() { } public void transportResumed() { } }); clientTransport.start(); WireFormatInfo info = new WireFormatInfo(); info.setVersion(OpenWireFormat.DEFAULT_VERSION); info.setMaxInactivityDuration(1000); clientTransport.oneway(info); assertEquals(0, serverErrorCount.get()); assertEquals(0, clientErrorCount.get()); // Server should consider the client timed out right away since the // client is not hart beating fast enough. Thread.sleep(6000); assertEquals(0, clientErrorCount.get()); assertTrue(serverErrorCount.get() > 0); } public void testNoClientHang() throws Exception { startClient(); assertEquals(0, serverErrorCount.get()); assertEquals(0, clientErrorCount.get()); Thread.sleep(4000); assertEquals(0, clientErrorCount.get()); assertEquals(0, serverErrorCount.get()); } /** * Used to test when a operation blocks. This should not cause transport to * get disconnected. * * @throws Exception * @throws URISyntaxException */ public void initCombosForTestNoClientHangWithServerBlock() throws Exception { startClient(); addCombinationValues("clientInactivityLimit", new Object[] {Long.valueOf(1000)}); addCombinationValues("serverInactivityLimit", new Object[] {Long.valueOf(1000)}); addCombinationValues("serverRunOnCommand", new Object[] {new Runnable() { public void run() { try { LOG.info("Sleeping"); Thread.sleep(4000); } catch (InterruptedException e) { } } }}); } public void testNoClientHangWithServerBlock() throws Exception { startClient(); assertEquals(0, serverErrorCount.get()); assertEquals(0, clientErrorCount.get()); Thread.sleep(4000); assertEquals(0, clientErrorCount.get()); assertEquals(0, serverErrorCount.get()); } } Other ActiveMQ examples (source code examples)Here is a short list of links related to this ActiveMQ InactivityMonitorTest.java source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.