alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Java example source code file (SerializingExecutor.java)

This example Java source code file (SerializingExecutor.java) is included in the alvinalexander.com "Java Source Code Warehouse" project. The intent of this project is to help you "Learn Java by Example" TM.

Learn more about this Java project at its project page.

Java - Java tags/keywords

arraydeque, error, exception, executor, guardedby, gwtincompatible, log, logger, logging, object, override, queueworker, runnable, runtimeexception, serializingexecutor, threading, threads, util

The SerializingExecutor.java Java example source code

/*
 * Copyright (C) 2008 The Guava Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
 * in compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */

package com.google.common.util.concurrent;

import com.google.common.annotations.GwtIncompatible;
import com.google.common.base.Preconditions;

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.Executor;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.annotation.concurrent.GuardedBy;

/**
 * Executor ensuring that all Runnables submitted are executed in order, using the provided
 * Executor, and serially such that no two will ever be running at the same time.
 *
 * <p>Tasks submitted to {@link #execute(Runnable)} are executed in FIFO order.
 *
 * <p>Tasks can also be prepended to the queue to be executed in LIFO order before any other
 * submitted tasks. Primarily intended for the currently executing task to be able to schedule a
 * continuation task.
 *
 * <p>Execution on the queue can be {@linkplain #suspend suspended}, e.g. while waiting for an RPC,
 * and execution can be {@linkplain #resume resumed} later.
 *
 * <p>The execution of tasks is done by one thread as long as there are tasks left in the queue and
 * execution has not been suspended. (Even if one task is {@linkplain Thread#interrupt interrupted},
 * execution of subsequent tasks continues.) {@code RuntimeException}s thrown by tasks are simply
 * logged and the executor keeps trucking. If an {@code Error} is thrown, the error will propagate
 * and execution will stop until it is restarted by external calls.
 */
@GwtIncompatible
final class SerializingExecutor implements Executor {
  private static final Logger log = Logger.getLogger(SerializingExecutor.class.getName());

  /** Underlying executor that all submitted Runnable objects are run on. */
  private final Executor executor;

  @GuardedBy("internalLock")
  private final Deque<Runnable> queue = new ArrayDeque();

  @GuardedBy("internalLock")
  private boolean isWorkerRunning = false;

  @GuardedBy("internalLock")
  private int suspensions = 0;

  private final Object internalLock = new Object();

  public SerializingExecutor(Executor executor) {
    this.executor = Preconditions.checkNotNull(executor);
  }

  /**
   * Adds a task to the queue and makes sure a worker thread is running, unless the queue has been
   * suspended.
   *
   * <p>If this method throws, e.g. a {@code RejectedExecutionException} from the delegate executor,
   * execution of tasks will stop until a call to this method or to {@link #resume()} is made.
   */
  public void execute(Runnable task) {
    synchronized (internalLock) {
      queue.add(task);
    }
    startQueueWorker();
  }

  /**
   * Prepends a task to the front of the queue and makes sure a worker thread is running, unless the
   * queue has been suspended.
   */
  public void executeFirst(Runnable task) {
    synchronized (internalLock) {
      queue.addFirst(task);
    }
    startQueueWorker();
  }

  /**
   * Suspends the running of tasks until {@link #resume()} is called. This can be called multiple
   * times to increase the suspensions count and execution will not continue until {@link #resume}
   * has been called the same number of times as {@code suspend} has been.
   *
   * <p>Any task that has already been pulled off the queue for execution will be completed before
   * execution is suspended.
   */
  public void suspend() {
    synchronized (internalLock) {
      suspensions++;
    }
  }

  /**
   * Continue execution of tasks after a call to {@link #suspend()}. More accurately, decreases the
   * suspension counter, as has been incremented by calls to {@link #suspend}, and resumes execution
   * if the suspension counter is zero.
   *
   * <p>If this method throws, e.g. a {@code RejectedExecutionException} from the delegate executor,
   * execution of tasks will stop until a call to this method or to {@link #execute(Runnable)} or
   * {@link #executeFirst(Runnable)} is made.
   *
   * @throws java.lang.IllegalStateException if this executor is not suspended.
   */
  public void resume() {
    synchronized (internalLock) {
      Preconditions.checkState(suspensions > 0);
      suspensions--;
    }
    startQueueWorker();
  }

  private void startQueueWorker() {
    synchronized (internalLock) {
      // We sometimes try to start a queue worker without knowing if there is any work to do.
      if (queue.peek() == null) {
        return;
      }
      if (suspensions > 0) {
        return;
      }
      if (isWorkerRunning) {
        return;
      }
      isWorkerRunning = true;
    }
    boolean executionRejected = true;
    try {
      executor.execute(new QueueWorker());
      executionRejected = false;
    } finally {
      if (executionRejected) {
        // The best we can do is to stop executing the queue, but reset the state so that
        // execution can be resumed later if the caller so wishes.
        synchronized (internalLock) {
          isWorkerRunning = false;
        }
      }
    }
  }

  /**
   * Worker that runs tasks off the queue until it is empty or the queue is suspended.
   */
  private final class QueueWorker implements Runnable {
    @Override
    public void run() {
      try {
        workOnQueue();
      } catch (Error e) {
        synchronized (internalLock) {
          isWorkerRunning = false;
        }
        throw e;
        // The execution of a task has ended abnormally.
        // We could have tasks left in the queue, so should perhaps try to restart a worker,
        // but then the Error will get delayed if we are using a direct (same thread) executor.
      }
    }

    private void workOnQueue() {
      while (true) {
        Runnable task = null;
        synchronized (internalLock) {
          // TODO(user): How should we handle interrupts and shutdowns?
          if (suspensions == 0) {
            task = queue.poll();
          }
          if (task == null) {
            isWorkerRunning = false;
            return;
          }
        }
        try {
          task.run();
        } catch (RuntimeException e) {
          log.log(Level.SEVERE, "Exception while executing runnable " + task, e);
        }
      }
    }
  }
}

Other Java examples (source code examples)

Here is a short list of links related to this Java SerializingExecutor.java source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.