alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Scala example source code file (ExecutorCompletionService.java)

This example Scala source code file (ExecutorCompletionService.java) is included in the DevDaily.com "Java Source Code Warehouse" project. The intent of this project is to help you "Learn Java by Example" TM.

Java - Scala tags/keywords

abstractexecutorservice, abstractexecutorservice, blockingqueue, executorcompletionservice, future, future, futuretask, interruptedexception, nullpointerexception, nullpointerexception, object, queueingfuture, queueingfuture, runnablefuture

The Scala ExecutorCompletionService.java source code

/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/licenses/publicdomain
 */

package scala.actors.threadpool;
import scala.actors.threadpool.*; // for javadoc (till 6280605 is fixed)

/**
 * A {@link CompletionService} that uses a supplied {@link Executor}
 * to execute tasks.  This class arranges that submitted tasks are,
 * upon completion, placed on a queue accessible using <tt>take.
 * The class is lightweight enough to be suitable for transient use
 * when processing groups of tasks.
 *
 * <p>
 *
 * <b>Usage Examples.
 *
 * Suppose you have a set of solvers for a certain problem, each
 * returning a value of some type <tt>Result, and would like to
 * run them concurrently, processing the results of each of them that
 * return a non-null value, in some method <tt>use(Result r). You
 * could write this as:
 *
 * <pre>
 *   void solve(Executor e,
 *              Collection<Callable<Result>> solvers)
 *     throws InterruptedException, ExecutionException {
 *       CompletionService<Result> ecs
 *           = new ExecutorCompletionService<Result>(e);
 *       for (Callable<Result> s : solvers)
 *           ecs.submit(s);
 *       int n = solvers.size();
 *       for (int i = 0; i < n; ++i) {
 *           Result r = ecs.take().get();
 *           if (r != null)
 *               use(r);
 *       }
 *   }
 * </pre>
 *
 * Suppose instead that you would like to use the first non-null result
 * of the set of tasks, ignoring any that encounter exceptions,
 * and cancelling all other tasks when the first one is ready:
 *
 * <pre>
 *   void solve(Executor e,
 *              Collection<Callable<Result>> solvers)
 *     throws InterruptedException {
 *       CompletionService<Result> ecs
 *           = new ExecutorCompletionService<Result>(e);
 *       int n = solvers.size();
 *       List<Future<Result>> futures
 *           = new ArrayList<Future<Result>>(n);
 *       Result result = null;
 *       try {
 *           for (Callable<Result> s : solvers)
 *               futures.add(ecs.submit(s));
 *           for (int i = 0; i < n; ++i) {
 *               try {
 *                   Result r = ecs.take().get();
 *                   if (r != null) {
 *                       result = r;
 *                       break;
 *                   }
 *               } catch (ExecutionException ignore) {}
 *           }
 *       }
 *       finally {
 *           for (Future<Result> f : futures)
 *               f.cancel(true);
 *       }
 *
 *       if (result != null)
 *           use(result);
 *   }
 * </pre>
 */
public class ExecutorCompletionService implements CompletionService {
    private final Executor executor;
    private final AbstractExecutorService aes;
    private final BlockingQueue completionQueue;

    /**
     * FutureTask extension to enqueue upon completion
     */
    private class QueueingFuture extends FutureTask {
        QueueingFuture(RunnableFuture task) {
            super(task, null);
            this.task = task;
        }
        protected void done() { completionQueue.add(task); }
        private final Future task;
    }

    private RunnableFuture newTaskFor(Callable task) {
        if (aes == null)
            return new FutureTask(task);
        else
            return aes.newTaskFor(task);
    }

    private RunnableFuture newTaskFor(Runnable task, Object result) {
        if (aes == null)
            return new FutureTask(task, result);
        else
            return aes.newTaskFor(task, result);
    }

    /**
     * Creates an ExecutorCompletionService using the supplied
     * executor for base task execution and a
     * {@link LinkedBlockingQueue} as a completion queue.
     *
     * @param executor the executor to use
     * @throws NullPointerException if executor is <tt>null
     */
    public ExecutorCompletionService(Executor executor) {
        if (executor == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = new LinkedBlockingQueue();
    }

    /**
     * Creates an ExecutorCompletionService using the supplied
     * executor for base task execution and the supplied queue as its
     * completion queue.
     *
     * @param executor the executor to use
     * @param completionQueue the queue to use as the completion queue
     * normally one dedicated for use by this service. This queue is
     * treated as unbounded -- failed attempted <tt>Queue.add
     * operations for completed taskes cause them not to be
     * retrievable.
     * @throws NullPointerException if executor or completionQueue are <tt>null
     */
    public ExecutorCompletionService(Executor executor,
                                     BlockingQueue completionQueue) {
        if (executor == null || completionQueue == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = completionQueue;
    }

    public Future submit(Callable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture f = newTaskFor(task);
        executor.execute(new QueueingFuture(f));
        return f;
    }

    public Future submit(Runnable task, Object result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture f = newTaskFor(task, result);
        executor.execute(new QueueingFuture(f));
        return f;
    }

    public Future take() throws InterruptedException {
        return (Future)completionQueue.take();
    }

    public Future poll() {
        return (Future)completionQueue.poll();
    }

    public Future poll(long timeout, TimeUnit unit) throws InterruptedException {
        return (Future)completionQueue.poll(timeout, unit);
    }

}

Other Scala examples (source code examples)

Here is a short list of links related to this Scala ExecutorCompletionService.java source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.