alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Java example source code file (ThreadPoolImpl.java)

This example Java source code file (ThreadPoolImpl.java) is included in the alvinalexander.com "Java Source Code Warehouse" project. The intent of this project is to help you "Learn Java by Example"™.

Learn more about this Java project at its project page.

Java - Java tags/keywords

atomicinteger, atomiclong, idle, interruptedexception, list, longmonitoredattributebase, monitoredobject, object, security, string, threadpoolimpl, throwable, util, workerthread, workqueue, workqueueimpl

The ThreadPoolImpl.java Java example source code

/*
 * Copyright (c) 2003, 2012, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package com.sun.corba.se.impl.orbutil.threadpool;

import java.io.IOException;
import java.io.Closeable;

import java.security.AccessController;
import java.security.PrivilegedAction;

import java.util.List;
import java.util.ArrayList;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import com.sun.corba.se.spi.orbutil.threadpool.NoSuchWorkQueueException;
import com.sun.corba.se.spi.orbutil.threadpool.ThreadPool;
import com.sun.corba.se.spi.orbutil.threadpool.Work;
import com.sun.corba.se.spi.orbutil.threadpool.WorkQueue;

import com.sun.corba.se.impl.orbutil.ORBConstants;
import com.sun.corba.se.impl.orbutil.threadpool.WorkQueueImpl;

import com.sun.corba.se.spi.monitoring.MonitoringConstants;
import com.sun.corba.se.spi.monitoring.MonitoredObject;
import com.sun.corba.se.spi.monitoring.MonitoringFactories;
import com.sun.corba.se.spi.orb.ORB;
import com.sun.corba.se.spi.monitoring.LongMonitoredAttributeBase;

import com.sun.corba.se.impl.logging.ORBUtilSystemException;
import com.sun.corba.se.impl.orbutil.ORBConstants;
import com.sun.corba.se.spi.logging.CORBALogDomains;

public class ThreadPoolImpl implements ThreadPool
{
    // Serial counter shared by all pools; each WorkerThread takes the next
    // value as its id. Useful for debugging.
    private static AtomicInteger threadCounter = new AtomicInteger(0);
    // Obtained for the RPC_TRANSPORT log domain; target of every wrapper.*
    // reporting call made in this class.
    private static final ORBUtilSystemException wrapper =
        ORBUtilSystemException.get(CORBALogDomains.RPC_TRANSPORT);


    // Any time currentThreadCount and/or availableWorkerThreads is updated
    // or accessed, this ThreadPool's WorkQueue must be locked. It is
    // expected that this ThreadPool's WorkQueue is the only object that
    // updates and accesses these values, either directly or indirectly
    // through a call to a method in this ThreadPool. Any code that updates
    // or accesses those values must synchronize on this ThreadPool's WorkQueue.
    private WorkQueue workQueue;

    // Stores the number of available worker threads
    // (guarded by the workQueue monitor)
    private int availableWorkerThreads = 0;

    // Stores the number of threads in the threadpool currently
    // (guarded by the workQueue monitor)
    private int currentThreadCount = 0;

    // Minimum number of worker threads created at instantiation of the threadpool
    private int minWorkerThreads = 0;

    // Maximum number of worker threads in the threadpool.
    // Within this class the value is only reported by maximumNumberOfThreads().
    private int maxWorkerThreads = 0;

    // Timeout handed to WorkQueueImpl.requestWork() by each worker thread
    // while it waits for work
    private long inactivityTimeout;

    // Indicates if the threadpool is bounded or unbounded.
    // Set by the bounded constructor; not read anywhere else in this class.
    private boolean boundedThreadPool = false;

    // Running count of the work items processed
    // Set the value to 1 so that divide by zero is avoided in
    // averageWorkCompletionTime(). As a consequence the value is always one
    // higher than the number of work items actually executed.
    private AtomicLong processedCount = new AtomicLong(1);

    // Running aggregate of the time taken in millis to execute work items
    // processed by the threads in the threadpool
    private AtomicLong totalTimeTaken = new AtomicLong(0);

    // Name of the ThreadPool
    private String name;

    // MonitoredObject for ThreadPool; created by initializeMonitoring()
    private MonitoredObject threadpoolMonitoredObject;

    // ThreadGroup in which threads should be created. Left null by the
    // bounded constructor and reset to null by close().
    private ThreadGroup threadGroup;

    // Monitor guarding every access to the workers list
    Object workersLock = new Object();
    // Worker threads created by this pool; each thread removes itself from
    // the list when its run() method exits
    List<WorkerThread> workers = new ArrayList<>();

    /**
     * Creates an unbounded threadpool whose worker threads are created in
     * the supplied ThreadGroup.
     *
     * The maximum thread count is Integer.MAX_VALUE and the inactivity
     * timeout is ORBConstants.DEFAULT_INACTIVITY_TIMEOUT. No worker thread
     * is started by this constructor.
     *
     * @param tg             ThreadGroup handed to every worker thread created
     * @param threadpoolName name of this pool, as returned by getName()
     */
    public ThreadPoolImpl(ThreadGroup tg, String threadpoolName) {
        this.maxWorkerThreads = Integer.MAX_VALUE;
        this.inactivityTimeout = ORBConstants.DEFAULT_INACTIVITY_TIMEOUT;

        // The queue receives a reference to this (still incomplete) pool.
        this.workQueue = new WorkQueueImpl(this);

        this.name = threadpoolName;
        this.threadGroup = tg;

        // Must come last: monitoring reads both the name and the work queue.
        initializeMonitoring();
    }

    /**
     * Creates an unbounded threadpool whose worker threads are created in
     * the ThreadGroup of the thread that invokes this constructor.
     *
     * @param threadpoolName name of this pool, as returned by getName()
     */
    public ThreadPoolImpl(String threadpoolName) {
        this(Thread.currentThread().getThreadGroup(), threadpoolName);
    }

    /**
     * Creates a bounded threadpool and immediately starts minSize worker
     * threads.
     *
     * This constructor does not assign a ThreadGroup, so the threadGroup
     * field keeps its default null value.
     *
     * @param minSize        number of worker threads started right away
     * @param maxSize        value reported by maximumNumberOfThreads()
     * @param timeout        wait time handed to the work queue by idle workers
     * @param threadpoolName name of this pool, as returned by getName()
     */
    public ThreadPoolImpl(int minSize, int maxSize, long timeout,
                                            String threadpoolName)
    {
        this.minWorkerThreads = minSize;
        this.maxWorkerThreads = maxSize;
        this.inactivityTimeout = timeout;
        this.boundedThreadPool = true;

        // The queue receives a reference to this (still incomplete) pool.
        this.workQueue = new WorkQueueImpl(this);
        this.name = threadpoolName;

        // Start the initial set of workers before monitoring is set up.
        for (int remaining = minSize; remaining > 0; remaining--) {
            createWorkerThread();
        }

        initializeMonitoring();
    }

    /**
     * Closes every worker thread currently registered with this pool and
     * waits for each of them to terminate.
     *
     * Note that this method does not return until AFTER all threads that
     * were registered when it was called have died. If the calling thread is
     * interrupted while waiting, the interruption is reported through the
     * log wrapper, the wait continues, and the caller's interrupt status is
     * restored before this method returns.
     *
     * @throws IOException declared for the close() contract; nothing in this
     *                     body throws it directly
     */
    public void close() throws IOException {

        // Copy to avoid concurrent modification problems: each worker
        // removes itself from the list while it is shutting down.
        List<WorkerThread> copy;
        synchronized (workersLock) {
            copy = new ArrayList<>(workers);
        }

        // join() clears the interrupt flag when it throws, so remember the
        // interruption here and re-assert it once all joins are finished.
        // Re-asserting it inside the loop would make join() throw again
        // immediately and turn the wait into a busy loop.
        boolean interrupted = false;
        try {
            for (WorkerThread wt : copy) {
                wt.close();
                while (wt.getState() != Thread.State.TERMINATED) {
                    try {
                        wt.join();
                    } catch (InterruptedException exc) {
                        interrupted = true;
                        wrapper.interruptedJoinCallWhileClosingThreadPool(exc, wt, this);
                    }
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }

        threadGroup = null;
    }


    /**
     * Sets up monitoring for this threadpool.
     *
     * Looks up (or creates) the shared threadpool monitoring root under the
     * default monitoring root, adds a monitored object named after this pool
     * to it, registers five attributes backed by this pool's accessor
     * methods, and finally attaches the work queue's monitored object as a
     * child. Requires name and workQueue to be assigned already.
     */
    private void initializeMonitoring() {
        // Get root monitored object
        MonitoredObject root = MonitoringFactories.getMonitoringManagerFactory().
                createMonitoringManager(MonitoringConstants.DEFAULT_MONITORING_ROOT, null).
                getRootMonitoredObject();

        // Create the threadpool monitoring root if no pool has done so yet
        MonitoredObject threadPoolMonitoringObjectRoot = root.getChild(
                    MonitoringConstants.THREADPOOL_MONITORING_ROOT);
        if (threadPoolMonitoringObjectRoot == null) {
            threadPoolMonitoringObjectRoot =  MonitoringFactories.
                    getMonitoredObjectFactory().createMonitoredObject(
                    MonitoringConstants.THREADPOOL_MONITORING_ROOT,
                    MonitoringConstants.THREADPOOL_MONITORING_ROOT_DESCRIPTION);
            root.addChild(threadPoolMonitoringObjectRoot);
        }

        // Monitored object for this particular pool, keyed by the pool name
        threadpoolMonitoredObject = MonitoringFactories.
                    getMonitoredObjectFactory().
                    createMonitoredObject(name,
                    MonitoringConstants.THREADPOOL_MONITORING_DESCRIPTION);

        threadPoolMonitoringObjectRoot.addChild(threadpoolMonitoredObject);

        // Each attribute reads its value lazily from the enclosing pool.
        // Long.valueOf is used instead of the deprecated Long constructor.
        LongMonitoredAttributeBase b1 = new
            LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_CURRENT_NUMBER_OF_THREADS,
                    MonitoringConstants.THREADPOOL_CURRENT_NUMBER_OF_THREADS_DESCRIPTION) {
                public Object getValue() {
                    return Long.valueOf(ThreadPoolImpl.this.currentNumberOfThreads());
                }
            };
        threadpoolMonitoredObject.addAttribute(b1);

        // The available-threads attribute carries its own description; it
        // previously reused the "current number of threads" description.
        LongMonitoredAttributeBase b2 = new
            LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_NUMBER_OF_AVAILABLE_THREADS,
                    MonitoringConstants.THREADPOOL_NUMBER_OF_AVAILABLE_THREADS_DESCRIPTION) {
                public Object getValue() {
                    return Long.valueOf(ThreadPoolImpl.this.numberOfAvailableThreads());
                }
            };
        threadpoolMonitoredObject.addAttribute(b2);

        LongMonitoredAttributeBase b3 = new
            LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_NUMBER_OF_BUSY_THREADS,
                    MonitoringConstants.THREADPOOL_NUMBER_OF_BUSY_THREADS_DESCRIPTION) {
                public Object getValue() {
                    return Long.valueOf(ThreadPoolImpl.this.numberOfBusyThreads());
                }
            };
        threadpoolMonitoredObject.addAttribute(b3);

        LongMonitoredAttributeBase b4 = new
            LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_AVERAGE_WORK_COMPLETION_TIME,
                    MonitoringConstants.THREADPOOL_AVERAGE_WORK_COMPLETION_TIME_DESCRIPTION) {
                public Object getValue() {
                    return Long.valueOf(ThreadPoolImpl.this.averageWorkCompletionTime());
                }
            };
        threadpoolMonitoredObject.addAttribute(b4);

        LongMonitoredAttributeBase b5 = new
            LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_CURRENT_PROCESSED_COUNT,
                    MonitoringConstants.THREADPOOL_CURRENT_PROCESSED_COUNT_DESCRIPTION) {
                public Object getValue() {
                    return Long.valueOf(ThreadPoolImpl.this.currentProcessedCount());
                }
            };
        threadpoolMonitoredObject.addAttribute(b5);

        // Add the monitored object for the WorkQueue

        threadpoolMonitoredObject.addChild(
                ((WorkQueueImpl)workQueue).getMonitoredObject());
    }

    /**
     * Package private accessor for the monitored object that
     * initializeMonitoring() created for this threadpool.
     *
     * @return the MonitoredObject registered under this pool's name
     */
    MonitoredObject getMonitoredObject() {
        return this.threadpoolMonitoredObject;
    }

    /**
     * Returns the single work queue serviced by this threadpool.
     *
     * @return this pool's WorkQueue
     */
    public WorkQueue getAnyWorkQueue() {
        return this.workQueue;
    }

    /**
     * Returns the work queue with the given id. This pool has exactly one
     * queue, whose id is 0.
     *
     * @param queueId id of the requested queue
     * @return this pool's WorkQueue when queueId is 0
     * @throws NoSuchWorkQueueException for any other queueId
     */
    public WorkQueue getWorkQueue(int queueId)
        throws NoSuchWorkQueueException
    {
        if (queueId == 0) {
            return workQueue;
        }
        throw new NoSuchWorkQueueException();
    }

    /**
     * Called from the workqueue after work has been added to it.
     *
     * While holding the queue's monitor: when there are at least as many
     * available worker threads as queued work items, one waiting thread is
     * woken up; otherwise a new worker thread is created.
     *
     * @param aWorkQueue the queue that received work; used both as the lock
     *                   and as the object that is notified
     */
    void notifyForAvailableWork(WorkQueue aWorkQueue) {
        synchronized (aWorkQueue) {
            boolean enoughIdleWorkers =
                availableWorkerThreads >= aWorkQueue.workItemsInQueue();
            if (enoughIdleWorkers) {
                aWorkQueue.notify();
                return;
            }
            createWorkerThread();
        }
    }


    /**
     * Creates, registers and starts one daemon worker thread.
     *
     * The thread is added to the workers list before it is started. If
     * anything fails before the thread is running, it is removed from the
     * list again: close() waits until every listed thread is TERMINATED, and
     * a thread that was never started would never reach that state.
     *
     * @param name name of the thread pool, used to compose the thread name
     * @return the worker thread that was started
     */
    private Thread createWorkerThreadHelper( String name ) {
        // Thread creation needs to be in a doPrivileged block
        // if there is a non-null security manager for two reasons:
        // 1. The creation of a thread in a specific ThreadGroup
        //    is a privileged operation.  Lack of a doPrivileged
        //    block here causes an AccessControlException
        //    (see bug 6268145).
        // 2. We want to make sure that the permissions associated
        //    with this thread do NOT include the permissions of
        //    the current thread that is calling this method.
        //    This leads to problems in the app server where
        //    some threads in the ThreadPool randomly get
        //    bad permissions, leading to unpredictable
        //    permission errors (see bug 6021011).
        //
        //    A Java thread contains a stack of call frames,
        //    one for each method called that has not yet returned.
        //    Each method comes from a particular class.  The class
        //    was loaded by a ClassLoader which has an associated
        //    CodeSource, and this determines the Permissions
        //    for all methods in that class.  The current
        //    Permissions for the thread are the intersection of
        //    all Permissions for the methods on the stack.
        //    This is part of the Security Context of the thread.
        //
        //    When a thread creates a new thread, the new thread
        //    inherits the security context of the old thread.
        //    This is bad in a ThreadPool, because different
        //    creators of threads may have different security contexts.
        //    This leads to occasional unpredictable errors when
        //    a thread is re-used in a different security context.
        //
        //    Avoiding this problem is simple: just do the thread
        //    creation in a doPrivileged block.  This sets the
        //    inherited security context to that of the code source
        //    for the ORB code itself, which contains all permissions
        //    in either Java SE or Java EE.
        //
        // The doPrivileged wrapping itself is done by the caller,
        // createWorkerThread(), and only when a security manager is set.
        WorkerThread thread = new WorkerThread(threadGroup, name);
        synchronized (workersLock) {
            workers.add(thread);
        }

        boolean started = false;
        try {
            // The thread must be set to a daemon thread so the
            // VM can exit if the only threads left are PooledThreads
            // or other daemons.  We don't want to rely on the
            // calling thread always being a daemon.
            thread.setDaemon(true);

            wrapper.workerThreadCreated(thread, thread.getContextClassLoader());

            thread.start();
            started = true;
        } finally {
            if (!started) {
                // Undo the registration; the failure itself still
                // propagates to the caller.
                synchronized (workersLock) {
                    workers.remove(thread);
                }
            }
        }
        return thread;
    }


    /**
     * To be called from the workqueue to create worker threads when none
     * available.
     *
     * Runs under the work queue's monitor. The thread is created directly
     * when no security manager is installed and inside a doPrivileged block
     * otherwise. The current thread count is incremented only when creation
     * succeeded; any Throwable raised during creation is reported through
     * the log wrapper and leaves the count unchanged.
     */
    void createWorkerThread() {
        final String name = getName();
        synchronized (workQueue) {
            try {
                if (System.getSecurityManager() == null) {
                    createWorkerThreadHelper(name);
                } else {
                    // Create the thread with the ORB's own permissions
                    // rather than those of whichever caller got us here.
                    AccessController.doPrivileged(
                            new PrivilegedAction<Thread>() {
                        @Override
                        public Thread run() {
                            return createWorkerThreadHelper(name);
                        }
                    }
                    );
                }
            } catch (Throwable t) {
                // No thread was added, so the count must not change.
                wrapper.workerThreadCreationFailure(t);
                return;
            }
            incrementCurrentNumberOfThreads();
        }
    }

    /**
     * @return the minimum number of worker threads configured for this pool
     *         (0 unless the bounded constructor was used)
     */
    public int minimumNumberOfThreads() {
        return this.minWorkerThreads;
    }

    /**
     * @return the maximum number of worker threads configured for this pool
     *         (Integer.MAX_VALUE for an unbounded pool)
     */
    public int maximumNumberOfThreads() {
        return this.maxWorkerThreads;
    }

    /**
     * @return the inactivity timeout configured for this pool's worker threads
     */
    public long idleTimeoutForThreads() {
        return this.inactivityTimeout;
    }

    /**
     * Reads the current thread count under the work queue's monitor.
     *
     * @return number of threads currently counted as belonging to this pool
     */
    public int currentNumberOfThreads() {
        final int snapshot;
        synchronized (workQueue) {
            snapshot = currentThreadCount;
        }
        return snapshot;
    }

    /**
     * Lowers the current thread count by one while holding the work
     * queue's monitor.
     */
    void decrementCurrentNumberOfThreads() {
        synchronized (workQueue) {
            currentThreadCount -= 1;
        }
    }

    /**
     * Raises the current thread count by one while holding the work
     * queue's monitor.
     */
    void incrementCurrentNumberOfThreads() {
        synchronized (workQueue) {
            currentThreadCount += 1;
        }
    }

    /**
     * Reads the available-thread count under the work queue's monitor.
     *
     * @return number of worker threads currently counted as available
     */
    public int numberOfAvailableThreads() {
        final int snapshot;
        synchronized (workQueue) {
            snapshot = availableWorkerThreads;
        }
        return snapshot;
    }

    /**
     * Computes, under the work queue's monitor, how many threads are not
     * available: the current thread count minus the available count.
     *
     * @return number of threads counted as busy
     */
    public int numberOfBusyThreads() {
        synchronized (workQueue) {
            int total = currentThreadCount;
            int idle = availableWorkerThreads;
            return total - idle;
        }
    }

    /**
     * Returns the accumulated execution time in millis divided by the
     * processed-count counter (integer division).
     *
     * The counter starts at 1 rather than 0, so the divisor is never zero
     * but is always one more than the number of work items executed.
     *
     * @return average time in millis per counted work item
     */
    public long averageWorkCompletionTime() {
        synchronized (workQueue) {
            long elapsedMillis = totalTimeTaken.get();
            long divisor = processedCount.get();
            return elapsedMillis / divisor;
        }
    }

    /**
     * Reads the processed-count counter under the work queue's monitor.
     * The counter starts at 1, so a pool that has not executed any work
     * reports 1.
     *
     * @return current value of the processed-count counter
     */
    public long currentProcessedCount() {
        final long snapshot;
        synchronized (workQueue) {
            snapshot = processedCount.get();
        }
        return snapshot;
    }

    /**
     * @return the name given to this threadpool at construction
     */
    public String getName() {
        return this.name;
    }

    /**
     * Reports how many WorkQueues this threadpool services. This
     * implementation owns a single queue.
     *
     * @return always 1
     */
    public int numberOfWorkQueues() {
        final int singleQueue = 1;
        return singleQueue;
    }


    /**
     * Hands out the next value of the shared thread counter.
     *
     * AtomicInteger.incrementAndGet() is already atomic, so no additional
     * class-level lock is taken here.
     *
     * @return a value that is unique among all worker threads in all pools
     */
    private static int getUniqueThreadId() {
        return ThreadPoolImpl.threadCounter.incrementAndGet();
    }

    /**
     * Lowers by one the number of threads in the threadpool that are
     * counted as waiting for work, while holding the work queue's monitor.
     * Intended to be called from WorkQueueImpl.requestWork().
     */
    void decrementNumberOfAvailableThreads() {
        synchronized (workQueue) {
            availableWorkerThreads -= 1;
        }
    }

    /**
     * Raises by one the number of threads in the threadpool that are
     * counted as waiting for work, while holding the work queue's monitor.
     * Intended to be called from WorkQueueImpl.requestWork().
     */
    void incrementNumberOfAvailableThreads() {
        synchronized (workQueue) {
            availableWorkerThreads += 1;
        }
    }


    /**
     * Thread owned by the enclosing pool. It keeps requesting Work items
     * from the pool's work queue and executing them until close() has been
     * called, and removes itself from the pool's workers list on exit.
     */
    private class WorkerThread extends Thread implements Closeable
    {
        // Work item being executed; null between items
        private Work currentWork;
        // unique id for the thread
        private int threadId = 0;
        // Set by close(); checked before every request for work
        private volatile boolean closeCalled = false;
        private String threadPoolName;
        // Buffer used to build the name seen by Thread.getName()
        private StringBuffer workerThreadName = new StringBuffer();

        /**
         * @param tg             ThreadGroup to create the thread in
         * @param threadPoolName pool name embedded in the thread name
         */
        WorkerThread(ThreadGroup tg, String threadPoolName) {
            super(tg, "Idle");
            this.threadPoolName = threadPoolName;
            this.threadId = ThreadPoolImpl.getUniqueThreadId();
            String initialName = composeWorkerThreadName(threadPoolName, "Idle");
            setName(initialName);
        }

        /**
         * Asks this worker to stop: raises the close flag, then interrupts
         * the thread so that a blocked wait for work is abandoned.
         */
        public synchronized void close() {
            this.closeCalled = true;
            this.interrupt();
        }

        // Intentionally empty hook, invoked after every executed work item.
        private void resetClassLoader() {

        }

        /**
         * Executes currentWork, reports anything it throws, and adds the
         * elapsed millis and one processed item to the pool's statistics.
         */
        private void performWork() {
            final long startedAt = System.currentTimeMillis();
            try {
                currentWork.doWork();
            } catch (Throwable t) {
                wrapper.workerThreadDoWorkThrowable(this, t);
            }
            totalTimeTaken.addAndGet(System.currentTimeMillis() - startedAt);
            processedCount.incrementAndGet();
        }

        /**
         * Requests the next work item from the queue and stores it in
         * currentWork. Interruptions and any other Throwable raised by the
         * request are reported through the log wrapper.
         *
         * @return true when a non-null work item was obtained
         */
        private boolean acquireWork() {
            try {
                currentWork = ((WorkQueueImpl)workQueue).requestWork(
                    inactivityTimeout);
                return currentWork != null;
            } catch (InterruptedException exc) {
                wrapper.workQueueThreadInterrupted( exc, getName(),
                   Boolean.valueOf(closeCalled));
            } catch (Throwable t) {
                wrapper.workerThreadThrowableFromRequestWork(this, t,
                       workQueue.getName());
            }
            return false;
        }

        /**
         * Main loop: until close() has been called, fetch a work item and
         * execute it. Whatever way the loop ends, this thread is removed
         * from the pool's workers list.
         */
        public void run() {
            try  {
                while (!closeCalled) {
                    if (!acquireWork()) {
                        // nothing to run this time around; ask again
                        continue;
                    }

                    performWork();

                    // drop the reference so that the work item can be
                    // garbage collected without waiting for the next one.
                    currentWork = null;

                    resetClassLoader();
                }
            } catch (Throwable e) {
                // This should not be possible
                wrapper.workerThreadCaughtUnexpectedThrowable(this,e);
            } finally {
                synchronized (workersLock) {
                    workers.remove(this);
                }
            }
        }

        /**
         * Builds the thread name "p: <poolName>; w: <workerName>" in the
         * reusable buffer.
         */
        private String composeWorkerThreadName(String poolName, String workerName) {
            workerThreadName.setLength(0);
            workerThreadName.append("p: ")
                            .append(poolName)
                            .append("; w: ")
                            .append(workerName);
            return workerThreadName.toString();
        }
    } // End of WorkerThread class

}

// End of file.

Other Java examples (source code examples)

Here is a short list of links related to this Java ThreadPoolImpl.java source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.