// $Header: /home/cvs/jakarta-jmeter/src/core/org/apache/jmeter/threads/,v 1.16 2004/02/14 03:34:29 sebb Exp $
/*
 * Copyright 2001-2004 The Apache Software Foundation.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.jmeter.threads;
import java.util.Iterator;
import java.util.List;

import org.apache.commons.collections.Buffer;
import org.apache.commons.collections.BufferUtils;
import org.apache.commons.collections.UnboundedFifoBuffer;
import org.apache.jmeter.samplers.SampleEvent;
import org.apache.jmeter.samplers.SampleListener;
import org.apache.jorphan.logging.LoggingManager;
import org.apache.log.Logger;

/**
 * The ListenerNotifier thread is responsible for performing
 * asynchronous notifications that a sample has occurred.  Each time a
 * sample occurs, the {@link #addLast(SampleEvent, List) addLast} method
 * should be called to add the sample and its list of listeners to the
 * notification queue.  This thread will then notify those listeners
 * asynchronously at some future time.

* <p>In the current implementation, the notifications will be made in batches,
 * with 2 seconds between the beginning of successive batches.  If the
 * notifier thread starts to get behind, the priority of the thread will be
 * increased in an attempt to help it to keep up.
 *
 * @see org.apache.jmeter.samplers.SampleListener
 *
 * @version $Revision: 1.16 $
 */
public class ListenerNotifier {
    private static final Logger log = LoggingManager.getLoggerForClass();

    /**
     * The number of milliseconds between batches of notifications.
     */
    private static final int SLEEP_TIME = 2000;

    /**
     * Indicates whether or not this thread should remain running.  The
     * thread will continue running after this field is set to false until
     * the next batch of notifications has been completed and the
     * notification queue is empty.
     * <p>
     * Declared volatile because it is cleared by {@link #stop()}, which may
     * be called from a different thread than the one executing
     * {@link #run()}.
     */
    private volatile boolean running = true;

    /**
     * Indicates whether or not this thread has stopped.  It is true before
     * {@link #run()} has been entered and again once {@link #run()} has
     * exited; no notifications are performed by {@link #run()} while it is
     * true.
     * <p>
     * Declared volatile because it is written by the thread executing
     * {@link #run()} and read through {@link #isStopped()} by other threads.
     */
    private volatile boolean isStopped = true;

    /**
     * The queue containing the notifications to be performed.  Each
     * notification consists of a pair of entries in this queue.  The
     * first is the {@link org.apache.jmeter.samplers.SampleEvent
     * SampleEvent} representing the sample.  The second is a List of
     * {@link org.apache.jmeter.samplers.SampleListener SampleListener}s
     * which should be notified.
     */
    private Buffer listenerEvents =
        BufferUtils.synchronizedBuffer(new UnboundedFifoBuffer());

    /**
     * Stops the ListenerNotifier thread.  The thread will continue processing
     * any events remaining in the notification queue before it actually
     * stops, but this method will return immediately.
     */
    public void stop() {
        running = false;
    }

    /**
     * Indicates whether or not the thread has stopped.  While
     * {@link #run()} is executing this returns false; it becomes true once
     * {@link #run()} has processed the remaining notifications and exited.
     * It also returns true if {@link #run()} has not been entered yet.
     *
     * @return true if the ListenerNotifier is not (or no longer) running,
     *         false otherwise
     */
    public boolean isStopped() {
        return isStopped;
    }

    /**
     * Process the events in the notification queue until the thread has
     * been told to stop and the notification queue is empty.
     * <p>
     * In the current implementation, this method will iterate continually
     * until the thread is told to stop.  In each iteration it will process
     * any notifications that are in the queue at the beginning of the
     * iteration, and then will sleep until it is time to start the next
     * batch.  As long as the thread is keeping up, each batch should start
     * 2 seconds after the beginning of the last batch.  This exact
     * behavior is subject to change.
     */
    public void run() {
        // The notifier is active from here on; isStopped() must report false
        // until the final batch below has been delivered.
        isStopped = false;
        try {
            boolean isMaximumPriority = false;
            int normalCount = 0;

            while (running) {
                long startTime = System.currentTimeMillis();
                processNotifications();
                long sleep =
                    SLEEP_TIME - (System.currentTimeMillis() - startTime);

                // If the thread has been told to stop then we shouldn't sleep
                if (!running) {
                    break;
                }

                if (sleep < 0) {
                    // The batch overran its time slot: raise the priority.
                    isMaximumPriority = true;
                    normalCount = 0;
                    if (log.isInfoEnabled()) {
                        log.info(
                            "ListenerNotifier exceeded maximum "
                                + "notification time by "
                                + (-sleep)
                                + "ms");
                    }
                    boostPriority();
                } else {
                    normalCount++;

                    // If there have been three consecutive iterations since
                    // the last iteration which took too long to execute,
                    // return the thread to normal priority.
                    if (isMaximumPriority && normalCount >= 3) {
                        isMaximumPriority = false;
                        unboostPriority();
                    }

                    if (log.isDebugEnabled()) {
                        log.debug(
                            "ListenerNotifier sleeping for " + sleep + "ms");
                    }

                    try {
                        Thread.sleep(sleep);
                    } catch (InterruptedException ignored) {
                        // Deliberately ignored: waking early only means the
                        // next batch starts sooner, and the loop condition
                        // re-checks the running flag.  The interrupt status
                        // is not re-asserted here because that would make
                        // every following sleep fail immediately and turn
                        // this loop into a busy spin.
                    }
                }
            }

            // Make sure that all pending notifications are processed before
            // actually ending the thread.
            processNotifications();
        } finally {
            // Always publish the stopped state, even if a listener threw,
            // so that callers polling isStopped() cannot wait forever.
            isStopped = true;
        }
    }

    /**
     * Process all of the pending notifications.  Only the samples which are
     * in the queue when this method is called will be processed.  Any samples
     * added between the time when this method is called and when it exits are
     * saved for the next batch.
     */
    private void processNotifications() {
        // Snapshot of the queue length; entries added later wait for the
        // next batch.
        int listenerEventsSize = listenerEvents.size();
        if (log.isDebugEnabled()) {
            log.debug(
                "ListenerNotifier: processing "
                    + listenerEventsSize
                    + " events");
        }

        while (listenerEventsSize > 0) {
            // Since this is a FIFO and this is the only place we remove
            // from it (only from a single thread) we don't have to remove
            // these two items in one atomic operation.  Each individual
            // remove is atomic (because we use a synchronized buffer),
            // which is necessary since the buffer can be accessed from
            // other threads (to add things to the buffer).
            SampleEvent res = (SampleEvent) listenerEvents.remove();
            List listeners = (List) listenerEvents.remove();

            notifyListeners(res, listeners);

            // Each notification occupies two queue entries.
            listenerEventsSize -= 2;
        }
    }

    /**
     * Boost the priority of the current thread to maximum priority.  If
     * the thread is already at maximum priority then this will have no
     * effect.
     */
    private void boostPriority() {
        if (Thread.currentThread().getPriority() != Thread.MAX_PRIORITY) {
            log.info("ListenerNotifier: Boosting thread priority to maximum.");
            Thread.currentThread().setPriority(Thread.MAX_PRIORITY);
        }
    }

    /**
     * Return the priority of the current thread to normal.  If the thread
     * is already at normal priority then this will have no effect.
     */
    private void unboostPriority() {
        if (Thread.currentThread().getPriority() != Thread.NORM_PRIORITY) {
            log.info("ListenerNotifier: Returning thread priority to normal.");
            Thread.currentThread().setPriority(Thread.NORM_PRIORITY);
        }
    }

    /**
     * Notify a list of listeners that a sample has occurred.  The listeners
     * are called one after another, in list order, on the calling thread.
     *
     * @param res        the sample event that has occurred.  Must be
     *                   non-null.
     * @param listeners  a list of the listeners which should be notified.
     *                   This list must not be null and must contain only
     *                   SampleListener elements.
     */
    public void notifyListeners(SampleEvent res, List listeners) {
        Iterator iter = listeners.iterator();
        while (iter.hasNext()) {
            ((SampleListener) iter.next()).sampleOccurred(res);
        }
    }

    /**
     * Add a new sample event to the notification queue.  The notification
     * will be performed asynchronously and this method will return
     * immediately.
     *
     * @param item       the sample event that has occurred.  Must be
     *                   non-null.
     * @param listeners  a list of the listeners which should be notified.
     *                   This list must not be null and must contain only
     *                   SampleListener elements.
     */
    public void addLast(SampleEvent item, List listeners) {
        // Must use explicit synchronization here so that the item and
        // listeners are added together atomically
        synchronized (listenerEvents) {
            listenerEvents.add(item);
            listenerEvents.add(listeners);
        }
    }
}

