|
What this is
Other links
The source code/* * Sun Public License Notice * * The contents of this file are subject to the Sun Public License * Version 1.0 (the "License"). You may not use this file except in * compliance with the License. A copy of the License is available at * http://www.sun.com/ * * The Original Code is NetBeans. The Initial Developer of the Original * Code is Sun Microsystems, Inc. Portions Copyright 1997-2003 Sun * Microsystems, Inc. All Rights Reserved. */ package org.openide.util; import java.lang.ref.*; import java.util.*; import org.openide.ErrorManager; /** Request processor that is capable to execute requests in dedicated threads. * You can create your own instance or use the shared one. * *
RequestProcessor instance with limited throughput (probably
* set to 1), the IDE would try to run all your requests in parallel otherwise.
*
* @author Petr Nejedly, Jaroslav Tulach
*/
public final class RequestProcessor {
/** the static instance for users that do not want to have own processor */
private static RequestProcessor DEFAULT = new RequestProcessor ();
// 50: a conservative value, just for case of misuse
/** the static instance for users that do not want to have own processor */
private static RequestProcessor UNLIMITED = new RequestProcessor ("Default RequestProcessor", 50); // NOI18N
/** A shared timer used to pass timeouted tasks to pending queue */
private static Timer starterThread = new Timer(true);
/** logger */
private static ErrorManager logger;
/** The name of the RequestProcessor instance */
String name;
/** The counter for automatic naming of unnamed RequestProcessors */
private static int counter = 0;
/** If the RP was stopped, this variable will be set, every new post()
* will throw an exception and no task will be processed any further */
boolean stopped = false;
/** The lock covering following four fields. They should be accessed
* only while having this lock held. */
private Object processorLock = new Object();
/** The set holding all the Processors assigned to this RequestProcessor */
private HashSet processors = new HashSet();
/** Actualy the first item is pending to be processed.
* Can be accessed/trusted only under the above processorLock lock.
* If null, nothing is scheduled and the processor is not running. */
private List queue = new LinkedList();
/** Number of currently running processors. If there is a new request
* and this number is lower that the throughput, new Processor is asked
* to carry over the request. */
private int running = 0;
/** The maximal number of processors that can perform the requests sent
* to this RequestProcessors. If 1, all the requests are serialized. */
private int throughput;
/** Creates new RequestProcessor with automatically assigned unique name. */
public RequestProcessor() {
this(null,1);
}
/** Creates a new named RequestProcessor with throughput 1.
* @param name the name to use for the request processor thread */
public RequestProcessor(String name) {
this(name,1);
}
/** Creates a new named RequestProcessor with defined throughput.
* @param name the name to use for the request processor thread
* @param throughput the maximal count of requests allowed to run in parallel
*
* @since OpenAPI version 2.12
*/
public RequestProcessor(String name, int throughput) {
this.throughput = throughput;
this.name = (name != null) ? name : "OpenIDE-request-processor-" + (counter++);
}
/** The getter for the shared instance of the RequestProcessor .
*
* @return an instance of RequestProcessor that is capable of performing
* "unlimited" (currently limited to 50, just for case of misuse) number
* of requests in parallel. This instance is shared by anybody who
* needs a way of performing sporadic or repeated asynchronous work.
*
* @since OpenAPI version 2.12
*/
public static RequestProcessor getDefault() {
return UNLIMITED;
}
/** This methods asks the request processor to start given
* runnable immediately. The default priority is {@link Thread#MIN_PRIORITY}.
*
* @param run class to run
* @return the task to control the request
*/
public Task post (Runnable run) {
return post (run, 0, Thread.MIN_PRIORITY);
}
/** This methods asks the request processor to start given
* runnable after timeToWait milliseconds. The default priority is {@link Thread#MIN_PRIORITY}.
*
* @param run class to run
* @param timeToWait to wait before execution
* @return the task to control the request
*/
public Task post (final Runnable run, int timeToWait) {
return post (run, timeToWait, Thread.MIN_PRIORITY);
}
/** This methods asks the request processor to start given
* runnable after timeToWait milliseconds. Given priority is assigned to the
* request. * For request relaying please consider: * * post(run, timeToWait, Thread.currentThread().getPriority()); ** * @param run class to run * @param timeToWait to wait before execution * @param priority the priority from {@link Thread#MIN_PRIORITY} to {@link Thread#MAX_PRIORITY} * @return the task to control the request */ public Task post (final Runnable run, int timeToWait, int priority) { RequestProcessor.Task task = new Task (run, priority); task.schedule (timeToWait); return task; } /** Creates request that can be later started by setting its delay. * The request is not immediatelly put into the queue. It is planned after * setting its delay by schedule method. * * @param run action to run in the process * @return the task to control execution of given action */ public Task create (Runnable run) { return new Task (run); } /** Tests if the current thread is request processor thread. * This method could be used to prevent the deadlocks using * waitFinished method. Any two tasks created
* by request processor must not wait for themself.
*
* @return true if the current thread is request processor
* thread, otherwise false
*/
public boolean isRequestProcessorThread () {
Thread c = Thread.currentThread();
// return c instanceof Processor && ((Processor)c).source == this;
synchronized(processorLock) {
return processors.contains(c);
}
}
/** Stops processing of runnables processor.
* The currently running runnable is finished and no new is started.
*/
public void stop () {
if (this == UNLIMITED || this == DEFAULT) {
throw new IllegalArgumentException("Can't stop shared RP's"); // NOI18N
}
synchronized (processorLock) {
stopped = true;
Iterator it = processors.iterator();
while (it.hasNext()) ((Processor)it.next()).interrupt();
}
}
//
// Static methods communicating with default request processor
//
/** This methods asks the request processor to start given
* runnable after timeToWait milliseconds. The default priority is {@link Thread#MIN_PRIORITY}.
*
* @param run class to run
* @return the task to control the request
*
* @deprecated Sharing of one singlethreaded RequestProcessor
* among different users and posting even blocing requests is inherently
* deadlock-prone. See use cases. */
public static Task postRequest (Runnable run) {
return DEFAULT.post (run);
}
/** This methods asks the request processor to start given
* runnable after timeToWait milliseconds.
* The default priority is {@link Thread#MIN_PRIORITY}.
*
* @param run class to run
* @param timeToWait to wait before execution
* @return the task to control the request
*
* @deprecated Sharing of one singlethreaded RequestProcessor
* among different users and posting even blocing requests is inherently
* deadlock-prone. See use cases. */
public static Task postRequest (final Runnable run, int timeToWait) {
return DEFAULT.post (run, timeToWait);
}
/** This methods asks the request processor to start given
* runnable after timeToWait milliseconds. Given priority is assigned to the
* request.
* @param run class to run
* @param timeToWait to wait before execution
* @param priority the priority from {@link Thread#MIN_PRIORITY} to {@link Thread#MAX_PRIORITY}
* @return the task to control the request
*
* @deprecated Sharing of one singlethreaded RequestProcessor
* among different users and posting even blocing requests is inherently
* deadlock-prone. See use cases. */
public static Task postRequest (final Runnable run, int timeToWait, int priority) {
return DEFAULT.post (run, timeToWait, priority);
}
/** Creates request that can be later started by setting its delay.
* The request is not immediatelly put into the queue. It is planned after
* setting its delay by setDelay method.
* @param run action to run in the process
* @return the task to control execution of given action
*
* @deprecated Sharing of one singlethreaded RequestProcessor
* among different users and posting even blocing requests is inherently
* deadlock-prone. See use cases. */
public static Task createRequest (Runnable run) {
return DEFAULT.create (run);
}
/** Logger for the error manager.
*/
private static ErrorManager logger () {
synchronized (starterThread) {
if (logger == null) {
logger = ErrorManager.getDefault ().getInstance ("org.openide.util.RequestProcessor"); // NOI18N
}
return logger;
}
}
/**
* The task describing the request sent to the processor.
* Cancellable since 4.1.
*/
public final class Task extends org.openide.util.Task implements Cancellable {
private Item item;
private int priority = Thread.MIN_PRIORITY;
private long time = 0;
private Thread lastThread = null;
/** @param run runnable to start
* @param delay amount of millis to wait
* @param priority the priorty of the task
*/
Task (Runnable run) {
super(run);
}
Task (Runnable run, int priority){
super(run);
if (priority < Thread.MIN_PRIORITY) priority = Thread.MIN_PRIORITY;
if (priority > Thread.MAX_PRIORITY) priority = Thread.MAX_PRIORITY;
this.priority = priority;
}
public void run() {
lastThread = Thread.currentThread();
super.run();
lastThread = null;
}
/** Getter for amount of millis till this task
* is started.
* @return amount of millis
*/
public int getDelay () {
long delay = time - System.currentTimeMillis ();
if (delay < 0L) return 0;
if (delay > (long)Integer.MAX_VALUE) return Integer.MAX_VALUE;
return (int)delay;
}
/** (Re-)schedules a task to run in the future.
* If the task has not been run yet, it is postponed to
* the new time. If it has already run and finished, it is scheduled
* to be started again. If it is currently running, it is nevertheless
* left to finish, and also scheduled to run again.
* @param delay time in milliseconds to wait (starting from now)
*/
public void schedule (int delay) {
if (stopped) throw new IllegalStateException("RequestProcessor already stopped!"); // NOI18N
time = System.currentTimeMillis() + delay;
final Item localItem;
synchronized (processorLock) {
notifyRunning();
if (item != null) item.clear();
item = new Item(this, RequestProcessor.this);
localItem = item;
}
if (delay == 0) { // Place it to pending queue immediatelly
enqueue(localItem);
} else { // Post the starter
starterThread.schedule( new TimerTask() {
public void run() { // enqueue the created
// guard code for #33302. If it ever happens again
// we'll have more info
try {
enqueue(localItem); // it may be already neutralized
} catch (RuntimeException e) {
ErrorManager.getDefault().notify(e);
}
}
}, delay);
}
}
/** Removes the task from the queue.
*
* @return true if the task has been removed from the queue,
* false it the task has already been processed
*/
public boolean cancel () {
synchronized (processorLock) {
boolean success = (item == null) ? false : item.clear();
if (success) notifyFinished(); // mark it as finished
return success;
}
}
/** Current priority of the task.
*/
public int getPriority () {
return priority;
}
/** Changes the priority the task will be performed with. */
public void setPriority (int priority) {
if (this.priority == priority) return;
if (priority < Thread.MIN_PRIORITY) priority = Thread.MIN_PRIORITY;
if (priority > Thread.MAX_PRIORITY) priority = Thread.MAX_PRIORITY;
this.priority = priority;
// update queue position accordingly
synchronized (processorLock) {
if (item == null) return;
if (queue.remove(item)) {
prioritizedEnqueue(item);
}
}
}
/** This method is an implementation of the waitFinished method
* in the RequestProcessor.Task. It check the current thread if it is
* request processor thread and in such case runs the task immediatelly
* to prevent deadlocks.
*/
public void waitFinished () {
if (isRequestProcessorThread()) { //System.err.println("Task.waitFinished on " + this + " from other task in RP: " + Thread.currentThread().getName());
boolean toRun;
synchronized (processorLock) {
// correct line: toRun = (item == null) ? !isFinished (): (item.clear() && !isFinished ());
// the same: toRun = !isFinished () && (item == null ? true : item.clear ());
toRun = !isFinished () && (item == null || item.clear ());
}
if (toRun) { //System.err.println(" ## running it synchronously");
run();
} else { // it is already running in other thread of this RP
if (lastThread != Thread.currentThread()) {
super.waitFinished ();
}
// else {
//System.err.println("Thread waiting for itself!!!!! - semantics broken!!!");
//Thread.dumpStack();
// }
}
} else {
super.waitFinished ();
}
}
public String toString () {
return "RequestProcessor.Task [" + name + ", " + priority + "] for " + super.toString(); // NOI18N
}
}
//------------------------------------------------------------------------------
// The pending queue management implementation
//------------------------------------------------------------------------------
/** Place the Task to the queue of pending tasks for immediate processing.
* If there is no other Task planned, this task is immediatelly processed
* in the Processor.
*/
void enqueue (Item item) {
synchronized (processorLock) {
if (item.getTask() == null) return;
prioritizedEnqueue(item);
if (running < throughput) {
running++;
Processor proc = Processor.get();
processors.add(proc);
proc.setName(name);
proc.attachTo(this);
}
}
}
// call it under queue lock i.e. processorLock
private void prioritizedEnqueue(Item item) {
int iprio = item.getPriority();
if (queue.isEmpty()) {
queue.add(item);
item.enqueued = true;
return;
} else if (iprio <= ((Item)queue.get(queue.size()-1)).getPriority()) {
queue.add(item);
item.enqueued = true;
} else {
for (ListIterator it = queue.listIterator(); it.hasNext();) {
Item next = (Item) it.next();
if (iprio > next.getPriority()) {
it.set(item);
it.add(next);
item.enqueued = true;
return;
}
}
throw new IllegalStateException("Prioritized enqueue failed!");
}
}
Task askForWork (Processor worker, String debug) {
synchronized (processorLock) {
if (stopped || queue.isEmpty()) { // no more work in this burst, return him
processors.remove(worker);
Processor.put(worker, debug);
running--;
return null;
} else { // we have some work for the worker, pass it
Item i = (Item) queue.remove(0);
Task t = i.getTask ();
i.clear ();
return t;
}
}
}
static final boolean SLOW = Boolean.getBoolean("org.openide.util.RequestProcessor.Item.SLOW");
/* One item representing the task pending in the pending queue */
private static class Item extends Exception {
private final RequestProcessor owner;
private Task action;
private boolean enqueued;
Item (Task task, RequestProcessor rp) {
super ("Posted StackTrace"); // NOI18N
action = task;
owner = rp;
}
Task getTask() {
return action;
}
/** Annulate this request iff still possible.
* @returns true if it was possible to skip this item, false
* if the item was/is already processed */
boolean clear() {
synchronized (owner.processorLock) {
action = null;
return enqueued ? owner.queue.remove(this) : true;
}
}
int getPriority() {
return action.getPriority();
}
public Throwable fillInStackTrace() {
return SLOW ? super.fillInStackTrace() : this;
}
}
//------------------------------------------------------------------------------
// The Processor management implementation
//------------------------------------------------------------------------------
/**
/** A special thread that processes timouted Tasks from a RequestProcessor.
* It uses the RequestProcessor as a synchronized queue (a Channel),
* so it is possible to run more Processors in paralel for one RequestProcessor
*/
private static class Processor extends Thread {
/** A stack containing all the inactive Processors */
private static Stack pool = new Stack();
/** Provide an inactive Processor instance. It will return either
* existing inactive processor from the pool or will create a new instance
* if no instance is in the pool.
*
* @return inactive Processor
*/
static Processor get() {
synchronized (pool) {
if (pool.isEmpty()) {
Processor proc = new Processor();
proc.idle = false;
proc.start();
return proc;
} else {
Processor proc = (Processor)pool.pop();
proc.idle = false;
return proc;
}
}
}
/** A way of returning a Processor to the inactive pool.
*
* @param proc the Processor to return to the pool. It shall be inactive.
* @param last the debugging string identifying the last client.
*/
static void put (Processor proc, String last) {
synchronized (pool) {
proc.setName("Inactive RequestProcessor thread [Was:" +
proc.getName() + "/" + last + "]"); // NOI18N
proc.idle = true;
pool.push(proc);
}
}
/** Internal variable holding the Runnable to be run.
* Used for passing Runnable through Thread boundaries.
*/
//private Item task;
private RequestProcessor source;
/* One minute of inactivity and the Thread will die if not assigned */
private static final int INACTIVE_TIMEOUT = 60000;
private boolean idle = true;
/** Waiting lock */
private Object lock = new Object();
public Processor () {
super (getTopLevelThreadGroup(),
"Inactive RequestProcessor thread"); // NOI18N
setDaemon (true);
}
/** setPriority wrapper that skips setting the same priority
* we'return already running at */
void setPrio(int priority) {
if (priority != getPriority()) setPriority(priority);
}
/**
* Sets an Item to be performed and notifies the performing Thread
* to start the processing.
*
* @param r the Item to run.
*/
public void attachTo (RequestProcessor src) {
synchronized (lock) {
//assert(source == null);
source = src;
lock.notify();
}
}
/**
* The method that will repeatedly wait for a request and perform it.
*/
public void run () {
for(;;) {
RequestProcessor current = null;
synchronized (lock) {
try {
if (source == null) lock.wait (INACTIVE_TIMEOUT); // wait for the job
} catch (InterruptedException e) {} // not interesting
current = source;
source = null;
if (current == null) { // We've timeouted
synchronized (pool) {
if (idle) { // and we're idle
pool.remove(this);
break; // exit the thread
} else { // this will happen if we've been just
continue; // before timeout when we were assigned
}
}
}
}
Task todo;
String debug = null;
ErrorManager em = logger();
boolean loggable = em.isLoggable(ErrorManager.INFORMATIONAL);
if (loggable) em.log (ErrorManager.INFORMATIONAL, "Begining work " + getName ()); // NOI18N
// while we have something to do
while((todo = current.askForWork(this, debug)) != null) {
// if(todo != null) {
setPrio(todo.getPriority());
try {
if (loggable) logger ().log (" Executing " + todo); // NOI18N
todo.run ();
if (loggable) logger ().log (" Execution finished in" + getName ()); // NOI18N
debug = todo.debug();
} catch (OutOfMemoryError oome) {
// direct notification, there may be no room for
// annotations and we need OOME to be processed
// for debugging hooks
ErrorManager.getDefault().notify(oome);
} catch (RuntimeException e) {
doNotify(todo, e);
} catch (LinkageError e) {
doNotify(todo, e);
} catch (StackOverflowError e) {
// Try as hard as possible to get a real stack trace
e.printStackTrace();
// recoverable too
doNotify(todo, e);
}
// to improve GC
todo = null;
// }
}
if (loggable) logger ().log (ErrorManager.INFORMATIONAL, "Work finished " + getName ()); // NOI18N
}
}
/** @see "#20467" */
private static void doNotify(RequestProcessor.Task todo, Throwable ex) {
ErrorManager err = ErrorManager.getDefault ();
err.annotate(ex, ErrorManager.EXCEPTION, null,
NbBundle.getMessage (RequestProcessor.class, "EXC_IN_REQUEST_PROCESSOR"),
SLOW ? todo.item : null, null );
err.notify(ex);
}
/**
* @return a top level ThreadGroup. The method ensures that even
* Processors created by internal execution will survive the
* end of the task.
*/
static ThreadGroup getTopLevelThreadGroup() {
java.security.PrivilegedAction run = new java.security.PrivilegedAction() {
public Object run() {
ThreadGroup current = Thread.currentThread().getThreadGroup();
while (current.getParent() != null) {
current = current.getParent();
}
return current;
}
};
return (ThreadGroup) java.security.AccessController.doPrivileged(run);
}
}
}
|
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.