|
Java example source code file (ConnPoolByRoute.java)
This example source code file (ConnPoolByRoute.java) is included in the DevDaily.com
"Java Source Code
Warehouse" project. The intent of this project is to help you "Learn
Java by Example" TM.
The ConnPoolByRoute.java example source code
/*
* ====================================================================
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.http.impl.conn.tsccm;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Queue;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.conn.routing.HttpRoute;
import org.apache.http.conn.ClientConnectionOperator;
import org.apache.http.conn.ConnectionPoolTimeoutException;
import org.apache.http.conn.params.ConnPerRoute;
import org.apache.http.conn.params.ConnManagerParams;
import org.apache.http.params.HttpParams;
/**
* A connection pool that maintains connections by route.
* This class is derived from <code>MultiThreadedHttpConnectionManager
* in HttpClient 3.x, see there for original authors. It implements the same
* algorithm for connection re-use and connection-per-host enforcement:
* <ul>
* <li>connections are re-used only for the exact same route
* <li>connection limits are enforced per route rather than per host
* </ul>
* Note that access to the pool data structures is synchronized via the
* {@link AbstractConnPool#poolLock poolLock} in the base class,
* not via <code>synchronized methods.
*
* @since 4.0
*/
public class ConnPoolByRoute extends AbstractConnPool {
private final Log log = LogFactory.getLog(getClass());
private final HttpParams params;
/** Connection operator for this pool */
protected final ClientConnectionOperator operator;
/** The list of free connections */
protected final Queue<BasicPoolEntry> freeConnections;
/** The list of WaitingThreads waiting for a connection */
protected final Queue<WaitingThread> waitingThreads;
/**
* A map of route-specific pools.
* Keys are of class {@link HttpRoute},
* values of class {@link RouteSpecificPool}.
*/
protected final Map<HttpRoute, RouteSpecificPool> routeToPool;
/**
* Creates a new connection pool, managed by route.
*/
public ConnPoolByRoute(final ClientConnectionOperator operator, final HttpParams params) {
super();
if (operator == null) {
throw new IllegalArgumentException("Connection operator may not be null");
}
this.operator = operator;
this.params = params;
freeConnections = createFreeConnQueue();
waitingThreads = createWaitingThreadQueue();
routeToPool = createRouteToPoolMap();
}
/**
* Creates the queue for {@link #freeConnections}.
* Called once by the constructor.
*
* @return a queue
*/
protected Queue<BasicPoolEntry> createFreeConnQueue() {
return new LinkedList<BasicPoolEntry>();
}
/**
* Creates the queue for {@link #waitingThreads}.
* Called once by the constructor.
*
* @return a queue
*/
protected Queue<WaitingThread> createWaitingThreadQueue() {
return new LinkedList<WaitingThread>();
}
/**
* Creates the map for {@link #routeToPool}.
* Called once by the constructor.
*
* @return a map
*/
protected Map<HttpRoute, RouteSpecificPool> createRouteToPoolMap() {
return new HashMap<HttpRoute, RouteSpecificPool>();
}
/**
* Creates a new route-specific pool.
* Called by {@link #getRoutePool} when necessary.
*
* @param route the route
*
* @return the new pool
*/
protected RouteSpecificPool newRouteSpecificPool(HttpRoute route) {
ConnPerRoute connPerRoute = ConnManagerParams.getMaxConnectionsPerRoute(params);
return new RouteSpecificPool(route, connPerRoute.getMaxForRoute(route));
}
/**
* Creates a new waiting thread.
* Called by {@link #getRoutePool} when necessary.
*
* @param cond the condition to wait for
* @param rospl the route specific pool, or <code>null
*
* @return a waiting thread representation
*/
protected WaitingThread newWaitingThread(Condition cond,
RouteSpecificPool rospl) {
return new WaitingThread(cond, rospl);
}
/**
* Get a route-specific pool of available connections.
*
* @param route the route
* @param create whether to create the pool if it doesn't exist
*
* @return the pool for the argument route,
* never <code>null if create is true
*/
protected RouteSpecificPool getRoutePool(HttpRoute route,
boolean create) {
RouteSpecificPool rospl = null;
poolLock.lock();
try {
rospl = routeToPool.get(route);
if ((rospl == null) && create) {
// no pool for this route yet (or anymore)
rospl = newRouteSpecificPool(route);
routeToPool.put(route, rospl);
}
} finally {
poolLock.unlock();
}
return rospl;
}
//@@@ consider alternatives for gathering statistics
public int getConnectionsInPool(HttpRoute route) {
poolLock.lock();
try {
// don't allow a pool to be created here!
RouteSpecificPool rospl = getRoutePool(route, false);
return (rospl != null) ? rospl.getEntryCount() : 0;
} finally {
poolLock.unlock();
}
}
@Override
public PoolEntryRequest requestPoolEntry(
final HttpRoute route,
final Object state) {
final WaitingThreadAborter aborter = new WaitingThreadAborter();
return new PoolEntryRequest() {
public void abortRequest() {
poolLock.lock();
try {
aborter.abort();
} finally {
poolLock.unlock();
}
}
public BasicPoolEntry getPoolEntry(
long timeout,
TimeUnit tunit)
throws InterruptedException, ConnectionPoolTimeoutException {
return getEntryBlocking(route, state, timeout, tunit, aborter);
}
};
}
/**
* Obtains a pool entry with a connection within the given timeout.
* If a {@link WaitingThread} is used to block, {@link WaitingThreadAborter#setWaitingThread(WaitingThread)}
* must be called before blocking, to allow the thread to be interrupted.
*
* @param route the route for which to get the connection
* @param timeout the timeout, 0 or negative for no timeout
* @param tunit the unit for the <code>timeout,
* may be <code>null only if there is no timeout
* @param aborter an object which can abort a {@link WaitingThread}.
*
* @return pool entry holding a connection for the route
*
* @throws ConnectionPoolTimeoutException
* if the timeout expired
* @throws InterruptedException
* if the calling thread was interrupted
*/
protected BasicPoolEntry getEntryBlocking(
HttpRoute route, Object state,
long timeout, TimeUnit tunit,
WaitingThreadAborter aborter)
throws ConnectionPoolTimeoutException, InterruptedException {
int maxTotalConnections = ConnManagerParams.getMaxTotalConnections(params);
Date deadline = null;
if (timeout > 0) {
deadline = new Date
(System.currentTimeMillis() + tunit.toMillis(timeout));
}
BasicPoolEntry entry = null;
poolLock.lock();
try {
RouteSpecificPool rospl = getRoutePool(route, true);
WaitingThread waitingThread = null;
while (entry == null) {
if (isShutDown) {
throw new IllegalStateException
("Connection pool shut down.");
}
if (log.isDebugEnabled()) {
log.debug("[" + route + "] kept alive: " + freeConnections.size() +
", issued: " + leasedConnections.size() +
", allocated: " + numConnections + " out of " + maxTotalConnections);
}
// the cases to check for:
// - have a free connection for that route
// - allowed to create a free connection for that route
// - can delete and replace a free connection for another route
// - need to wait for one of the things above to come true
entry = getFreeEntry(rospl, state);
if (entry != null) {
break;
}
boolean hasCapacity = rospl.getCapacity() > 0;
if (log.isDebugEnabled()) {
log.debug("Available capacity: " + rospl.getCapacity()
+ " out of " + rospl.getMaxEntries()
+ " [" + route + "][" + state + "]");
}
if (hasCapacity && numConnections < maxTotalConnections) {
entry = createEntry(rospl, operator);
} else if (hasCapacity && !freeConnections.isEmpty()) {
deleteLeastUsedEntry();
// if least used entry's route was the same as rospl,
// rospl is now out of date : we preemptively refresh
rospl = getRoutePool(route, true);
entry = createEntry(rospl, operator);
} else {
if (log.isDebugEnabled()) {
log.debug("Need to wait for connection" +
" [" + route + "][" + state + "]");
}
if (waitingThread == null) {
waitingThread =
newWaitingThread(poolLock.newCondition(), rospl);
aborter.setWaitingThread(waitingThread);
}
boolean success = false;
try {
rospl.queueThread(waitingThread);
waitingThreads.add(waitingThread);
success = waitingThread.await(deadline);
} finally {
// In case of 'success', we were woken up by the
// connection pool and should now have a connection
// waiting for us, or else we're shutting down.
// Just continue in the loop, both cases are checked.
rospl.removeThread(waitingThread);
waitingThreads.remove(waitingThread);
}
// check for spurious wakeup vs. timeout
if (!success && (deadline != null) &&
(deadline.getTime() <= System.currentTimeMillis())) {
throw new ConnectionPoolTimeoutException
("Timeout waiting for connection");
}
}
} // while no entry
} finally {
poolLock.unlock();
}
return entry;
} // getEntry
// non-javadoc, see base class AbstractConnPool
@Override
public void freeEntry(BasicPoolEntry entry, boolean reusable, long validDuration, TimeUnit timeUnit) {
HttpRoute route = entry.getPlannedRoute();
if (log.isDebugEnabled()) {
log.debug("Releasing connection" +
" [" + route + "][" + entry.getState() + "]");
}
poolLock.lock();
try {
if (isShutDown) {
// the pool is shut down, release the
// connection's resources and get out of here
closeConnection(entry.getConnection());
return;
}
// no longer issued, we keep a hard reference now
leasedConnections.remove(entry);
RouteSpecificPool rospl = getRoutePool(route, true);
if (reusable) {
if (log.isDebugEnabled()) {
log.debug("Pooling connection" +
" [" + route + "][" + entry.getState() + "]" +
"; keep alive for " + validDuration + " " + timeUnit.toString());
}
rospl.freeEntry(entry);
freeConnections.add(entry);
idleConnHandler.add(entry.getConnection(), validDuration, timeUnit);
} else {
rospl.dropEntry();
numConnections--;
}
notifyWaitingThread(rospl);
} finally {
poolLock.unlock();
}
} // freeEntry
/**
* If available, get a free pool entry for a route.
*
* @param rospl the route-specific pool from which to get an entry
*
* @return an available pool entry for the given route, or
* <code>null if none is available
*/
protected BasicPoolEntry getFreeEntry(RouteSpecificPool rospl, Object state) {
BasicPoolEntry entry = null;
poolLock.lock();
try {
boolean done = false;
while(!done) {
entry = rospl.allocEntry(state);
if (entry != null) {
if (log.isDebugEnabled()) {
log.debug("Getting free connection"
+ " [" + rospl.getRoute() + "][" + state + "]");
}
freeConnections.remove(entry);
boolean valid = idleConnHandler.remove(entry.getConnection());
if(!valid) {
// If the free entry isn't valid anymore, get rid of it
// and loop to find another one that might be valid.
if(log.isDebugEnabled())
log.debug("Closing expired free connection"
+ " [" + rospl.getRoute() + "][" + state + "]");
closeConnection(entry.getConnection());
// We use dropEntry instead of deleteEntry because the entry
// is no longer "free" (we just allocated it), and deleteEntry
// can only be used to delete free entries.
rospl.dropEntry();
numConnections--;
} else {
leasedConnections.add(entry);
done = true;
}
} else {
done = true;
if (log.isDebugEnabled()) {
log.debug("No free connections"
+ " [" + rospl.getRoute() + "][" + state + "]");
}
}
}
} finally {
poolLock.unlock();
}
return entry;
}
/**
* Creates a new pool entry.
* This method assumes that the new connection will be handed
* out immediately.
*
* @param rospl the route-specific pool for which to create the entry
* @param op the operator for creating a connection
*
* @return the new pool entry for a new connection
*/
protected BasicPoolEntry createEntry(RouteSpecificPool rospl,
ClientConnectionOperator op) {
if (log.isDebugEnabled()) {
log.debug("Creating new connection [" + rospl.getRoute() + "]");
}
// the entry will create the connection when needed
BasicPoolEntry entry = new BasicPoolEntry(op, rospl.getRoute());
poolLock.lock();
try {
rospl.createdEntry(entry);
numConnections++;
leasedConnections.add(entry);
} finally {
poolLock.unlock();
}
return entry;
}
/**
* Deletes a given pool entry.
* This closes the pooled connection and removes all references,
* so that it can be GCed.
*
* <p>Note: Does not remove the entry from the freeConnections list.
* It is assumed that the caller has already handled this step.</p>
* <!-- @@@ is that a good idea? or rather fix it? -->
*
* @param entry the pool entry for the connection to delete
*/
protected void deleteEntry(BasicPoolEntry entry) {
HttpRoute route = entry.getPlannedRoute();
if (log.isDebugEnabled()) {
log.debug("Deleting connection"
+ " [" + route + "][" + entry.getState() + "]");
}
poolLock.lock();
try {
closeConnection(entry.getConnection());
RouteSpecificPool rospl = getRoutePool(route, true);
rospl.deleteEntry(entry);
numConnections--;
if (rospl.isUnused()) {
routeToPool.remove(route);
}
idleConnHandler.remove(entry.getConnection());// not idle, but dead
} finally {
poolLock.unlock();
}
}
/**
* Delete an old, free pool entry to make room for a new one.
* Used to replace pool entries with ones for a different route.
*/
protected void deleteLeastUsedEntry() {
try {
poolLock.lock();
//@@@ with get() instead of remove, we could
//@@@ leave the removing to deleteEntry()
BasicPoolEntry entry = freeConnections.remove();
if (entry != null) {
deleteEntry(entry);
} else if (log.isDebugEnabled()) {
log.debug("No free connection to delete.");
}
} finally {
poolLock.unlock();
}
}
// non-javadoc, see base class AbstractConnPool
@Override
protected void handleLostEntry(HttpRoute route) {
poolLock.lock();
try {
RouteSpecificPool rospl = getRoutePool(route, true);
rospl.dropEntry();
if (rospl.isUnused()) {
routeToPool.remove(route);
}
numConnections--;
notifyWaitingThread(rospl);
} finally {
poolLock.unlock();
}
}
/**
* Notifies a waiting thread that a connection is available.
* This will wake a thread waiting in the specific route pool,
* if there is one.
* Otherwise, a thread in the connection pool will be notified.
*
* @param rospl the pool in which to notify, or <code>null
*/
protected void notifyWaitingThread(RouteSpecificPool rospl) {
//@@@ while this strategy provides for best connection re-use,
//@@@ is it fair? only do this if the connection is open?
// Find the thread we are going to notify. We want to ensure that
// each waiting thread is only interrupted once, so we will remove
// it from all wait queues before interrupting.
WaitingThread waitingThread = null;
poolLock.lock();
try {
if ((rospl != null) && rospl.hasThread()) {
if (log.isDebugEnabled()) {
log.debug("Notifying thread waiting on pool" +
" [" + rospl.getRoute() + "]");
}
waitingThread = rospl.nextThread();
} else if (!waitingThreads.isEmpty()) {
if (log.isDebugEnabled()) {
log.debug("Notifying thread waiting on any pool");
}
waitingThread = waitingThreads.remove();
} else if (log.isDebugEnabled()) {
log.debug("Notifying no-one, there are no waiting threads");
}
if (waitingThread != null) {
waitingThread.wakeup();
}
} finally {
poolLock.unlock();
}
}
@Override
public void deleteClosedConnections() {
poolLock.lock();
try {
Iterator<BasicPoolEntry> iter = freeConnections.iterator();
while (iter.hasNext()) {
BasicPoolEntry entry = iter.next();
if (!entry.getConnection().isOpen()) {
iter.remove();
deleteEntry(entry);
}
}
} finally {
poolLock.unlock();
}
}
// non-javadoc, see base class AbstractConnPool
@Override
public void shutdown() {
poolLock.lock();
try {
super.shutdown();
// close all free connections
//@@@ move this to base class?
Iterator<BasicPoolEntry> ibpe = freeConnections.iterator();
while (ibpe.hasNext()) {
BasicPoolEntry entry = ibpe.next();
ibpe.remove();
if (log.isDebugEnabled()) {
log.debug("Closing connection"
+ " [" + entry.getPlannedRoute() + "][" + entry.getState() + "]");
}
closeConnection(entry.getConnection());
}
// wake up all waiting threads
Iterator<WaitingThread> iwth = waitingThreads.iterator();
while (iwth.hasNext()) {
WaitingThread waiter = iwth.next();
iwth.remove();
waiter.wakeup();
}
routeToPool.clear();
} finally {
poolLock.unlock();
}
}
} // class ConnPoolByRoute
Other Java examples (source code examples)
Here is a short list of links related to this Java ConnPoolByRoute.java source code file:
|