alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

ActiveMQ example source code file (InactivityMonitor.java)

This example ActiveMQ source code file (InactivityMonitor.java) is included in the DevDaily.com "Java Source Code Warehouse" project. The intent of this project is to help you "Learn Java by Example"™.

Java - ActiveMQ tags/keywords

atomicboolean, atomicboolean, checker_counter, default_check_time_mills, inactivitymonitor, inactivitymonitor, io, ioexception, ioexception, keepaliveinfo, runnable, runnable, schedulertimertask, threading, threads, timer, util, wireformatinfo

The ActiveMQ InactivityMonitor.java source code

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.transport;

import java.io.IOException;
import java.util.Timer;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.activemq.command.KeepAliveInfo;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.thread.SchedulerTimerTask;
import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Used to make sure that commands are arriving periodically from the peer of
 * the transport.
 * <p>
 * Two periodic checks are scheduled once the monitor is configured:
 * <ul>
 * <li>a <em>write check</em> that sends a {@link KeepAliveInfo} when nothing
 * was sent since the previous check (and keep-alives are enabled), and</li>
 * <li>a <em>read check</em> that reports an {@link InactivityIOException}
 * when nothing was received since the previous check.</li>
 * </ul>
 * The timers and the executor used to run the resulting actions are static
 * and shared by every started monitor; they are created when the first
 * monitor starts and torn down when the last one stops. All access to that
 * shared state is guarded by the {@code InactivityMonitor.class} lock.
 */
public class InactivityMonitor extends TransportFilter {

    private static final Logger LOG = LoggerFactory.getLogger(InactivityMonitor.class);

    /** Default period (and initial delay) of the checks, in milliseconds. */
    private static final long DEFAULT_CHECK_TIME_MILLS = 30000;

    /**
     * Thread factory for the shared executor. It is deliberately static: the
     * executor is shared by all monitors, so the factory must not be bound to
     * (and thereby keep reachable) the monitor instance that happened to
     * create the executor.
     */
    private static final ThreadFactory ASYNC_TASK_THREAD_FACTORY = new ThreadFactory() {
        public Thread newThread(Runnable runnable) {
            Thread thread = new Thread(runnable, "InactivityMonitor Async Task: " + runnable);
            thread.setDaemon(true);
            return thread;
        }
    };

    // Shared state below is guarded by the InactivityMonitor.class lock.

    /** Runs keep-alive sends and inactivity failures off the timer threads. */
    private static ThreadPoolExecutor ASYNC_TASKS;
    /** Number of monitors whose checks are currently scheduled. */
    private static int CHECKER_COUNTER;
    private static Timer READ_CHECK_TIMER;
    private static Timer WRITE_CHECK_TIMER;

    private WireFormatInfo localWireFormatInfo;
    private WireFormatInfo remoteWireFormatInfo;
    private final AtomicBoolean monitorStarted = new AtomicBoolean(false);

    private final AtomicBoolean commandSent = new AtomicBoolean(false);
    private final AtomicBoolean inSend = new AtomicBoolean(false);
    private final AtomicBoolean failed = new AtomicBoolean(false);

    // Starts out true so that the very first read check does not fail.
    private final AtomicBoolean commandReceived = new AtomicBoolean(true);
    private final AtomicBoolean inReceive = new AtomicBoolean(false);
    private final AtomicInteger lastReceiveCounter = new AtomicInteger(0);

    private SchedulerTimerTask writeCheckerTask;
    private SchedulerTimerTask readCheckerTask;

    private boolean ignoreRemoteWireFormat = false;
    private boolean ignoreAllWireFormatInfo = false;

    private long readCheckTime = DEFAULT_CHECK_TIME_MILLS;
    private long writeCheckTime = DEFAULT_CHECK_TIME_MILLS;
    private long initialDelayTime = DEFAULT_CHECK_TIME_MILLS;
    private boolean useKeepAlive = true;
    private boolean keepAliveResponseRequired;
    private WireFormat wireFormat;

    /**
     * Timer callback for the read check. Skips the check when it fires too
     * soon after the previous one. This object is also used as the lock held
     * while a received command is handed to the transport listener.
     */
    private final Runnable readChecker = new Runnable() {
        long lastRunTime;
        public void run() {
            long now = System.currentTimeMillis();
            long elapsed = (now - lastRunTime);

            if (lastRunTime != 0 && LOG.isDebugEnabled()) {
                LOG.debug("" + elapsed + " ms elapsed since last read check.");
            }

            // Perhaps the timer executed a read check late.. and then executes
            // the next read check on time which causes the time elapsed between
            // read checks to be small..

            // If less than 90% of the read check Time elapsed then abort this readcheck.
            if (!allowReadCheck(elapsed)) { // FUNKY qdox bug does not allow me to inline this expression.
                LOG.debug("Aborting read check.. Not enough time elapsed since last read check.");
                return;
            }

            lastRunTime = now;
            readCheck();
        }
    };

    /**
     * @param elapsed milliseconds since the last read check that actually ran
     * @return {@code true} when more than 90% of {@code readCheckTime} has elapsed
     */
    private boolean allowReadCheck(long elapsed) {
        return elapsed > (readCheckTime * 9 / 10);
    }

    /** Timer callback for the write check. */
    private final Runnable writeChecker = new Runnable() {
        long lastRunTime;
        public void run() {
            long now = System.currentTimeMillis();
            if (lastRunTime != 0 && LOG.isDebugEnabled()) {
                LOG.debug(this + " " + (now - lastRunTime) + " ms elapsed since last write check.");
            }
            lastRunTime = now;
            writeCheck();
        }
    };

    /**
     * Creates a monitor wrapping the given transport.
     *
     * @param next       the transport being monitored
     * @param wireFormat the wire format in use; when {@code null} the monitor
     *                   does not wait for any {@link WireFormatInfo} exchange
     *                   before it considers itself configured
     */
    public InactivityMonitor(Transport next, WireFormat wireFormat) {
        super(next);
        this.wireFormat = wireFormat;
        if (this.wireFormat == null) {
            this.ignoreAllWireFormatInfo = true;
        }
    }

    /**
     * Starts the wrapped transport and then attempts to start the checks; the
     * checks only start if the monitor is already configured.
     */
    public void start() throws Exception {
        next.start();
        startMonitorThreads();
    }

    /** Stops the checks first, then the wrapped transport. */
    public void stop() throws Exception {
        stopMonitorThreads();
        next.stop();
    }

    /**
     * Submits a task to the shared executor, tolerating the executor having
     * been torn down by a concurrent stop of the last monitor.
     * <p>
     * The submission happens under the {@code InactivityMonitor.class} lock,
     * the same lock under which the executor is created and shut down, so a
     * non-null executor seen here cannot be shut down before the task is
     * handed over. The executor is built on a {@link SynchronousQueue} with an
     * effectively unbounded thread count, so the hand-over either reaches an
     * idle worker or starts a new one; it does not wait for queue space.
     *
     * @param task the task to run asynchronously
     * @return {@code true} if the task was submitted, {@code false} if it was
     *         dropped because no executor is currently available
     */
    private boolean executeAsync(Runnable task) {
        synchronized (InactivityMonitor.class) {
            if (ASYNC_TASKS == null || ASYNC_TASKS.isShutdown()) {
                // The executor only goes away when the last monitor has
                // stopped, so there is nothing left to act on.
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Async task executor is not available, dropping task for " + this);
                }
                return false;
            }
            ASYNC_TASKS.execute(task);
            return true;
        }
    }

    /**
     * Write check: if no command was sent since the previous check and
     * keep-alives are enabled, asynchronously sends a {@link KeepAliveInfo}.
     * Does nothing while a send is in progress. Resets the "command sent"
     * flag afterwards.
     */
    final void writeCheck() {
        if (inSend.get()) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("A send is in progress");
            }
            return;
        }

        if (!commandSent.get() && useKeepAlive) {
            if (LOG.isTraceEnabled()) {
                LOG.trace(this + " no message sent since last write check, sending a KeepAliveInfo");
            }
            executeAsync(new Runnable() {
                public void run() {
                    // The monitor may have been stopped between scheduling
                    // and execution of this task.
                    if (monitorStarted.get()) {
                        try {
                            KeepAliveInfo info = new KeepAliveInfo();
                            info.setResponseRequired(keepAliveResponseRequired);
                            oneway(info);
                        } catch (IOException e) {
                            onException(e);
                        }
                    }
                }
            });
        } else {
            if (LOG.isTraceEnabled()) {
                LOG.trace(this + " message sent since last write check, resetting flag");
            }
        }

        commandSent.set(false);
    }

    /**
     * Read check: if nothing was received since the previous check,
     * asynchronously reports an {@link InactivityIOException} through
     * {@link #onException(IOException)}. Does nothing while a receive is in
     * progress or when the wrapped transport's receive counter has changed
     * since the previous check. Resets the "command received" flag afterwards.
     */
    final void readCheck() {
        int currentCounter = next.getReceiveCounter();
        int previousCounter = lastReceiveCounter.getAndSet(currentCounter);
        if (inReceive.get() || currentCounter != previousCounter) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("A receive is in progress");
            }
            return;
        }
        if (!commandReceived.get()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException.");
            }
            executeAsync(new Runnable() {
                public void run() {
                    onException(new InactivityIOException("Channel was inactive for too (>" + readCheckTime + ") long: " + next.getRemoteAddress()));
                }
            });
        } else {
            if (LOG.isTraceEnabled()) {
                LOG.trace("Message received since last read check, resetting flag: ");
            }
        }
        commandReceived.set(false);
    }

    /**
     * Handles a command received from the peer.
     * <ul>
     * <li>A {@link KeepAliveInfo} is consumed here; when it asks for a
     * response it is echoed back with the response flag cleared.</li>
     * <li>A {@link WireFormatInfo} is recorded as the remote wire format and
     * triggers an attempt to start the checks, then is passed on.</li>
     * <li>Everything else is passed on to the transport listener.</li>
     * </ul>
     *
     * @param command the received command
     */
    public void onCommand(Object command) {
        commandReceived.set(true);
        inReceive.set(true);
        try {
            if (command.getClass() == KeepAliveInfo.class) {
                KeepAliveInfo info = (KeepAliveInfo) command;
                if (info.isResponseRequired()) {
                    try {
                        info.setResponseRequired(false);
                        oneway(info);
                    } catch (IOException e) {
                        onException(e);
                    }
                }
            } else {
                if (command.getClass() == WireFormatInfo.class) {
                    synchronized (this) {
                        IOException error = null;
                        remoteWireFormatInfo = (WireFormatInfo) command;
                        try {
                            startMonitorThreads();
                        } catch (IOException e) {
                            error = e;
                        }
                        if (error != null) {
                            onException(error);
                        }
                    }
                }
                synchronized (readChecker) {
                    transportListener.onCommand(command);
                }
            }
        } finally {
            inReceive.set(false);
        }
    }

    /**
     * Sends a command through the wrapped transport. A {@link WireFormatInfo}
     * is additionally recorded as the local wire format and triggers an
     * attempt to start the checks.
     *
     * @param o the command to send
     * @throws IOException if the channel has already failed
     *         ({@link InactivityIOException}) or the wrapped transport fails
     */
    public void oneway(Object o) throws IOException {
        // Disable inactivity monitoring while processing a command.
        // synchronize this method - its not synchronized
        // further down the transport stack and gets called by more
        // than one thread by this class
        synchronized (inSend) {
            inSend.set(true);
            try {
                if (failed.get()) {
                    throw new InactivityIOException("Cannot send, channel has already failed: " + next.getRemoteAddress());
                }
                if (o.getClass() == WireFormatInfo.class) {
                    synchronized (this) {
                        localWireFormatInfo = (WireFormatInfo) o;
                        startMonitorThreads();
                    }
                }
                next.oneway(o);
            } finally {
                // Marked as sent even when the send failed, as before.
                commandSent.set(true);
                inSend.set(false);
            }
        }
    }

    /**
     * Reports a failure to the transport listener. Only the first failure is
     * reported; it also stops the checks.
     *
     * @param error the failure to report
     */
    public void onException(IOException error) {
        if (failed.compareAndSet(false, true)) {
            stopMonitorThreads();
            transportListener.onException(error);
        }
    }

    /**
     * @param val whether keep-alives sent by the write check should request a
     *            response from the peer
     */
    public void setKeepAliveResponseRequired(boolean val) {
        keepAliveResponseRequired = val;
    }

    /**
     * @param val whether the write check sends keep-alives at all
     */
    public void setUseKeepAlive(boolean val) {
        useKeepAlive = val;
    }

    /**
     * @param val when {@code true}, only the local wire format's inactivity
     *            settings are used instead of the minimum of local and remote
     */
    public void setIgnoreRemoteWireFormat(boolean val) {
        ignoreRemoteWireFormat = val;
    }

    /**
     * @return the read check period in milliseconds
     */
    public long getReadCheckTime() {
        return readCheckTime;
    }

    /**
     * @param readCheckTime the read check period in milliseconds; a value of
     *                      zero or less prevents the checks from starting
     */
    public void setReadCheckTime(long readCheckTime) {
        this.readCheckTime = readCheckTime;
    }

    /**
     * @return the delay before the first check, in milliseconds
     */
    public long getInitialDelayTime() {
        return initialDelayTime;
    }

    /**
     * @param initialDelayTime the delay before the first check, in milliseconds
     */
    public void setInitialDelayTime(long initialDelayTime) {
        this.initialDelayTime = initialDelayTime;
    }

    /**
     * Schedules the read and write checks if they are not already running,
     * the monitor is configured and {@code readCheckTime} is positive. The
     * write check runs at a third of the read check period (or at the same
     * period when that is 3 ms or less). The first monitor to start creates
     * the shared timers and executor.
     */
    private synchronized void startMonitorThreads() throws IOException {
        if (monitorStarted.get()) {
            return;
        }

        if (!configuredOk()) {
            return;
        }

        if (readCheckTime > 0) {
            monitorStarted.set(true);
            writeCheckerTask = new SchedulerTimerTask(writeChecker);
            readCheckerTask = new SchedulerTimerTask(readChecker);
            writeCheckTime = readCheckTime > 3 ? readCheckTime / 3 : readCheckTime;
            synchronized (InactivityMonitor.class) {
                if (CHECKER_COUNTER == 0) {
                    ASYNC_TASKS = createExecutor();
                    READ_CHECK_TIMER = new Timer("InactivityMonitor ReadCheck", true);
                    WRITE_CHECK_TIMER = new Timer("InactivityMonitor WriteCheck", true);
                }
                CHECKER_COUNTER++;
                WRITE_CHECK_TIMER.scheduleAtFixedRate(writeCheckerTask, initialDelayTime, writeCheckTime);
                READ_CHECK_TIMER.scheduleAtFixedRate(readCheckerTask, initialDelayTime, readCheckTime);
            }
        }
    }

    /**
     * Determines whether the monitor has what it needs to start and, as a
     * side effect, derives {@code readCheckTime} and {@code initialDelayTime}
     * from the wire format information.
     *
     * @return {@code true} when wire format information is being ignored
     *         altogether, or when both the local and the remote
     *         {@link WireFormatInfo} are known
     */
    private boolean configuredOk() throws IOException {
        boolean configured = false;
        if (ignoreAllWireFormatInfo) {
            configured = true;
        } else if (localWireFormatInfo != null && remoteWireFormatInfo != null) {
            if (!ignoreRemoteWireFormat) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Using min of local: " + localWireFormatInfo + " and remote: " + remoteWireFormatInfo);
                }
                readCheckTime = Math.min(localWireFormatInfo.getMaxInactivityDuration(), remoteWireFormatInfo.getMaxInactivityDuration());
                initialDelayTime = Math.min(localWireFormatInfo.getMaxInactivityDurationInitalDelay(), remoteWireFormatInfo.getMaxInactivityDurationInitalDelay());
            } else {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Using local: " + localWireFormatInfo);
                }
                readCheckTime = localWireFormatInfo.getMaxInactivityDuration();
                initialDelayTime = localWireFormatInfo.getMaxInactivityDurationInitalDelay();
            }
            configured = true;
        }
        return configured;
    }

    /**
     * Cancels this monitor's checks if they are running. The last monitor to
     * stop also cancels the shared timers and shuts down the shared executor.
     */
    private synchronized void stopMonitorThreads() {
        if (monitorStarted.compareAndSet(true, false)) {
            readCheckerTask.cancel();
            writeCheckerTask.cancel();
            synchronized (InactivityMonitor.class) {
                WRITE_CHECK_TIMER.purge();
                READ_CHECK_TIMER.purge();
                CHECKER_COUNTER--;
                if (CHECKER_COUNTER == 0) {
                    WRITE_CHECK_TIMER.cancel();
                    READ_CHECK_TIMER.cancel();
                    WRITE_CHECK_TIMER = null;
                    READ_CHECK_TIMER = null;
                    ASYNC_TASKS.shutdownNow();
                    ASYNC_TASKS = null;
                }
            }
        }
    }

    /**
     * Creates the shared executor: no core threads, an effectively unbounded
     * maximum, idle threads reclaimed after 10 seconds, and direct hand-off
     * of tasks to daemon threads.
     */
    private static ThreadPoolExecutor createExecutor() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), ASYNC_TASK_THREAD_FACTORY);
    }
}

Other ActiveMQ examples (source code examples)

Here is a short list of links related to this ActiveMQ InactivityMonitor.java source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.