alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Spring Framework example source code file (ThreadPoolTaskExecutor.java)

This example Spring Framework source code file (ThreadPoolTaskExecutor.java) is included in the DevDaily.com "Java Source Code Warehouse" project. The intent of this project is to help you "Learn Java by Example"™.

Java - Spring Framework tags/keywords

blockingqueue, blockingqueue, customizablethreadfactory, disposablebean, executor, executor, initializingbean, object, object, rejectedexecutionexception, taskrejectedexception, threadpoolexecutor, threadpoolexecutor, threadpooltaskexecutor

The Spring Framework ThreadPoolTaskExecutor.java source code

/*
 * Copyright 2002-2008 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.scheduling.backportconcurrent;

import edu.emory.mathcs.backport.java.util.concurrent.BlockingQueue;
import edu.emory.mathcs.backport.java.util.concurrent.Executor;
import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue;
import edu.emory.mathcs.backport.java.util.concurrent.RejectedExecutionException;
import edu.emory.mathcs.backport.java.util.concurrent.RejectedExecutionHandler;
import edu.emory.mathcs.backport.java.util.concurrent.SynchronousQueue;
import edu.emory.mathcs.backport.java.util.concurrent.ThreadFactory;
import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor;
import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.task.TaskRejectedException;
import org.springframework.scheduling.SchedulingTaskExecutor;
import org.springframework.util.Assert;

/**
 * JavaBean that allows for configuring a JSR-166 backport
 * {@link edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor} in bean
 * style (through its "corePoolSize", "maxPoolSize", "keepAliveSeconds", "queueCapacity"
 * properties), exposing it as a Spring {@link org.springframework.core.task.TaskExecutor}.
 * This is an alternative to configuring a ThreadPoolExecutor instance directly using
 * constructor injection, with a separate {@link ConcurrentTaskExecutor} adapter wrapping it.
 *
 * <p>For any custom needs, in particular for defining a
 * {@link edu.emory.mathcs.backport.java.util.concurrent.ScheduledThreadPoolExecutor},
 * it is recommended to use a straight definition of the Executor instance or a
 * factory method definition that points to the JSR-166 backport
 * {@link edu.emory.mathcs.backport.java.util.concurrent.Executors} class.
 * To expose such a raw Executor as a Spring {@link org.springframework.core.task.TaskExecutor},
 * simply wrap it with a {@link ConcurrentTaskExecutor} adapter.
 *
 * <p>NOTE: This class implements Spring's
 * {@link org.springframework.core.task.TaskExecutor} interface as well as
 * the JSR-166 {@link edu.emory.mathcs.backport.java.util.concurrent.Executor}
 * interface, with the former being the primary interface, the other just
 * serving as secondary convenience. For this reason, the exception handling
 * follows the TaskExecutor contract rather than the Executor contract, in
 * particular regarding the {@link org.springframework.core.task.TaskRejectedException}.
 *
 * @author Juergen Hoeller
 * @since 2.0.3
 * @see org.springframework.core.task.TaskExecutor
 * @see edu.emory.mathcs.backport.java.util.concurrent.Executor
 * @see edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor
 * @see edu.emory.mathcs.backport.java.util.concurrent.ScheduledThreadPoolExecutor
 * @see edu.emory.mathcs.backport.java.util.concurrent.Executors
 * @see ConcurrentTaskExecutor
 */
public class ThreadPoolTaskExecutor extends CustomizableThreadFactory
		implements SchedulingTaskExecutor, Executor, BeanNameAware, InitializingBean, DisposableBean {

	protected final Log logger = LogFactory.getLog(getClass());

	/**
	 * Monitor guarding the runtime-modifiable pool settings (core size,
	 * max size, keep-alive) and their propagation to the live executor.
	 */
	private final Object poolSizeMonitor = new Object();

	private int corePoolSize = 1;

	private int maxPoolSize = Integer.MAX_VALUE;

	private int keepAliveSeconds = 60;

	private boolean allowCoreThreadTimeOut = false;

	private int queueCapacity = Integer.MAX_VALUE;

	private ThreadFactory threadFactory = this;

	private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();

	private boolean waitForTasksToCompleteOnShutdown = false;

	/** Whether {@link #setThreadNamePrefix} has been called on this instance. */
	private boolean threadNamePrefixSet = false;

	private String beanName;

	/** The underlying executor; <code>null</code> until {@link #initialize()} has run. */
	private ThreadPoolExecutor threadPoolExecutor;


	/**
	 * Set the ThreadPoolExecutor's core pool size.
	 * Default is 1.
	 * <p>This setting can be modified at runtime, for example through JMX.
	 * If the underlying executor already exists, the new value is applied
	 * to it immediately.
	 */
	public void setCorePoolSize(int corePoolSize) {
		synchronized (this.poolSizeMonitor) {
			this.corePoolSize = corePoolSize;
			if (this.threadPoolExecutor != null) {
				this.threadPoolExecutor.setCorePoolSize(corePoolSize);
			}
		}
	}

	/**
	 * Return the ThreadPoolExecutor's core pool size.
	 */
	public int getCorePoolSize() {
		synchronized (this.poolSizeMonitor) {
			return this.corePoolSize;
		}
	}

	/**
	 * Set the ThreadPoolExecutor's maximum pool size.
	 * Default is <code>Integer.MAX_VALUE</code>.
	 * <p>This setting can be modified at runtime, for example through JMX.
	 * If the underlying executor already exists, the new value is applied
	 * to it immediately.
	 */
	public void setMaxPoolSize(int maxPoolSize) {
		synchronized (this.poolSizeMonitor) {
			this.maxPoolSize = maxPoolSize;
			if (this.threadPoolExecutor != null) {
				this.threadPoolExecutor.setMaximumPoolSize(maxPoolSize);
			}
		}
	}

	/**
	 * Return the ThreadPoolExecutor's maximum pool size.
	 */
	public int getMaxPoolSize() {
		synchronized (this.poolSizeMonitor) {
			return this.maxPoolSize;
		}
	}

	/**
	 * Set the ThreadPoolExecutor's keep-alive seconds.
	 * Default is 60.
	 * <p>This setting can be modified at runtime, for example through JMX.
	 * If the underlying executor already exists, the new value is applied
	 * to it immediately.
	 */
	public void setKeepAliveSeconds(int keepAliveSeconds) {
		synchronized (this.poolSizeMonitor) {
			this.keepAliveSeconds = keepAliveSeconds;
			if (this.threadPoolExecutor != null) {
				this.threadPoolExecutor.setKeepAliveTime(keepAliveSeconds, TimeUnit.SECONDS);
			}
		}
	}

	/**
	 * Return the ThreadPoolExecutor's keep-alive seconds.
	 */
	public int getKeepAliveSeconds() {
		synchronized (this.poolSizeMonitor) {
			return this.keepAliveSeconds;
		}
	}

	/**
	 * Specify whether to allow core threads to time out. This enables dynamic
	 * growing and shrinking even in combination with a non-zero queue (since
	 * the max pool size will only grow once the queue is full).
	 * <p>Default is "false". Note that this feature is only available on
	 * backport-concurrent 3.0 or above (based on the code in Java 6).
	 * <p>Unlike the pool size settings, this flag is only read by
	 * {@link #initialize()}; changing it afterwards does not affect an
	 * already created executor.
	 * @see edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor#allowCoreThreadTimeOut(boolean)
	 */
	public void setAllowCoreThreadTimeOut(boolean allowCoreThreadTimeOut) {
		this.allowCoreThreadTimeOut = allowCoreThreadTimeOut;
	}

	/**
	 * Set the capacity for the ThreadPoolExecutor's BlockingQueue.
	 * Default is <code>Integer.MAX_VALUE</code>.
	 * <p>Any positive value will lead to a LinkedBlockingQueue instance;
	 * any other value will lead to a SynchronousQueue instance.
	 * @see edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue
	 * @see edu.emory.mathcs.backport.java.util.concurrent.SynchronousQueue
	 */
	public void setQueueCapacity(int queueCapacity) {
		this.queueCapacity = queueCapacity;
	}

	/**
	 * Set the ThreadFactory to use for the ThreadPoolExecutor's thread pool.
	 * <p>Default is this executor itself (i.e. the factory that this executor
	 * inherits from). See {@link org.springframework.util.CustomizableThreadCreator}'s
	 * javadoc for available bean properties.
	 * @param threadFactory the factory to use; <code>null</code> restores the default
	 * @see #setThreadPriority
	 * @see #setDaemon
	 */
	public void setThreadFactory(ThreadFactory threadFactory) {
		this.threadFactory = (threadFactory != null ? threadFactory : this);
	}

	/**
	 * Set the RejectedExecutionHandler to use for the ThreadPoolExecutor.
	 * Default is the ThreadPoolExecutor's default abort policy.
	 * @param rejectedExecutionHandler the handler to use; <code>null</code>
	 * restores a fresh abort policy
	 * @see edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor.AbortPolicy
	 */
	public void setRejectedExecutionHandler(RejectedExecutionHandler rejectedExecutionHandler) {
		this.rejectedExecutionHandler =
				(rejectedExecutionHandler != null ? rejectedExecutionHandler : new ThreadPoolExecutor.AbortPolicy());
	}

	/**
	 * Set whether to wait for scheduled tasks to complete on shutdown.
	 * <p>Default is "false". Switch this to "true" if you prefer
	 * fully completed tasks at the expense of a longer shutdown phase.
	 * @see edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor#shutdown()
	 * @see edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor#shutdownNow()
	 */
	public void setWaitForTasksToCompleteOnShutdown(boolean waitForJobsToCompleteOnShutdown) {
		this.waitForTasksToCompleteOnShutdown = waitForJobsToCompleteOnShutdown;
	}

	/**
	 * Set the thread name prefix, delegating to the superclass.
	 * <p>Overridden to remember that a prefix has been specified, so that
	 * {@link #initialize()} does not replace it with the bean-name-based default.
	 */
	public void setThreadNamePrefix(String threadNamePrefix) {
		super.setThreadNamePrefix(threadNamePrefix);
		this.threadNamePrefixSet = true;
	}

	/**
	 * Store the bean name. It is used in log messages and, if no thread name
	 * prefix has been set by the time {@link #initialize()} runs, as the basis
	 * for the default thread name prefix.
	 */
	public void setBeanName(String name) {
		this.beanName = name;
	}


	/**
	 * Calls <code>initialize()</code> after the container applied all property values.
	 * @see #initialize()
	 */
	public void afterPropertiesSet() {
		initialize();
	}

	/**
	 * Creates the BlockingQueue and the ThreadPoolExecutor.
	 * <p>If no thread name prefix has been set and a bean name is known,
	 * the prefix defaults to the bean name followed by "-".
	 * @see #createQueue
	 */
	public void initialize() {
		if (logger.isInfoEnabled()) {
			logger.info("Initializing ThreadPoolExecutor" + (this.beanName != null ? " '" + this.beanName + "'" : ""));
		}
		if (!this.threadNamePrefixSet && this.beanName != null) {
			setThreadNamePrefix(this.beanName + "-");
		}
		BlockingQueue queue = createQueue(this.queueCapacity);
		this.threadPoolExecutor = new ThreadPoolExecutor(
				this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
				queue, this.threadFactory, this.rejectedExecutionHandler);
		// Only invoked when requested: per the setter's documentation, the
		// method is unavailable before backport-concurrent 3.0.
		if (this.allowCoreThreadTimeOut) {
			this.threadPoolExecutor.allowCoreThreadTimeOut(true);
		}
	}

	/**
	 * Create the BlockingQueue to use for the ThreadPoolExecutor.
	 * <p>A LinkedBlockingQueue instance will be created for a positive
	 * capacity value; a SynchronousQueue else.
	 * @param queueCapacity the specified queue capacity
	 * @return the BlockingQueue instance
	 * @see edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue
	 * @see edu.emory.mathcs.backport.java.util.concurrent.SynchronousQueue
	 */
	protected BlockingQueue createQueue(int queueCapacity) {
		if (queueCapacity > 0) {
			return new LinkedBlockingQueue(queueCapacity);
		}
		else {
			return new SynchronousQueue();
		}
	}

	/**
	 * Return the underlying ThreadPoolExecutor for native access.
	 * @return the underlying ThreadPoolExecutor (never <code>null</code>)
	 * @throws IllegalStateException if the ThreadPoolTaskExecutor hasn't been initialized yet
	 */
	public ThreadPoolExecutor getThreadPoolExecutor() throws IllegalStateException {
		Assert.state(this.threadPoolExecutor != null, "ThreadPoolTaskExecutor not initialized");
		return this.threadPoolExecutor;
	}


	/**
	 * Implementation of both the JSR-166 backport Executor interface and the Spring
	 * TaskExecutor interface, delegating to the ThreadPoolExecutor instance.
	 * @param task the task to execute
	 * @throws IllegalStateException if this executor hasn't been initialized yet
	 * @throws TaskRejectedException if the underlying executor rejected the task
	 * (wrapping the original RejectedExecutionException as its cause)
	 * @see edu.emory.mathcs.backport.java.util.concurrent.Executor#execute(Runnable)
	 * @see org.springframework.core.task.TaskExecutor#execute(Runnable)
	 */
	public void execute(Runnable task) {
		Executor executor = getThreadPoolExecutor();
		try {
			executor.execute(task);
		}
		catch (RejectedExecutionException ex) {
			throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
		}
	}

	/**
	 * This task executor prefers short-lived work units.
	 */
	public boolean prefersShortLivedTasks() {
		return true;
	}


	/**
	 * Return the current pool size.
	 * @throws IllegalStateException if this executor hasn't been initialized yet
	 * @see edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor#getPoolSize()
	 */
	public int getPoolSize() {
		return getThreadPoolExecutor().getPoolSize();
	}

	/**
	 * Return the number of currently active threads.
	 * @throws IllegalStateException if this executor hasn't been initialized yet
	 * @see edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor#getActiveCount()
	 */
	public int getActiveCount() {
		return getThreadPoolExecutor().getActiveCount();
	}


	/**
	 * Calls <code>shutdown</code> when the BeanFactory destroys
	 * the task executor instance.
	 * @see #shutdown()
	 */
	public void destroy() {
		shutdown();
	}

	/**
	 * Perform a shutdown on the ThreadPoolExecutor.
	 * <p>Calls <code>shutdown()</code> on the underlying executor if
	 * "waitForTasksToCompleteOnShutdown" is set, <code>shutdownNow()</code>
	 * otherwise. If this executor has never been initialized, there is
	 * nothing to shut down and the call is a no-op apart from logging.
	 * @see edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor#shutdown()
	 * @see edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor#shutdownNow()
	 */
	public void shutdown() {
		if (logger.isInfoEnabled()) {
			logger.info("Shutting down ThreadPoolExecutor" + (this.beanName != null ? " '" + this.beanName + "'" : ""));
		}
		ThreadPoolExecutor executor = this.threadPoolExecutor;
		if (executor == null) {
			// initialize() never ran: avoid a NullPointerException on destroy.
			return;
		}
		if (this.waitForTasksToCompleteOnShutdown) {
			executor.shutdown();
		}
		else {
			executor.shutdownNow();
		}
	}

}

Other Spring Framework examples (source code examples)

Here is a short list of links related to this Spring Framework ThreadPoolTaskExecutor.java source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.