alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Groovy example source code file (ActorTest.groovy)

This example Groovy source code file (ActorTest.groovy) is included in the DevDaily.com "Java Source Code Warehouse" project. The intent of this project is to help you "Learn Java by Example"™.

Java - Groovy tags/keywords

arraylist, atomicinteger, closure, closure, longoperation, longoperation, object, object, reentrantlock, string, thread, threading, threads, weakreference, workerthread, workerthread

The Groovy ActorTest.groovy source code

/*
 * Copyright 2003-2009 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package groovy

import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.LinkedBlockingQueue
import java.lang.ref.WeakReference

/**
 * Exercises FibCalculator twice: once with plain recursion on the calling
 * thread and once through the closure-based task machinery provided by
 * WorkerThread and LongOperation.
 */
class ActorTest extends GroovyTestCase {
    /** Computes fib(15) recursively on the current thread. */
    void testSync () {
        new FibCalculator().calcFibSync(15)
    }

    /**
     * Computes fib(18) with the work split into tasks.
     *
     * The test installs its runtime plumbing through metaclasses: a
     * withLock helper on ReentrantLock, ReentrantLock mixed into
     * WorkerThread, and WorkerThread mixed into Thread. Everything is
     * undone in a finally block so that a failure anywhere in the test
     * cannot leave the altered metaclasses behind for later tests.
     */
    void testAsync () {
        try {
            // withLock: run the closure while holding the lock, always releasing it.
            ReentrantLock.metaClass {
                withLock { Closure c ->
                    lock()
                    try {
                        c.call ()
                    }
                    finally {
                        unlock()
                    }
                }
            }

            WorkerThread.metaClass.mixin ReentrantLock
            Thread.metaClass.mixin WorkerThread

            WorkerThread.startPool(15)

            // The calling thread takes part as a worker too.
            Thread thread = Thread.currentThread()
            thread.name = "Main App Thread"
            thread.registerWorker ()
            new FibCalculator().calcFib(18)
        }
        finally {
            // Drop every metaclass customisation made for this test, including
            // the Object-level methods added by WorkerThread's static initializer
            // and the WorkerThread mixin that was applied to Thread.
            Object.metaClass = null
            ReentrantLock.metaClass = null
            WorkerThread.metaClass = null
            Thread.metaClass = null
        }
    }
}

/**
 * Task-running behaviour that ActorTest mixes into Thread.
 *
 * Each worker keeps its own list of pending closures. New work is pushed to
 * and taken from the front of that list; when a worker has nothing of its own
 * it polls the shared queue and, failing that, takes a task from the back of
 * another registered worker's list.
 *
 * withLock is not declared in this class. ActorTest.testAsync adds it to
 * ReentrantLock's metaclass and mixes ReentrantLock into WorkerThread, so the
 * methods below only work after that setup has run.
 */
class WorkerThread {
    // Runs once when the class is initialised: adds three helpers to every
    // object, each wrapping the given closure in a LongOperation.
    static {
        Object.metaClass {
            // Build the operation without starting it, so more closures can be chained.
            longOperation{ Closure c ->
                new LongOperation (c)
            }

            // Build the operation and run it via LongOperation.send().
            send{ Closure c ->
                new LongOperation (c).send ()
            }

            // Build the operation and hand it to LongOperation.post().
            post{ Closure c ->
                new LongOperation (c).post ()
            }
        }
    }

    // Pending tasks of this worker: schedule() adds at the front, nextTask()
    // removes from the front, other workers remove from the back.
    def list = new LinkedList ()

    // Guards allWorkers.
    private final static ReentrantLock globalLock = new ReentrantLock()
    // Tasks submitted through post(); any worker may poll them.
    private final static LinkedBlockingQueue globalQueue = new LinkedBlockingQueue ()
    // WeakReference slots for registered workers; a slot may be null or cleared.
    private final static ArrayList allWorkers = new ArrayList ()
    // Picks the starting slot when looking for another worker's task.
    private final static Random r = new Random();

    WorkerThread () {
    }

    /**
     * Adds a weak reference to this worker to the shared worker table,
     * reusing the first slot that is null or whose referent has been
     * collected, and appending otherwise.
     */
    void registerWorker() {
        globalLock.withLock {
            // NOTE(review): the object registered is the closure's `delegate`;
            // what that resolves to depends on how withLock invokes the closure
            // and on the mixin setup - confirm it is the intended worker object.
            WeakReference ref = new WeakReference(delegate)
            for (int i = 0; i != allWorkers.size(); ++i) {
                WeakReference or = allWorkers[i]
                if (or == null || or.get() == null) {
                    allWorkers[i] = ref
                    // Leaves the closure only; registerWorker itself returns nothing.
                    return ref
                }
            }

            allWorkers << ref
        }
    }

    /**
     * Puts a task at the front of this worker's own list.
     *
     * @param action the closure to run later
     */
    void schedule (Closure action) {
        withLock {
            list.addFirst action
        }
    }

    /**
     * Offers a task to the shared queue, where any worker may pick it up.
     *
     * @param action the closure to run later
     */
    void post (Closure action) {
        globalQueue.offer action
    }

    /**
     * Schedules every action on this worker and keeps executing tasks on the
     * calling thread until all of them have reported a result.
     *
     * @param actions closures to run; each one's return value is collected
     * @return the results, in the same order as {@code actions}
     */
    Object[] send (List actions) {
        int len = actions.size()
        AtomicInteger counter = new AtomicInteger()
        def results = new Object[len]
        actions.eachWithIndex {Closure action, int index ->
            schedule {
                Object res = action.call()
                results[index] = res
                // Counted only after the result is stored.
                // NOTE(review): if action.call() throws, this is never reached
                // and the loop below would not terminate - confirm acceptable.
                counter.incrementAndGet()
            }
        }

        // Rather than blocking, run whatever task is available while waiting.
        while (counter.get() != len) {
            execute()
        }
        return results
    }

    /** Runs one task if nextTask() finds one; otherwise does nothing. */
    void execute () {
        Closure action = nextTask ()

        if (action)
          action ()
    }

    /**
     * Finds the next task to run, trying in order: the front of this
     * worker's own list, the shared queue, and finally the back of another
     * registered worker's list.
     *
     * @return a task, or null when none is available
     */
    Closure nextTask() {
        Closure action = withLock {
            return list.isEmpty() ? null : (Closure)list.removeFirst()
        }

        if (action != null)
          return action

        action = globalQueue.poll()
        if (action != null)
          return action

        // The value of this closure call is the method's result.
        globalLock.withLock {
           int len = allWorkers.size()
           // With at most one registered worker there is nobody to take work
           // from. Covering len == 0 also keeps Random.nextInt from being
           // called with a non-positive bound, which it rejects.
           if (len <= 1)
             return null

           // Visit the slots circularly, starting just after a random index;
           // slot `from` itself is never examined.
           int from = r.nextInt(len)
           for (int i = from+1; i != from; ++i) {
               if (i == len) {
                 // Wrap around to the first slot.
                 i = 0
                 if (from == 0) {
                     return null
                 }
               }

               def worker = allWorkers[i]
               if (worker) {
                   worker = worker.get ()
                   if (!worker) {
                     // Referent was collected: free the slot for registerWorker().
                     allWorkers [i] = null
                     continue
                   }

                   // Take from the back of the other worker's list, under its lock.
                   // Yields null when that list is empty.
                   action = worker.withLock {
                       if (!worker.list.isEmpty()) {
                            worker.list.removeLast ()
                       }
                   }

                   if (action)
                     return action
               }
           }

           return null
        }
    }

    /**
     * Starts {@code count} threads named "WorkerPool-0", "WorkerPool-1", ...
     * Each registers itself and then calls execute() in an endless loop.
     *
     * @param count number of threads to start
     */
    static void startPool(int count) {
        for (int i = 0; i != count; ++i) {
            // Per-iteration copy so each closure sees its own index.
            def n = i
            Thread.start {
                Thread thread = Thread.currentThread()
                thread.registerWorker ()
                thread.name = "WorkerPool-" + n
                try {
                    while(true) {
                        thread.execute ()
                    }
                }
                finally {
                    // NOTE(review): returning from finally appears intended to let
                    // the thread end quietly when execute() throws - confirm.
                    return 
                }
            }
        }
    }
}

/**
 * Fibonacci calculator with two flavours: one that splits the recursion into
 * closures handed to longOperation/and/send, and one that recurses directly.
 * Both print every intermediate result and pause for a millisecond per call.
 */
class FibCalculator {
    /**
     * Computes fib(value) through calcFibImpl, hands a closure that prints the
     * result to post, sleeps one millisecond and returns the result.
     */
    def calcFib (value) {
        def a = calcFibImpl (value)
        // Captured now; the closure below may print from a different thread.
        String calc = Thread.currentThread().name
        post {
            println "fib(${value})=$a $calc ${Thread.currentThread().name}"
        }
        Thread.sleep(1)
        return a
    }

    /**
     * Returns 0 for value <= 0 and 1 for value <= 2. Otherwise builds a long
     * operation from the two recursive calls and sends it, adding the two
     * results that come back.
     */
    def calcFibImpl (value) {
        if (value <= 0) {
            return 0
        }
        if (value <= 2) {
            return 1
        }

        def firstBranch = longOperation {
            calcFib (value-1)
        }
        def bothBranches = firstBranch.and {
            calcFib (value-2)
        }
        return bothBranches.send { Object [] parts ->
            parts [0] + parts [1]
        }
    }

    /**
     * Computes fib(value) through calcFibSyncImpl, prints the result, sleeps
     * one millisecond and returns the result.
     */
    def calcFibSync (value) {
        def a = calcFibSyncImpl (value)
        String calc = Thread.currentThread().name
        println "fib(${value})=$a $calc ${Thread.currentThread().name}"
        Thread.sleep(1)
        return a
    }

    /**
     * Returns 0 for value <= 0 and 1 for value <= 2; otherwise the sum of
     * calcFibSync for the two preceding values.
     */
    def calcFibSyncImpl (value) {
        if (value <= 0) {
            return 0
        }
        if (value <= 2) {
            return 1
        }

        // Same evaluation order as a single left-to-right sum.
        def oneBack = calcFibSync(value-1)
        def twoBack = calcFibSync(value-2)
        return oneBack + twoBack
    }
}

/**
 * An ordered group of closures that can be extended with and() and then
 * handed to the current thread through its send or post methods.
 */
class LongOperation {
    // Closures collected so far, in the order they were added.
    def actions = []

    /** Starts the group with a single closure. */
    LongOperation (Closure action) {
        actions.add(action)
    }

    /**
     * Appends another closure to the group.
     *
     * @return this operation, so calls can be chained
     */
    LongOperation and (Closure c) {
        actions.add(c)
        return this
    }

    /**
     * Runs send() and passes its result to the given closure.
     *
     * @return whatever {@code op} returns
     */
    def send (Closure op) {
        Object[] results = send()
        return op.call (results)
    }

    /** Passes all collected closures to the current thread's send method. */
    Object[] send () {
        return Thread.currentThread().send (actions)
    }

    /** Passes each collected closure, in order, to the current thread's post method. */
    void post () {
        for (Closure action in actions) {
            Thread.currentThread().post(action)
        }
    }
}

Other Groovy examples (source code examples)

Here is a short list of links related to this Groovy ActorTest.groovy source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.