alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

ActiveMQ example source code file (TransportConnection.java)

This example ActiveMQ source code file (TransportConnection.java) is included in the DevDaily.com "Java Source Code Warehouse" project. The intent of this project is to help you "Learn Java by Example"™.

Java - ActiveMQ tags/keywords

cannot, exception, exception, failed, illegalstateexception, ioexception, management, net, network, response, response, string, string, threading, threads, throwable, transport, transportconnectionstate, transportconnectionstate, util

The ActiveMQ TransportConnection.java source code

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.broker;

import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import javax.management.ObjectName;
import javax.transaction.xa.XAResource;

import org.apache.activemq.broker.ft.MasterBroker;
import org.apache.activemq.broker.region.ConnectionStatistics;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.CommandTypes;
import org.apache.activemq.command.ConnectionControl;
import org.apache.activemq.command.ConnectionError;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.ControlCommand;
import org.apache.activemq.command.DataArrayResponse;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.FlushCommand;
import org.apache.activemq.command.IntegerResponse;
import org.apache.activemq.command.KeepAliveInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.ProducerAck;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.network.*;
import org.apache.activemq.security.MessageAuthorizationPolicy;
import org.apache.activemq.state.CommandVisitor;
import org.apache.activemq.state.ConnectionState;
import org.apache.activemq.state.ConsumerState;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.state.SessionState;
import org.apache.activemq.state.TransactionState;
import org.apache.activemq.thread.DefaultThreadPools;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transaction.Transaction;
import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.ResponseCorrelator;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportDisposedIOException;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

public class TransportConnection implements Connection, Task, CommandVisitor {
    // General-purpose logger for this class.
    private static final Logger LOG = LoggerFactory.getLogger(TransportConnection.class);
    // Logger used when reporting transport failures (see serviceTransportException).
    private static final Logger TRANSPORTLOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Transport");
    // Logger used when reporting errors raised while servicing commands.
    private static final Logger SERVICELOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Service");
    // Keeps track of the broker and connector that created this connection.
    protected final Broker broker;
    protected final TransportConnector connector;
    // Keeps track of the state of the connections.
    // protected final ConcurrentHashMap localConnectionStates=new
    // ConcurrentHashMap();
    // Obtained from the RegionBroker in the constructor; processAddConnection
    // synchronizes on this map while looking up / creating a state entry.
    protected final Map<ConnectionId, ConnectionState> brokerConnectionStates;
    // The broker and wireformat info that was exchanged.
    protected BrokerInfo brokerInfo;
    // Commands waiting to be dispatched; getDispatchQueueSize synchronizes on
    // this list before reading its size.
    protected final List<Command> dispatchQueue = new LinkedList();
    protected TaskRunner taskRunner;
    // Holds the transport failure recorded by serviceTransportException.
    protected final AtomicReference<IOException> transportException = new AtomicReference();
    protected AtomicBoolean dispatchStopped = new AtomicBoolean(false);
    private MasterBroker masterBroker;
    private final Transport transport;
    // Initialised from the connector in the constructor and copied into each
    // ConnectionContext created by processAddConnection.
    private MessageAuthorizationPolicy messageAuthorizationPolicy;
    // Last WireFormatInfo received via processWireFormat (null until then).
    private WireFormatInfo wireFormatInfo;
    // Used to do async dispatch.. this should perhaps be pushed down into the
    // transport layer..
    // NOTE(review): the comment above does not appear to describe the flag
    // below, which serviceException sets while it is reporting an error and
    // checks to avoid re-entering that path.
    private boolean inServiceException;
    private final ConnectionStatistics statistics = new ConnectionStatistics();
    // Set from ConnectionInfo.isManageable() in processAddConnection.
    private boolean manageable;
    private boolean slow;
    private boolean markedCandidate;
    private boolean blockedCandidate;
    private boolean blocked;
    // Set to true at the end of the constructor.
    private boolean connected;
    private boolean active;
    private boolean starting;
    private boolean pendingStop;
    private long timeStamp;
    // Checked by the exception handlers: errors are not acted on once this is true.
    private final AtomicBoolean stopping = new AtomicBoolean(false);
    private final CountDownLatch stopped = new CountDownLatch(1);
    // Ensures serviceExceptionAsync starts at most one handler thread.
    private final AtomicBoolean asyncException = new AtomicBoolean(false);
    private final Map<ProducerId, ProducerBrokerExchange> producerExchanges = new HashMap();
    private final Map<ConsumerId, ConsumerBrokerExchange> consumerExchanges = new HashMap();
    private final CountDownLatch dispatchStoppedLatch = new CountDownLatch(1);
    // Context of the command currently being processed; assigned by several
    // process* handlers and reset to null at the end of service().
    private ConnectionContext context;
    private boolean networkConnection;
    // Set from ConnectionInfo.isFaultTolerant() in processAddConnection.
    private boolean faultTolerantConnection;
    // Starts at the broker's protocol version and is overwritten with the
    // version announced in the peer's WireFormatInfo (see processWireFormat).
    private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
    private DemandForwardingBridge duplexBridge;
    private final TaskRunnerFactory taskRunnerFactory;
    private TransportConnectionStateRegister connectionStateRegister = new SingleTransportConnectionStateRegister();
    // The transport listener holds the read lock while servicing a command or
    // a transport exception.
    private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock();
    private String duplexNetworkConnectorId;

    /**
     * Creates a connection that services the commands arriving on the given
     * transport on behalf of the broker, and installs the transport listener
     * that feeds those commands into {@link #service(Command)}.
     *
     * @param connector
     *            the connector that accepted this connection; may be null, in
     *            which case no parent statistics and no message authorization
     *            policy are inherited
     * @param transport
     *            the transport to read commands from and write responses to
     * @param broker
     *            the broker that commands are applied to; must expose a
     *            {@link RegionBroker} adaptor
     * @param taskRunnerFactory
     *            - can be null if you want direct dispatch to the transport
     *            else commands are sent async.
     */
    public TransportConnection(TransportConnector connector, final Transport transport, Broker broker,
            TaskRunnerFactory taskRunnerFactory) {
        this.connector = connector;
        this.broker = broker;
        RegionBroker rb = (RegionBroker) broker.getAdaptor(RegionBroker.class);
        brokerConnectionStates = rb.getConnectionStates();
        // Everything read from the connector sits behind the same null guard,
        // so a null connector no longer fails before the guard is reached.
        if (connector != null) {
            this.statistics.setParent(connector.getStatistics());
            this.messageAuthorizationPolicy = connector.getMessageAuthorizationPolicy();
        }
        this.taskRunnerFactory = taskRunnerFactory;
        this.transport = transport;
        this.transport.setTransportListener(new DefaultTransportListener() {
            @Override
            public void onCommand(Object o) {
                // The read lock is held for the whole time a command is serviced.
                serviceLock.readLock().lock();
                try {
                    if (!(o instanceof Command)) {
                        // Concatenation (rather than o.toString()) keeps a null
                        // object from masking the protocol violation with an NPE.
                        throw new RuntimeException("Protocol violation - Command corrupted: " + o);
                    }
                    Command command = (Command) o;
                    Response response = service(command);
                    if (response != null) {
                        dispatchSync(response);
                    }
                } finally {
                    serviceLock.readLock().unlock();
                }
            }

            @Override
            public void onException(IOException exception) {
                serviceLock.readLock().lock();
                try {
                    serviceTransportException(exception);
                } finally {
                    serviceLock.readLock().unlock();
                }
            }
        });
        connected = true;
    }

    /**
     * Reports how many commands are currently queued for dispatch on this
     * connection. The queue is locked while its size is read.
     *
     * @return size of dispatch queue
     */
    public int getDispatchQueueSize() {
        final int pending;
        synchronized (dispatchQueue) {
            pending = dispatchQueue.size();
        }
        return pending;
    }

    /**
     * Reacts to a failure reported by the underlying transport.
     * <p>
     * When the broker service is configured to shut down on slave failure and
     * the peer is a slave broker, this connection and the whole broker service
     * are stopped. Independently of that, unless the connection is already
     * stopping, the exception is recorded, logged and an asynchronous stop is
     * requested.
     *
     * @param e
     *            the transport failure
     */
    public void serviceTransportException(IOException e) {
        final BrokerService brokerService = connector.getBrokerService();
        final boolean slaveFailure = brokerService.isShutdownOnSlaveFailure() && brokerInfo != null
                && brokerInfo.isSlaveBroker();
        if (slaveFailure) {
            LOG.error("Slave has exception: " + e.getMessage() + " shutting down master now.", e);
            try {
                doStop();
                brokerService.stop();
            } catch (Exception ex) {
                LOG.warn("Failed to stop the master", ex);
            }
        }
        if (stopping.get()) {
            // Already shutting down: nothing more to record or trigger.
            return;
        }
        transportException.set(e);
        if (TRANSPORTLOG.isDebugEnabled()) {
            TRANSPORTLOG.debug("Transport failed: " + e, e);
        } else if (TRANSPORTLOG.isInfoEnabled()) {
            TRANSPORTLOG.info("Transport failed: " + e);
        }
        stopAsync();
    }

    /**
     * Hands the exception to {@link #serviceException(Throwable)} on a
     * dedicated thread. Handling a service exception closes a socket, and
     * client sockets may hang or cause deadlocks, so broker threads must not
     * be tied up doing it. Only the first call starts a handler thread; later
     * calls are ignored.
     *
     * @param e
     *            the exception to handle
     */
    public void serviceExceptionAsync(final IOException e) {
        if (!asyncException.compareAndSet(false, true)) {
            // A handler thread has already been started for this connection.
            return;
        }
        final Thread handler = new Thread("Async Exception Handler") {
            @Override
            public void run() {
                serviceException(e);
            }
        };
        handler.start();
    }

    /**
     * Closes a client's connection because of a detected error. Errors are
     * ignored while the connection is stopping. I/O errors are treated as
     * transport failures; a stopped broker is reported to the client
     * synchronously before the connection is stopped; any other error is
     * reported to the client asynchronously as a {@link ConnectionError}.
     *
     * @param e
     *            the error that was detected
     */
    public void serviceException(Throwable e) {
        // Transport-level problems (e.g. a failed synchronous dispatch) have
        // their own handling.
        if (e instanceof IOException) {
            serviceTransportException((IOException) e);
            return;
        }

        if (e.getClass() == BrokerStoppedException.class) {
            // The broker is stopped but the client is still connected.
            if (stopping.get()) {
                return;
            }
            if (SERVICELOG.isDebugEnabled()) {
                SERVICELOG.debug("Broker has been stopped.  Notifying client and closing his connection.");
            }
            final ConnectionError stoppedNotice = new ConnectionError();
            stoppedNotice.setException(e);
            dispatchSync(stoppedNotice);
            // Give the output buffer a moment to flush the notification to
            // the client.
            try {
                Thread.sleep(500);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
            // Worst case: the connection is killed before the notification
            // reaches the client.
            stopAsync();
            return;
        }

        if (stopping.get() || inServiceException) {
            return;
        }
        // Flag is held while the error is being reported so that a nested
        // call does not report again.
        inServiceException = true;
        try {
            SERVICELOG.warn("Async error occurred: " + e, e);
            final ConnectionError errorNotice = new ConnectionError();
            errorNotice.setException(e);
            dispatchAsync(errorNotice);
        } finally {
            inServiceException = false;
        }
    }

    /**
     * Processes a single command by letting it visit this connection and
     * builds the response to send back, if any.
     * <p>
     * A failure while processing a command that requires a response is
     * returned as an {@link ExceptionResponse}; otherwise it is passed to
     * {@link #serviceException(Throwable)}. When a response is required and
     * the handler produced none, an empty {@link Response} is created. The
     * response is suppressed when the current context is flagged not to send
     * one. The connector URI is placed in the logging MDC for the duration of
     * the call and is always removed again, even if processing fails
     * unexpectedly.
     *
     * @param command
     *            the command to process
     * @return the response to dispatch, or null when none should be sent
     */
    public Response service(Command command) {
        MDC.put("activemq.connector", connector.getUri().toString());
        try {
            Response response = null;
            boolean responseRequired = command.isResponseRequired();
            int commandId = command.getCommandId();
            try {
                response = command.visit(this);
            } catch (Throwable e) {
                if (SERVICELOG.isDebugEnabled() && e.getClass() != BrokerStoppedException.class) {
                    SERVICELOG.debug("Error occured while processing " + (responseRequired ? "sync" : "async")
                            + " command: " + command + ", exception: " + e, e);
                }
                if (responseRequired) {
                    response = new ExceptionResponse(e);
                } else {
                    serviceException(e);
                }
            }
            if (responseRequired) {
                if (response == null) {
                    response = new Response();
                }
                response.setCorrelationId(commandId);
            }
            // The context may have been flagged so that the response is not
            // sent.
            if (context != null) {
                if (context.isDontSendReponse()) {
                    context.setDontSendReponse(false);
                    response = null;
                }
                context = null;
            }
            return response;
        } finally {
            // Always clear the MDC entry so it cannot leak onto later work
            // executed by this thread when an exception escapes.
            MDC.remove("activemq.connector");
        }
    }

    /**
     * Handles a keep-alive command. No action is taken here.
     *
     * @param info
     *            the keep-alive command (unused)
     * @return always null
     */
    public Response processKeepAlive(KeepAliveInfo info) throws Exception {
        return null;
    }

    /**
     * Asks the broker to remove the subscription described by the command,
     * using the context of the connection the command names.
     *
     * @param info
     *            identifies the subscription and the owning connection
     * @return always null
     * @throws Exception
     *             if the connection state lookup or the broker call fails
     */
    public Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception {
        final ConnectionContext ownerContext = lookupConnectionState(info.getConnectionId()).getContext();
        broker.removeSubscription(ownerContext, info);
        return null;
    }

    /**
     * Remembers the wire format information sent by the peer and adopts the
     * protocol version it announces.
     *
     * @param info
     *            the wire format information received
     * @return always null
     */
    public Response processWireFormat(WireFormatInfo info) throws Exception {
        this.wireFormatInfo = info;
        final int announcedVersion = info.getVersion();
        this.protocolVersion.set(announcedVersion);
        return null;
    }

    /**
     * Handles a shutdown command by requesting an asynchronous stop of this
     * connection.
     *
     * @param info
     *            the shutdown command (unused)
     * @return always null
     */
    public Response processShutdown(ShutdownInfo info) throws Exception {
        stopAsync();
        return null;
    }

    /**
     * Handles a flush command. No action is taken here.
     *
     * @param command
     *            the flush command (unused)
     * @return always null
     */
    public Response processFlush(FlushCommand command) throws Exception {
        return null;
    }

    /**
     * Starts a transaction on the broker for the connection named in the
     * command, unless that transaction is already known to the connection
     * state (which guards against replayed duplicate commands).
     *
     * @param info
     *            identifies the connection and the transaction to begin
     * @return always null
     * @throws NullPointerException
     *             if no connection state is found
     * @throws Exception
     *             if the broker fails to begin the transaction
     */
    public Response processBeginTransaction(TransactionInfo info) throws Exception {
        final TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
        context = null;
        if (cs == null) {
            throw new NullPointerException("Context is null");
        }
        context = cs.getContext();
        final TransactionId txId = info.getTransactionId();
        // Avoid replaying dup commands
        if (cs.getTransactionState(txId) == null) {
            cs.addTransactionState(txId);
            broker.beginTransaction(context, txId);
        }
        return null;
    }

    /**
     * Handles an end-transaction command. Nothing is done on the broker side.
     *
     * @param info
     *            the transaction command (unused)
     * @return always null
     */
    public Response processEndTransaction(TransactionInfo info) throws Exception {
        // No need to do anything. This packet is just sent by the client
        // make sure he is synced with the server as commit command could
        // come from a different connection.
        return null;
    }

    /**
     * Prepares a transaction on the broker and reports the outcome.
     * <p>
     * If the transaction was already prepared, the stored result is returned
     * without contacting the broker again. A read-only outcome
     * ({@link XAResource#XA_RDONLY}) removes the transaction from the
     * connection state, since no commit or rollback will follow.
     *
     * @param info
     *            identifies the connection and the transaction to prepare
     * @return an {@link IntegerResponse} carrying the prepare result
     * @throws NullPointerException
     *             if no connection state is found
     * @throws IllegalStateException
     *             if the transaction is not known to the connection state
     * @throws Exception
     *             if the broker fails to prepare the transaction
     */
    public Response processPrepareTransaction(TransactionInfo info) throws Exception {
        final TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
        context = null;
        if (cs == null) {
            throw new NullPointerException("Context is null");
        }
        context = cs.getContext();
        final TransactionState txState = cs.getTransactionState(info.getTransactionId());
        if (txState == null) {
            throw new IllegalStateException("Cannot prepare a transaction that had not been started or previously returned XA_RDONLY: "
                    + info.getTransactionId());
        }
        // Avoid dups.
        if (txState.isPrepared()) {
            return new IntegerResponse(txState.getPreparedResult());
        }
        txState.setPrepared(true);
        final int outcome = broker.prepareTransaction(context, info.getTransactionId());
        txState.setPreparedResult(outcome);
        if (outcome == XAResource.XA_RDONLY) {
            // we are done, no further rollback or commit from TM
            cs.removeTransactionState(info.getTransactionId());
        }
        return new IntegerResponse(outcome);
    }

    /**
     * Commits a transaction in a single phase: the transaction is dropped
     * from the connection state and the broker is asked to commit it with the
     * one-phase flag set.
     *
     * @param info
     *            identifies the connection and the transaction to commit
     * @return always null
     * @throws Exception
     *             if the broker fails to commit
     */
    public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
        final TransportConnectionState connectionState = lookupConnectionState(info.getConnectionId());
        context = connectionState.getContext();
        final TransactionId txId = info.getTransactionId();
        connectionState.removeTransactionState(txId);
        broker.commitTransaction(context, txId, true);
        return null;
    }

    /**
     * Commits a transaction as the second step of a two-phase commit: the
     * transaction is dropped from the connection state and the broker is
     * asked to commit it with the one-phase flag cleared.
     *
     * @param info
     *            identifies the connection and the transaction to commit
     * @return always null
     * @throws Exception
     *             if the broker fails to commit
     */
    public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
        final TransportConnectionState connectionState = lookupConnectionState(info.getConnectionId());
        context = connectionState.getContext();
        final TransactionId txId = info.getTransactionId();
        connectionState.removeTransactionState(txId);
        broker.commitTransaction(context, txId, false);
        return null;
    }

    /**
     * Rolls a transaction back: it is dropped from the connection state and
     * the broker is asked to roll it back.
     *
     * @param info
     *            identifies the connection and the transaction to roll back
     * @return always null
     * @throws Exception
     *             if the broker fails to roll back
     */
    public Response processRollbackTransaction(TransactionInfo info) throws Exception {
        final TransportConnectionState connectionState = lookupConnectionState(info.getConnectionId());
        context = connectionState.getContext();
        final TransactionId txId = info.getTransactionId();
        connectionState.removeTransactionState(txId);
        broker.rollbackTransaction(context, txId);
        return null;
    }

    /**
     * Asks the broker to forget the given transaction, using the context of
     * the connection named in the command.
     *
     * @param info
     *            identifies the connection and the transaction to forget
     * @return always null
     * @throws Exception
     *             if the broker call fails
     */
    public Response processForgetTransaction(TransactionInfo info) throws Exception {
        final TransportConnectionState connectionState = lookupConnectionState(info.getConnectionId());
        context = connectionState.getContext();
        final TransactionId txId = info.getTransactionId();
        broker.forgetTransaction(context, txId);
        return null;
    }

    /**
     * Returns the transactions the broker reports as prepared for the
     * connection named in the command.
     *
     * @param info
     *            identifies the connection
     * @return a {@link DataArrayResponse} wrapping the prepared transaction
     *         ids
     * @throws Exception
     *             if the broker call fails
     */
    public Response processRecoverTransactions(TransactionInfo info) throws Exception {
        final TransportConnectionState connectionState = lookupConnectionState(info.getConnectionId());
        context = connectionState.getContext();
        return new DataArrayResponse(broker.getPreparedTransactions(context));
    }

    /**
     * Passes a message sent by a producer on to the broker, provided the
     * producer's exchange allows the message to be dispatched.
     *
     * @param messageSend
     *            the message sent by the producer
     * @return always null
     * @throws Exception
     *             if the exchange lookup or the broker send fails
     */
    public Response processMessage(Message messageSend) throws Exception {
        final ProducerBrokerExchange exchange = getProducerBrokerExchange(messageSend.getProducerId());
        final boolean dispatchable = exchange.canDispatch(messageSend);
        if (dispatchable) {
            broker.send(exchange, messageSend);
        }
        return null;
    }

    /**
     * Passes a consumer's acknowledgement on to the broker through that
     * consumer's exchange.
     *
     * @param ack
     *            the acknowledgement received
     * @return always null
     * @throws Exception
     *             if the exchange lookup or the broker call fails
     */
    public Response processMessageAck(MessageAck ack) throws Exception {
        final ConsumerId ackingConsumer = ack.getConsumerId();
        broker.acknowledge(getConsumerBrokerExchange(ackingConsumer), ack);
        return null;
    }

    /**
     * Forwards a pull request to the broker using the context of the
     * connection that owns the pulling consumer.
     *
     * @param pull
     *            the pull request
     * @return whatever the broker returns for the pull
     * @throws Exception
     *             if the connection state lookup or the broker call fails
     */
    public Response processMessagePull(MessagePull pull) throws Exception {
        final ConnectionContext consumerContext = lookupConnectionState(pull.getConsumerId()).getContext();
        return broker.messagePull(consumerContext, pull);
    }

    /**
     * Hands a dispatch notification straight to the broker.
     *
     * @param notification
     *            the notification received
     * @return always null
     * @throws Exception
     *             if the broker fails to process the notification
     */
    public Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception {
        broker.processDispatchNotification(notification);
        return null;
    }

    /**
     * Registers a destination with the broker. Temporary destinations are
     * additionally recorded on the connection state.
     *
     * @param info
     *            describes the destination and the owning connection
     * @return always null
     * @throws Exception
     *             if the broker call fails
     */
    public Response processAddDestination(DestinationInfo info) throws Exception {
        final TransportConnectionState connectionState = lookupConnectionState(info.getConnectionId());
        broker.addDestinationInfo(connectionState.getContext(), info);
        final boolean temporary = info.getDestination().isTemporary();
        if (temporary) {
            connectionState.addTempDestination(info);
        }
        return null;
    }

    /**
     * Removes a destination from the broker. Temporary destinations are also
     * removed from the connection state.
     *
     * @param info
     *            describes the destination and the owning connection
     * @return always null
     * @throws Exception
     *             if the broker call fails
     */
    public Response processRemoveDestination(DestinationInfo info) throws Exception {
        final TransportConnectionState connectionState = lookupConnectionState(info.getConnectionId());
        broker.removeDestinationInfo(connectionState.getContext(), info);
        final boolean temporary = info.getDestination().isTemporary();
        if (temporary) {
            connectionState.removeTempDestination(info.getDestination());
        }
        return null;
    }

    /**
     * Registers a producer with the broker and with its session state. A
     * producer already known to the session is ignored, which guards against
     * replayed duplicate commands. If the session state refuses the producer,
     * the broker registration is undone.
     *
     * @param info
     *            describes the producer to add
     * @return always null
     * @throws IllegalStateException
     *             if the producer's session is not registered
     * @throws Exception
     *             if the broker call fails
     */
    public Response processAddProducer(ProducerInfo info) throws Exception {
        final SessionId sessionId = info.getProducerId().getParentId();
        final ConnectionId connectionId = sessionId.getParentId();
        final TransportConnectionState cs = lookupConnectionState(connectionId);
        final SessionState sessionState = cs.getSessionState(sessionId);
        if (sessionState == null) {
            throw new IllegalStateException("Cannot add a producer to a session that had not been registered: "
                    + sessionId);
        }
        // Avoid replaying dup commands
        final boolean alreadyRegistered = sessionState.getProducerIds().contains(info.getProducerId());
        if (alreadyRegistered) {
            return null;
        }
        broker.addProducer(cs.getContext(), info);
        try {
            sessionState.addProducer(info);
        } catch (IllegalStateException e) {
            // The session would not take the producer: roll the broker back.
            broker.removeProducer(cs.getContext(), info);
        }
        return null;
    }

    /**
     * Removes a producer from its session state, discards its broker
     * exchange and unregisters it from the broker.
     *
     * @param id
     *            the producer to remove
     * @return always null
     * @throws IllegalStateException
     *             if the producer or its session is not registered
     * @throws Exception
     *             if the broker call fails
     */
    public Response processRemoveProducer(ProducerId id) throws Exception {
        final SessionId sessionId = id.getParentId();
        final TransportConnectionState cs = lookupConnectionState(sessionId.getParentId());
        final SessionState sessionState = cs.getSessionState(sessionId);
        if (sessionState == null) {
            throw new IllegalStateException("Cannot remove a producer from a session that had not been registered: "
                    + sessionId);
        }
        final ProducerState removed = sessionState.removeProducer(id);
        if (removed == null) {
            throw new IllegalStateException("Cannot remove a producer that had not been registered: " + id);
        }
        removeProducerBrokerExchange(id);
        broker.removeProducer(cs.getContext(), removed.getInfo());
        return null;
    }

    /**
     * Registers a consumer with the broker and with its session state. A
     * consumer already known to the session is ignored, which guards against
     * replayed duplicate commands. If the session state refuses the consumer,
     * the broker registration is undone.
     *
     * @param info
     *            describes the consumer to add
     * @return always null
     * @throws IllegalStateException
     *             if the consumer's session is not registered
     * @throws Exception
     *             if the broker call fails
     */
    public Response processAddConsumer(ConsumerInfo info) throws Exception {
        final SessionId sessionId = info.getConsumerId().getParentId();
        final ConnectionId connectionId = sessionId.getParentId();
        final TransportConnectionState cs = lookupConnectionState(connectionId);
        final SessionState sessionState = cs.getSessionState(sessionId);
        if (sessionState == null) {
            throw new IllegalStateException(broker.getBrokerName()
                    + " Cannot add a consumer to a session that had not been registered: " + sessionId);
        }
        // Avoid replaying dup commands
        final boolean alreadyRegistered = sessionState.getConsumerIds().contains(info.getConsumerId());
        if (alreadyRegistered) {
            return null;
        }
        broker.addConsumer(cs.getContext(), info);
        try {
            sessionState.addConsumer(info);
        } catch (IllegalStateException e) {
            // The session would not take the consumer: roll the broker back.
            broker.removeConsumer(cs.getContext(), info);
        }
        return null;
    }

    /**
     * Removes a consumer from its session state, records the last delivered
     * sequence id on its info, unregisters it from the broker and discards
     * its broker exchange.
     *
     * @param id
     *            the consumer to remove
     * @param lastDeliveredSequenceId
     *            value stored on the consumer's info before the broker is
     *            notified
     * @return always null
     * @throws IllegalStateException
     *             if the consumer, its session or its connection is not
     *             registered
     * @throws Exception
     *             if the broker call fails
     */
    public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) throws Exception {
        final SessionId sessionId = id.getParentId();
        final ConnectionId connectionId = sessionId.getParentId();

        final TransportConnectionState cs = lookupConnectionState(connectionId);
        if (cs == null) {
            throw new IllegalStateException("Cannot remove a consumer from a connection that had not been registered: "
                    + connectionId);
        }

        final SessionState sessionState = cs.getSessionState(sessionId);
        if (sessionState == null) {
            throw new IllegalStateException("Cannot remove a consumer from a session that had not been registered: "
                    + sessionId);
        }

        final ConsumerState removed = sessionState.removeConsumer(id);
        if (removed == null) {
            throw new IllegalStateException("Cannot remove a consumer that had not been registered: " + id);
        }

        removed.getInfo().setLastDeliveredSequenceId(lastDeliveredSequenceId);
        broker.removeConsumer(cs.getContext(), removed.getInfo());
        removeConsumerBrokerExchange(id);
        return null;
    }

    /**
     * Registers a session with the broker and with the connection state. A
     * session already known to the connection is ignored, which guards
     * against replayed duplicate commands; nothing is done either when no
     * connection state is found. If the connection state refuses the session,
     * the failure is logged and the broker registration is undone.
     *
     * @param info
     *            describes the session to add
     * @return always null
     * @throws Exception
     *             if the broker call fails
     */
    public Response processAddSession(SessionInfo info) throws Exception {
        ConnectionId connectionId = info.getSessionId().getParentId();
        TransportConnectionState cs = lookupConnectionState(connectionId);
        // Avoid replaying dup commands
        if (cs != null && !cs.getSessionIds().contains(info.getSessionId())) {
            broker.addSession(cs.getContext(), info);
            try {
                cs.addSession(info);
            } catch (IllegalStateException e) {
                // Report through the class logger instead of dumping the
                // stack trace on stderr, then roll the broker back.
                LOG.warn("Failed to add session: " + info.getSessionId(), e);
                broker.removeSession(cs.getContext(), info);
            }
        }
        return null;
    }

    /**
     * Removes a session and everything that belongs to it. The session is
     * shut down first, then each of its consumers and producers is removed
     * (failures are logged and do not abort the cascade), and finally the
     * session is dropped from the connection state and from the broker.
     *
     * @param id
     *            the session to remove
     * @param lastDeliveredSequenceId
     *            passed on to the removal of each consumer
     * @return always null
     * @throws IllegalStateException
     *             if the session or its connection is not registered
     * @throws Exception
     *             if the broker fails to remove the session
     */
    public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) throws Exception {
        final ConnectionId connectionId = id.getParentId();
        final TransportConnectionState cs = lookupConnectionState(connectionId);
        if (cs == null) {
            throw new IllegalStateException("Cannot remove session from connection that had not been registered: " + connectionId);
        }
        final SessionState session = cs.getSessionState(id);
        if (session == null) {
            throw new IllegalStateException("Cannot remove session that had not been registered: " + id);
        }
        // Don't let new consumers or producers get added while we are closing
        // this down.
        session.shutdown();

        // Cascade the connection stop to the consumers...
        final Iterator<?> consumers = session.getConsumerIds().iterator();
        while (consumers.hasNext()) {
            final ConsumerId consumerId = (ConsumerId) consumers.next();
            try {
                processRemoveConsumer(consumerId, lastDeliveredSequenceId);
            } catch (Throwable e) {
                LOG.warn("Failed to remove consumer: " + consumerId + ". Reason: " + e, e);
            }
        }

        // ...and to the producers.
        final Iterator<?> producers = session.getProducerIds().iterator();
        while (producers.hasNext()) {
            final ProducerId producerId = (ProducerId) producers.next();
            try {
                processRemoveProducer(producerId);
            } catch (Throwable e) {
                LOG.warn("Failed to remove producer: " + producerId + ". Reason: " + e, e);
            }
        }

        cs.removeSession(id);
        broker.removeSession(cs.getContext(), session.getInfo());
        return null;
    }

    /**
     * Registers a new client connection with the broker.
     * <p>
     * The method obtains (or creates) the shared connection state for the
     * connection id, takes ownership of that state if another connection
     * currently holds it, builds a fresh {@link ConnectionContext} from the
     * command and announces the connection to the broker. If the broker
     * rejects it, the state registrations made here are undone and the
     * exception is rethrown.
     *
     * @param info
     *            describes the connection being added
     * @return an {@link ExceptionResponse} when the connection is refused
     *         because the slave is not attached yet, otherwise null
     * @throws Exception
     *             if the broker fails to add the connection
     */
    public Response processAddConnection(ConnectionInfo info) throws Exception {
        // if the broker service has slave attached, wait for the slave to be
        // attached to allow client connection. slave connection is fine
        if (!info.isBrokerMasterConnector() && connector.getBrokerService().isWaitForSlave()
                && connector.getBrokerService().getSlaveStartSignal().getCount() == 1) {
            // Refuse the client: the transport is disposed and the error is
            // returned as the response.
            ServiceSupport.dispose(transport);
            return new ExceptionResponse(new Exception("Master's slave not attached yet."));
        }
        // Older clients should have been defaulting this field to true.. but
        // they were not.
        if (wireFormatInfo != null && wireFormatInfo.getVersion() <= 2) {
            info.setClientMaster(true);
        }
        TransportConnectionState state;
        // Make sure 2 concurrent connections by the same ID only generate 1
        // TransportConnectionState object.
        synchronized (brokerConnectionStates) {
            state = (TransportConnectionState) brokerConnectionStates.get(info.getConnectionId());
            if (state == null) {
                state = new TransportConnectionState(info, this);
                brokerConnectionStates.put(info.getConnectionId(), state);
            }
            state.incrementReference();
        }
        // If there are 2 concurrent connections for the same connection id,
        // then last one in wins, we need to sync here
        // to figure out the winner.
        synchronized (state.getConnectionMutex()) {
            if (state.getConnection() != this) {
                // Another connection owns the state: stop it, then take over
                // and reset the state for the new ConnectionInfo.
                LOG.debug("Killing previous stale connection: " + state.getConnection().getRemoteAddress());
                state.getConnection().stop();
                LOG.debug("Connection " + getRemoteAddress() + " taking over previous connection: "
                        + state.getConnection().getRemoteAddress());
                state.setConnection(this);
                state.reset(info);
            }
        }
        registerConnectionState(info.getConnectionId(), state);
        LOG.debug("Setting up new connection id: " + info.getConnectionId() + ", address: " + getRemoteAddress());
        this.faultTolerantConnection=info.isFaultTolerant();
        // Setup the context.
        String clientId = info.getClientId();
        context = new ConnectionContext();
        context.setBroker(broker);
        context.setClientId(clientId);
        context.setClientMaster(info.isClientMaster());
        context.setConnection(this);
        context.setConnectionId(info.getConnectionId());
        context.setConnector(connector);
        context.setMessageAuthorizationPolicy(getMessageAuthorizationPolicy());
        context.setNetworkConnection(networkConnection);
        context.setFaultTolerant(faultTolerantConnection);
        context.setTransactions(new ConcurrentHashMap<TransactionId, Transaction>());
        context.setUserName(info.getUserName());
        context.setWireFormatInfo(wireFormatInfo);
        context.setReconnect(info.isFailoverReconnect());
        this.manageable = info.isManageable();
        state.setContext(context);
        state.setConnection(this);
        // Announce the connection to the broker; on failure undo the state
        // registrations made above before propagating the error.
        try {
            broker.addConnection(context, info);
        } catch (Exception e) {
            synchronized (brokerConnectionStates) {
                brokerConnectionStates.remove(info.getConnectionId());
            }
            unregisterConnectionState(info.getConnectionId());
            LOG.warn("Failed to add Connection " + info.getConnectionId() + ", reason: " +  e.toString());
            if (LOG.isDebugEnabled()) {
                LOG.debug("Exception detail:", e);
            }
            throw e;
        }
        if (info.isManageable()) {
            // send ConnectionCommand
            ConnectionControl command = this.connector.getConnectionControl();
            command.setFaultTolerant(broker.isFaultTolerantConfiguration());
            dispatchAsync(command);
        }
        return null;
    }

    /**
     * Tears down the logical connection registered under {@code id}.
     * <p>
     * When a state is registered for the id, it is shut down, its sessions
     * and temporary destinations are removed, the broker is asked to remove
     * the connection, and finally the state is unregistered. A failure in any
     * single step is logged and does not prevent the remaining steps from
     * running.
     *
     * @param id the connection to remove
     * @param lastDeliveredSequenceId forwarded to {@code processRemoveSession}
     *            for every session that belongs to the connection
     * @return always {@code null}
     * @throws InterruptedException as declared by the signature
     */
    public synchronized Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId)
            throws InterruptedException {
        LOG.debug("remove connection id: " + id);
        TransportConnectionState cs = lookupConnectionState(id);
        if (cs != null) {
            // Don't allow things to be added to the connection state while we
            // are shutting down.
            cs.shutdown();
            // Cascade the connection stop to the sessions.
            for (Iterator iter = cs.getSessionIds().iterator(); iter.hasNext();) {
                SessionId sessionId = (SessionId) iter.next();
                try {
                    processRemoveSession(sessionId, lastDeliveredSequenceId);
                } catch (Throwable e) {
                    SERVICELOG.warn("Failed to remove session " + sessionId, e);
                }
            }
            // Cascade the connection stop to temp destinations.
            for (Iterator iter = cs.getTempDestinations().iterator(); iter.hasNext();) {
                DestinationInfo di = (DestinationInfo) iter.next();
                try {
                    broker.removeDestination(cs.getContext(), di.getDestination(), 0);
                } catch (Throwable e) {
                    SERVICELOG.warn("Failed to remove tmp destination " + di.getDestination(), e);
                }
                // Drop the entry whether or not the broker call succeeded.
                iter.remove();
            }
            try {
                broker.removeConnection(cs.getContext(), cs.getInfo(), null);
            } catch (Throwable e) {
                SERVICELOG.warn("Failed to remove connection " + cs.getInfo() + ", reason: " + e.toString());
                // Guard with the same logger that emits the detail, so the
                // stack trace appears exactly when SERVICELOG debug is on.
                if (SERVICELOG.isDebugEnabled()) {
                    SERVICELOG.debug("Exception detail:", e);
                }
            }
            TransportConnectionState state = unregisterConnectionState(id);
            if (state != null) {
                synchronized (brokerConnectionStates) {
                    // If we are the last reference, we should remove the state
                    // from the broker.
                    if (state.decrementReference() == 0) {
                        brokerConnectionStates.remove(id);
                    }
                }
            }
        }
        return null;
    }

    /**
     * Ignores the acknowledgement: a broker is not expected to receive
     * {@code ProducerAck} commands.
     *
     * @param ack the received acknowledgement (unused)
     * @return always {@code null}
     */
    public Response processProducerAck(ProducerAck ack) throws Exception {
        final Response noResponse = null;
        return noResponse;
    }

    /**
     * @return the connector held by this connection
     */
    public Connector getConnector() {
        return this.connector;
    }

    /**
     * Dispatches {@code message} on the calling thread. An {@link IOException}
     * raised while dispatching is handed to {@code serviceExceptionAsync}
     * instead of being propagated.
     *
     * @param message the command to dispatch
     */
    public void dispatchSync(Command message) {
        // getStatistics().getEnqueues().increment();
        try {
            processDispatch(message);
        } catch (IOException dispatchFailure) {
            serviceExceptionAsync(dispatchFailure);
        }
    }

    /**
     * Queues {@code message} for dispatch by the task runner, or dispatches it
     * synchronously when no task runner exists.
     * <p>
     * Once the connection is stopping nothing is sent any more; a message
     * dispatch is still post-processed and its transmit callback is run.
     *
     * @param message the command to dispatch
     */
    public void dispatchAsync(Command message) {
        if (stopping.get()) {
            // Shutting down: do not send, only complete the dispatch bookkeeping.
            if (message.isMessageDispatch()) {
                MessageDispatch dispatch = (MessageDispatch) message;
                Runnable transmitCallback = dispatch.getTransmitCallback();
                broker.postProcessDispatch(dispatch);
                if (transmitCallback != null) {
                    transmitCallback.run();
                }
            }
            return;
        }

        // getStatistics().getEnqueues().increment();
        if (taskRunner == null) {
            dispatchSync(message);
            return;
        }

        synchronized (dispatchQueue) {
            dispatchQueue.add(message);
        }
        try {
            taskRunner.wakeup();
        } catch (InterruptedException interrupted) {
            // Preserve the interrupt for the caller.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Sends {@code command} over the transport unless the connection is
     * stopping. For a message dispatch the broker's pre-processing hook runs
     * before the send; the post-processing hook and the transmit callback
     * always run afterwards, whether or not the command was sent or the send
     * failed.
     *
     * @param command the command to dispatch
     * @throws IOException if the dispatch fails
     */
    protected void processDispatch(Command command) throws IOException {
        final MessageDispatch messageDispatch;
        if (command.isMessageDispatch()) {
            messageDispatch = (MessageDispatch) command;
        } else {
            messageDispatch = null;
        }

        try {
            if (stopping.get()) {
                // Nothing is sent while stopping; the finally block still runs.
                return;
            }
            if (messageDispatch != null) {
                broker.preProcessDispatch(messageDispatch);
            }
            dispatch(command);
        } finally {
            if (messageDispatch != null) {
                Runnable transmitCallback = messageDispatch.getTransmitCallback();
                broker.postProcessDispatch(messageDispatch);
                if (transmitCallback != null) {
                    transmitCallback.run();
                }
            }
            // getStatistics().getDequeues().increment();
        }
    }

    /**
     * Performs one unit of dispatch work.
     * <p>
     * While stopping, the first call marks dispatching as stopped, attempts to
     * send a {@code ShutdownInfo} when no transport exception has been
     * recorded, and releases the dispatch-stopped latch. Otherwise the head of
     * the dispatch queue, if any, is removed and dispatched.
     *
     * @return {@code true} if a queued command was dispatched, {@code false}
     *         when there was nothing to do, dispatching has stopped, or the
     *         dispatch failed with an {@link IOException}
     */
    public boolean iterate() {
        try {
            if (stopping.get()) {
                if (dispatchStopped.compareAndSet(false, true)) {
                    if (transportException.get() == null) {
                        try {
                            dispatch(new ShutdownInfo());
                        } catch (Throwable ignore) {
                        }
                    }
                    dispatchStoppedLatch.countDown();
                }
                return false;
            }

            if (dispatchStopped.get()) {
                return false;
            }

            final Command next;
            synchronized (dispatchQueue) {
                if (dispatchQueue.isEmpty()) {
                    return false;
                }
                next = dispatchQueue.remove(0);
            }
            processDispatch(next);
            return true;
        } catch (IOException dispatchFailure) {
            if (dispatchStopped.compareAndSet(false, true)) {
                dispatchStoppedLatch.countDown();
            }
            serviceExceptionAsync(dispatchFailure);
            return false;
        }
    }

    /**
     * @return the statistics object held by this connection
     */
    public ConnectionStatistics getStatistics() {
        return this.statistics;
    }

    /**
     * @return the message authorization policy currently set on this connection
     */
    public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
        return this.messageAuthorizationPolicy;
    }

    /**
     * @param messageAuthorizationPolicy
     *            the message authorization policy to store on this connection
     */
    public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) {
        this.messageAuthorizationPolicy = messageAuthorizationPolicy;
    }

    /**
     * @return the current value of the manageable flag
     */
    public boolean isManageable() {
        return this.manageable;
    }

    /**
     * Starts this connection: creates the dispatch task runner when a factory
     * is configured, starts the transport, marks the connection active,
     * dispatches a copy of the connector's {@code BrokerInfo} to the peer and
     * notifies the connector.
     * <p>
     * A {@link #stop()} requested while start-up is in progress is deferred
     * and executed once start-up has finished, including when start-up failed.
     *
     * @throws Exception if start-up fails, or if the deferred stop fails
     */
    public void start() throws Exception {
        // stop() inspects this flag while holding the monitor, so publish it
        // through the synchronized setter rather than with a bare field write.
        setStarting(true);
        try {
            synchronized (this) {
                if (taskRunnerFactory != null) {
                    taskRunner = taskRunnerFactory.createTaskRunner(this, "ActiveMQ Connection Dispatcher: "
                            + getRemoteAddress());
                } else {
                    taskRunner = null;
                }
                transport.start();
                active = true;
                BrokerInfo info = connector.getBrokerInfo().copy();
                if (connector.isUpdateClusterClients()) {
                    info.setPeerBrokerInfos(this.broker.getPeerBrokerInfos());
                } else {
                    info.setPeerBrokerInfos(null);
                }
                dispatchAsync(info);

                connector.onStarted(this);
            }
        } catch (Exception e) {
            // Force clean up on an error starting up. The starting flag is
            // still set here, so a stop() call could only defer itself;
            // record the request and let the finally block run the stop.
            setPendingStop(true);
            throw e;
        } finally {
            // stop() can be called from within the above block,
            // but we want to be sure start() completes before
            // stop() runs, so queue the stop until right now:
            setStarting(false);
            if (isPendingStop()) {
                LOG.debug("Calling the delayed stop()");
                stop();
            }
        }
    }

    /**
     * Stops this connection and blocks until the asynchronous shutdown has
     * completed, logging a notice every five seconds while it is waiting.
     * <p>
     * When called while {@code start()} is still in progress, the request is
     * only recorded and this method returns immediately.
     *
     * @throws Exception if the wait is interrupted or the stop fails
     */
    public void stop() throws Exception {
        synchronized (this) {
            pendingStop = true;
            if (starting) {
                LOG.debug("stop() called in the middle of start(). Delaying...");
                return;
            }
        }

        stopAsync();

        boolean finished = stopped.await(5, TimeUnit.SECONDS);
        while (!finished) {
            LOG.info("The connection to '" + transport.getRemoteAddress() + "' is taking a long time to shutdown.");
            finished = stopped.await(5, TimeUnit.SECONDS);
        }
    }

    /**
     * Initiates shutdown without waiting for it to finish. Only the first call
     * has any effect.
     * <p>
     * Every connection context is flagged as stopping, then {@code doStop()}
     * is scheduled on the default task runner factory under the service write
     * lock. The {@code stopped} latch is released when that task finishes, or
     * immediately if the task could not be scheduled.
     */
    public void stopAsync() {
        if (!stopping.compareAndSet(false, true)) {
            // A shutdown is already under way.
            return;
        }

        // Let all the connection contexts know we are shutting down
        // so that in progress operations can notice and unblock.
        for (TransportConnectionState state : listConnectionStates()) {
            state.getContext().getStopping().set(true);
        }

        try {
            final Map mdcContext = MDCHelper.getCopyOfContextMap();
            final Runnable stopper = new Runnable() {
                public void run() {
                    serviceLock.writeLock().lock();
                    try {
                        MDCHelper.setContextMap(mdcContext);
                        doStop();
                    } catch (Throwable e) {
                        LOG.debug("Error occured while shutting down a connection to '" + transport.getRemoteAddress()
                                + "': ", e);
                    } finally {
                        stopped.countDown();
                        serviceLock.writeLock().unlock();
                    }
                }
            };
            DefaultThreadPools.getDefaultTaskRunnerFactory().execute(stopper,
                    "StopAsync:" + transport.getRemoteAddress());
        } catch (Throwable t) {
            LOG.warn("cannot create async transport stopper thread.. not waiting for stop to complete, reason:", t);
            stopped.countDown();
        }
    }

    /**
     * @return a description containing the transport's remote address
     */
    @Override
    public String toString() {
        final String remoteAddress = transport.getRemoteAddress();
        return "Transport Connection to: " + remoteAddress;
    }

    /**
     * Performs the actual shutdown work for this connection.
     * <p>
     * In order: notifies the connector, stops the master broker and duplex
     * bridge if present, stops the transport, shuts down the task runner, runs
     * the post-dispatch hooks and transmit callbacks of all still-queued
     * message dispatches, and, unless the broker itself is stopped, removes
     * every logical connection still registered on this transport connection.
     * Failures of individual steps are logged and do not abort the shutdown.
     *
     * @throws Exception if a step that is not individually guarded fails
     * @throws InterruptedException as declared by the signature
     */
    protected void doStop() throws Exception, InterruptedException {
        LOG.debug("Stopping connection: " + transport.getRemoteAddress());
        connector.onStopped(this);
        try {
            synchronized (this) {
                if (masterBroker != null) {
                    masterBroker.stop();
                }
                if (duplexBridge != null) {
                    duplexBridge.stop();
                }
            }
        } catch (Exception ignore) {
            LOG.trace("Exception caught stopping", ignore);
        }
        try {
            transport.stop();
            LOG.debug("Stopped transport: " + transport.getRemoteAddress());
        } catch (Exception e) {
            LOG.debug("Could not stop transport: " + e, e);
        }
        if (taskRunner != null) {
            taskRunner.shutdown(1);
        }
        active = false;
        // Run the MessageDispatch callbacks so that message references get
        // cleaned up.
        synchronized (dispatchQueue) {
            for (Iterator<Command> iter = dispatchQueue.iterator(); iter.hasNext();) {
                Command command = iter.next();
                if (command.isMessageDispatch()) {
                    MessageDispatch md = (MessageDispatch) command;
                    Runnable sub = md.getTransmitCallback();
                    broker.postProcessDispatch(md);
                    if (sub != null) {
                        sub.run();
                    }
                }
            }
            dispatchQueue.clear();
        }
        //
        // Remove all logical connection associated with this connection
        // from the broker.
        if (!broker.isStopped()) {
            List<TransportConnectionState> connectionStates = listConnectionStates();
            for (TransportConnectionState cs : connectionStates) {
                cs.getContext().getStopping().set(true);
                try {
                    LOG.debug("Cleaning up connection resources: " + getRemoteAddress());
                    processRemoveConnection(cs.getInfo().getConnectionId(), 0L);
                } catch (Throwable e) {
                    // Best effort: keep cleaning up the remaining connections,
                    // but report the failure through the logger.
                    LOG.warn("Failed to clean up connection resources: " + getRemoteAddress()
                            + ", reason: " + e, e);
                }
            }
        }
        LOG.debug("Connection Stopped: " + getRemoteAddress());
    }

    /**
     * @return the current value of the blockedCandidate flag
     */
    public boolean isBlockedCandidate() {
        return this.blockedCandidate;
    }

    /**
     * Stores the blockedCandidate flag.
     *
     * @param blockedCandidate
     *            the new value of the flag
     */
    public void setBlockedCandidate(boolean blockedCandidate) {
        this.blockedCandidate = blockedCandidate;
    }

    /**
     * @return the current value of the markedCandidate flag
     */
    public boolean isMarkedCandidate() {
        return this.markedCandidate;
    }

    /**
     * Stores the markedCandidate flag. Clearing the flag also resets the mark
     * time stamp to zero and clears the blockedCandidate flag.
     *
     * @param markedCandidate
     *            the new value of the flag
     */
    public void setMarkedCandidate(boolean markedCandidate) {
        this.markedCandidate = markedCandidate;
        if (markedCandidate) {
            return;
        }
        // Unmarking discards any earlier mark state.
        timeStamp = 0;
        blockedCandidate = false;
    }

    /**
     * Stores the slow flag.
     *
     * @param slow
     *            the new value of the flag
     */
    public void setSlow(boolean slow) {
        this.slow = slow;
    }

    /**
     * @return the current value of the slow flag
     */
    public boolean isSlow() {
        return this.slow;
    }

    /**
     * Reports the markedCandidate flag, i.e. the same value as
     * {@code isMarkedCandidate()}.
     *
     * @return the current value of the markedCandidate flag
     */
    public boolean isMarkedBlockedCandidate() {
        return this.markedCandidate;
    }

    /**
     * Records the current time as the mark time stamp, unless a non-zero time
     * stamp is already present.
     */
    public void doMark() {
        if (timeStamp != 0) {
            // Already marked; keep the earlier time stamp.
            return;
        }
        timeStamp = System.currentTimeMillis();
    }

    /**
     * @return the current value of the blocked flag
     */
    public boolean isBlocked() {
        return this.blocked;
    }

    /**
     * @return the current value of the connected flag
     */
    public boolean isConnected() {
        return this.connected;
    }

    /**
     * Stores the blocked flag.
     *
     * @param blocked
     *            the new value of the flag
     */
    public void setBlocked(boolean blocked) {
        this.blocked = blocked;
    }

    /**
     * Stores the connected flag.
     *
     * @param connected
     *            the new value of the flag
     */
    public void setConnected(boolean connected) {
        this.connected = connected;
    }

    /**
     * @return the current value of the active flag
     */
    public boolean isActive() {
        return this.active;
    }

    /**
     * Stores the active flag.
     *
     * @param active
     *            the new value of the flag
     */
    public void setActive(boolean active) {
        this.active = active;
    }

    /**
     * Reads the starting flag while holding this object's monitor.
     *
     * @return the current value of the starting flag
     */
    public synchronized boolean isStarting() {
        return this.starting;
    }

    /**
     * Reads the networkConnection flag while holding this object's monitor.
     *
     * @return the current value of the networkConnection flag
     */
    public synchronized boolean isNetworkConnection() {
        return this.networkConnection;
    }
    
    /**
     * @return the current value of the faultTolerantConnection flag
     */
    public boolean isFaultTolerantConnection() {
        return faultTolerantConnection;
    }

    /**
     * Stores the starting flag while holding this object's monitor.
     *
     * @param starting
     *            the new value of the flag
     */
    protected synchronized void setStarting(boolean starting) {
        this.starting = starting;
    }

    /**
     * Reads the pendingStop flag while holding this object's monitor.
     *
     * @return the current value of the pendingStop flag
     */
    public synchronized boolean isPendingStop() {
        return this.pendingStop;
    }

    /**
     * Stores the pendingStop flag while holding this object's monitor.
     *
     * @param pendingStop
     *            the new value of the flag
     */
    protected synchronized void setPendingStop(boolean pendingStop) {
        this.pendingStop = pendingStop;
    }

    /**
     * Handles a {@code BrokerInfo} command received from the peer.
     * <p>
     * Three cases are distinguished:
     * <ul>
     * <li>The peer is a slave broker: unless the slave is passive, a
     * {@code MasterBroker} is created and started on this transport. Control
     * then continues with the common bookkeeping at the end of the method.</li>
     * <li>The peer is a duplex network connection: the responder end of the
     * duplex bridge is created and started. Every path through this branch
     * returns directly, so the common bookkeeping at the end is not reached.</li>
     * <li>Anything else: only the common bookkeeping is performed.</li>
     * </ul>
     * The common bookkeeping stores {@code info} as this connection's broker
     * info, flags the connection as a network connection and propagates that
     * flag to every registered connection context.
     *
     * @param info the broker info sent by the peer
     * @return always {@code null}
     */
    public Response processBrokerInfo(BrokerInfo info) {
        if (info.isSlaveBroker()) {
            BrokerService bService = connector.getBrokerService();
            // Do we only support passive slaves - or does the slave want to be
            // passive ?
            boolean passive = bService.isPassiveSlave() || info.isPassiveSlave();
            if (passive == false) {
                
                // stream messages from this broker (the master) to
                // the slave
                MutableBrokerFilter parent = (MutableBrokerFilter) broker.getAdaptor(MutableBrokerFilter.class);
                masterBroker = new MasterBroker(parent, transport);
                masterBroker.startProcessing();
            }
            LOG.info((passive?"Passive":"Active")+" Slave Broker " + info.getBrokerName() + " is attached");
            bService.slaveConnectionEstablished();
        } else if (info.isNetworkConnection() && info.isDuplexConnection()) {
            // so this TransportConnection is the rear end of a network bridge
            // We have been requested to create a two way pipe ...
            try {
                // Build the bridge configuration from the properties the peer sent.
                Properties properties = MarshallingSupport.stringToProperties(info.getNetworkProperties());
                Map<String, String> props = createMap(properties);
                NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
                IntrospectionSupport.setProperties(config, props, "");
                config.setBrokerName(broker.getBrokerName());

                // check for existing duplex connection hanging about

                // We first look if existing network connection already exists for the same broker Id and network connector name
                // It's possible in case of brief network fault to have this transport connector side of the connection always active
                // and the duplex network connector side wanting to open a new one
                // In this case, the old connection must be broken
                String duplexNetworkConnectorId = config.getName() + "@" + info.getBrokerId(); 
                CopyOnWriteArrayList<TransportConnection> connections = this.connector.getConnections();
                synchronized (connections) {
                    for (Iterator<TransportConnection> iter = connections.iterator(); iter.hasNext();) {
                        TransportConnection c = iter.next();
                        if ((c != this) && (duplexNetworkConnectorId.equals(c.getDuplexNetworkConnectorId()))) {
                            LOG.warn("Stopping an existing active duplex connection [" + c + "] for network connector (" + duplexNetworkConnectorId + ").");
                            c.stopAsync();
                            // better to wait for a bit rather than get connection id already in use and failure to start new bridge
                            // NOTE(review): this wait (up to one second per matching
                            // connection) happens while the monitor on the connections
                            // list is held. An InterruptedException raised here is
                            // handled by the generic catch below, which only logs and
                            // returns without restoring the interrupt status.
                            c.getStopped().await(1, TimeUnit.SECONDS);
                        }
                    }
                    // Record the id on this connection only after older duplicates were asked to stop.
                    setDuplexNetworkConnectorId(duplexNetworkConnectorId);
                }
                // Connect to the local broker through its VM connector URI, with
                // the "network" and "async" query parameters overridden.
                URI uri = broker.getVmConnectorURI();
                HashMap<String, String> map = new HashMap(URISupport.parseParameters(uri));
                map.put("network", "true");
                map.put("async", "false");
                uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
                Transport localTransport = TransportFactory.connect(uri);
                Transport remoteBridgeTransport = new ResponseCorrelator(transport);
                // The name used for the management object is the local transport's
                // string form, cut down to start at its last '#' when it contains one.
                String duplexName = localTransport.toString();
                if (duplexName.contains("#")) {
                    duplexName = duplexName.substring(duplexName.lastIndexOf("#"));
                }
                MBeanNetworkListener listener = new MBeanNetworkListener(broker.getBrokerService(), broker.getBrokerService().createDuplexNetworkConnectorObjectName(duplexName));
                listener.setCreatedByDuplex(true);
                duplexBridge = NetworkBridgeFactory.createBridge(config, localTransport, remoteBridgeTransport, listener);
                duplexBridge.setBrokerService(broker.getBrokerService());
                // now turn duplex off this side
                info.setDuplexConnection(false);
                duplexBridge.setCreatedByDuplex(true);
                duplexBridge.duplexStart(this, brokerInfo, info);
                LOG.info("Started responder end of duplex bridge " + duplexNetworkConnectorId);
                return null;
            } catch (TransportDisposedIOException e) {
                // NOTE(review): the local duplexNetworkConnectorId declared inside
                // the try block is out of scope here, so this name presumably
                // resolves to the field written by setDuplexNetworkConnectorId();
                // if the failure happened before that call the logged id may not
                // be the one computed above - confirm.
                LOG.warn("Duplex bridge " + duplexNetworkConnectorId + " was stopped before it was correctly started.");
                return null;
            } catch (Exception e) {
                // Same scoping caveat for duplexNetworkConnectorId as in the catch above.
                LOG.error("Failed to create responder end of duplex network bridge " + duplexNetworkConnectorId , e);
                return null;
            }
        }
        // We only expect to get one broker info command per connection
        if (this.brokerInfo != null) {
            LOG.warn("Unexpected extra broker info command received: " + info);
        }
        this.brokerInfo = info;
        networkConnection = true;
        // Propagate the network-connection flag to every registered connection context.
        List<TransportConnectionState> connectionStates = listConnectionStates();
        for (TransportConnectionState cs : connectionStates) {
            cs.getContext().setNetworkConnection(true);
        }
        return null;
    }

    /**
     * Copies {@code properties} into a new {@link HashMap}. The copy is made
     * through a raw-typed constructor call, so entries are not checked to be
     * strings.
     *
     * @param properties the properties to copy
     * @return a new map holding the entries of {@code properties}
     */
    @SuppressWarnings("unchecked")
    private HashMap<String, String> createMap(Properties properties) {
        final HashMap copy = new HashMap(properties);
        return copy;
    }

    /**
     * Sends {@code command} one-way over the transport. The markedCandidate
     * flag is set for the duration of the send and cleared again afterwards,
     * even when the send fails.
     *
     * @param command the command to send
     * @throws IOException if the transport fails to send the command
     */
    protected void dispatch(Command command) throws IOException {
        setMarkedCandidate(true);
        try {
            transport.oneway(command);
        } finally {
            setMarkedCandidate(false);
        }
    }

    /**
     * @return the remote address reported by the underlying transport
     */
    public String getRemoteAddress() {
        final String remoteAddress = transport.getRemoteAddress();
        return remoteAddress;
    }

    /**
     * Derives an identifier from the first registered connection state: its
     * client id when one is set, otherwise the string form of its connection
     * id.
     *
     * @return the identifier, or {@code null} when no connection state is
     *         registered
     */
    public String getConnectionId() {
        Iterator<TransportConnectionState> states = listConnectionStates().iterator();
        if (!states.hasNext()) {
            return null;
        }
        // Only the first state is ever consulted.
        TransportConnectionState first = states.next();
        if (first.getInfo().getClientId() != null) {
            return first.getInfo().getClientId();
        }
        return first.getInfo().getConnectionId().toString();
    }
        
    /**
     * Dispatches {@code control} asynchronously to the client, but only when
     * the connection is active, not blocked, fault tolerant, and wire format
     * information with version 6 or higher is present.
     *
     * @param control the connection control command to send
     */
    public void updateClient(ConnectionControl control) {
        if (!isActive() || isBlocked() || !isFaultTolerantConnection()) {
            return;
        }
        if (this.wireFormatInfo == null || this.wireFormatInfo.getVersion() < 6) {
            return;
        }
        dispatchAsync(control);
    }

    /**
     * Returns the exchange registered for producer {@code id}, creating and
     * registering it on first use.
     * <p>
     * A new exchange is bound to the connection context of the producer's
     * connection state. For a reconnecting context the last stored producer
     * sequence id is loaded from the persistence adapter. When the producer's
     * session state is known, the producer state is attached and the exchange
     * is marked mutable if the producer has no destination or a composite one.
     * As in the original code, the {@code context} field is updated to the
     * context of the returned exchange.
     *
     * @param id the producer whose exchange is wanted
     * @return the existing or newly created exchange
     * @throws IOException if the last producer sequence id cannot be read
     */
    private ProducerBrokerExchange getProducerBrokerExchange(ProducerId id) throws IOException {
        ProducerBrokerExchange result = producerExchanges.get(id);
        if (result == null) {
            synchronized (producerExchanges) {
                // Re-check under the lock: another thread may have registered
                // the exchange between the unlocked lookup and this point.
                result = producerExchanges.get(id);
                if (result == null) {
                    result = new ProducerBrokerExchange();
                    TransportConnectionState state = lookupConnectionState(id);
                    context = state.getContext();
                    if (context.isReconnect()) {
                        result.setLastStoredSequenceId(broker.getBrokerService().getPersistenceAdapter().getLastProducerSequenceId(id));
                    }
                    result.setConnectionContext(context);
                    SessionState ss = state.getSessionState(id.getParentId());
                    if (ss != null) {
                        // Look the producer state up once and reuse it.
                        ProducerState producerState = ss.getProducerState(id);
                        result.setProducerState(producerState);
                        if (producerState != null && producerState.getInfo() != null) {
                            ProducerInfo info = producerState.getInfo();
                            result.setMutable(info.getDestination() == null || info.getDestination().isComposite());
                        }
                    }
                    producerExchanges.put(id, result);
                    return result;
                }
            }
        }
        // An already registered exchange: adopt its connection context.
        context = result.getConnectionContext();
        return result;
    }

    /**
     * Unregisters the exchange of producer {@code id}, if any, while holding
     * the lock on the producer exchange map.
     *
     * @param id the producer whose exchange is dropped
     */
    private void removeProducerBrokerExchange(ProducerId id) {
        synchronized (this.producerExchanges) {
            this.producerExchanges.remove(id);
        }
    }

    /**
     * Returns the exchange registered for consumer {@code id}, creating and
     * registering it on first use.
     * <p>
     * A new exchange is bound to the connection context of the consumer's
     * connection state (which is also stored in the {@code context} field, as
     * in the original code) and is flagged as wildcard when the consumer's
     * destination is a pattern.
     *
     * @param id the consumer whose exchange is wanted
     * @return the existing or newly created exchange
     */
    private ConsumerBrokerExchange getConsumerBrokerExchange(ConsumerId id) {
        ConsumerBrokerExchange result = consumerExchanges.get(id);
        if (result == null) {
            synchronized (consumerExchanges) {
                // Re-check under the lock: another thread may have registered
                // the exchange between the unlocked lookup and this point.
                result = consumerExchanges.get(id);
                if (result == null) {
                    result = new ConsumerBrokerExchange();
                    TransportConnectionState state = lookupConnectionState(id);
                    context = state.getContext();
                    result.setConnectionContext(context);
                    SessionState ss = state.getSessionState(id.getParentId());
                    if (ss != null) {
                        ConsumerState cs = ss.getConsumerState(id);
                        if (cs != null) {
                            ConsumerInfo info = cs.getInfo();
                            if (info != null && info.getDestination() != null
                                    && info.getDestination().isPattern()) {
                                result.setWildcard(true);
                            }
                        }
                    }
                    consumerExchanges.put(id, result);
                }
            }
        }
        return result;
    }

    /**
     * Unregisters the exchange of consumer {@code id}, if any, while holding
     * the lock on the consumer exchange map.
     *
     * @param id the consumer whose exchange is dropped
     */
    private void removeConsumerBrokerExchange(ConsumerId id) {
        synchronized (this.consumerExchanges) {
            this.consumerExchanges.remove(id);
        }
    }

    /**
     * @return the current value held by the protocolVersion holder
     */
    public int getProtocolVersion() {
        final int version = protocolVersion.get();
        return version;
    }

    /**
     * Handles a control command. The command string {@code "shutdown"}
     * terminates the JVM with exit status 0; every other value, including
     * {@code null}, is ignored.
     *
     * @param command the control command received
     * @return always {@code null}
     */
    public Response processControlCommand(ControlCommand command) throws Exception {
        // Constant-first equals() also covers a null command string.
        if ("shutdown".equals(command.getCommand())) {
            System.exit(0);
        }
        return null;
    }

    /**
     * Ignores the message dispatch; no action is taken.
     *
     * @param dispatch the received message dispatch (unused)
     * @return always {@code null}
     */
    public Response processMessageDispatch(MessageDispatch dispatch) throws Exception {
        final Response noResponse = null;
        return noResponse;
    }

    /**
     * Copies the fault-tolerant setting of {@code control} onto this
     * connection. A {@code null} control is ignored.
     *
     * @param control the connection control command, may be {@code null}
     * @return always {@code null}
     */
    public Response processConnectionControl(ConnectionControl control) throws Exception {
        if (control == null) {
            return null;
        }
        faultTolerantConnection = control.isFaultTolerant();
        return null;
    }

    /**
     * Ignores the connection error; no action is taken.
     *
     * @param error the received connection error (unused)
     * @return always {@code null}
     */
    public Response processConnectionError(ConnectionError error) throws Exception {
        final Response noResponse = null;
        return noResponse;
    }

    /**
     * Forwards the consumer control command to the broker together with the
     * exchange of the consumer it addresses.
     *
     * @param control the consumer control command
     * @return always {@code null}
     */
    public Response processConsumerControl(ConsumerControl control) throws Exception {
        broker.processConsumerControl(getConsumerBrokerExchange(control.getConsumerId()), control);
        return null;
    }

    /**
     * Registers {@code state} under {@code connectionId}.
     * <p>
     * If the current register already holds an entry and reports that it
     * cannot handle multiple connection states, it is first replaced by a
     * {@code MapTransportConnectionStateRegister} initialised from it.
     *
     * @param connectionId the id to register the state under
     * @param state the state to register
     * @return whatever the register's {@code registerConnectionState} returns
     */
    protected synchronized TransportConnectionState registerConnectionState(ConnectionId connectionId,
            TransportConnectionState state) {
        boolean needsMultiStateRegister = !connectionStateRegister.isEmpty()
                && !connectionStateRegister.doesHandleMultipleConnectionStates();
        if (needsMultiStateRegister) {
            // swap implementations
            TransportConnectionStateRegister replacement = new MapTransportConnectionStateRegister();
            replacement.intialize(connectionStateRegister);
            connectionStateRegister = replacement;
        }
        return connectionStateRegister.registerConnectionState(connectionId, state);
    }

    /**
     * Delegates to the connection state register to remove the state stored
     * under {@code connectionId}.
     *
     * @param connectionId the id whose state is removed
     * @return the register's result for the removal
     */
    protected synchronized TransportConnectionState unregisterConnectionState(ConnectionId connectionId) {
        return this.connectionStateRegister.unregisterConnectionState(connectionId);
    }

    /**
     * @return the connection states as listed by the connection state register
     */
    protected synchronized List<TransportConnectionState> listConnectionStates() {
        return this.connectionStateRegister.listConnectionStates();
    }

    /**
     * Delegates the lookup by connection id string to the connection state
     * register.
     *
     * @param connectionId the connection id in string form
     * @return the register's lookup result
     */
    protected synchronized TransportConnectionState lookupConnectionState(String connectionId) {
        return this.connectionStateRegister.lookupConnectionState(connectionId);
    }

    /**
     * Delegates the lookup by consumer id to the connection state register.
     *
     * @param id the consumer id
     * @return the register's lookup result
     */
    protected synchronized TransportConnectionState lookupConnectionState(ConsumerId id) {
        return this.connectionStateRegister.lookupConnectionState(id);
    }

    /**
     * Delegates the lookup by producer id to the connection state register.
     *
     * @param id the producer id
     * @return the register's lookup result
     */
    protected synchronized TransportConnectionState lookupConnectionState(ProducerId id) {
        return this.connectionStateRegister.lookupConnectionState(id);
    }

    /**
     * Delegates the lookup by session id to the connection state register.
     *
     * @param id the session id
     * @return the register's lookup result
     */
    protected synchronized TransportConnectionState lookupConnectionState(SessionId id) {
        return this.connectionStateRegister.lookupConnectionState(id);
    }

    /**
     * Delegates the lookup by connection id to the connection state register.
     *
     * @param connectionId the connection id
     * @return the register's lookup result
     */
    protected synchronized TransportConnectionState lookupConnectionState(ConnectionId connectionId) {
        return this.connectionStateRegister.lookupConnectionState(connectionId);
    }

    /**
     * Stores the duplex network connector id while holding this object's
     * monitor.
     *
     * @param duplexNetworkConnectorId
     *            the id to store
     */
    protected synchronized void setDuplexNetworkConnectorId(String duplexNetworkConnectorId) {
        this.duplexNetworkConnectorId = duplexNetworkConnectorId;
    }

    /**
     * Reads the duplex network connector id while holding this object's
     * monitor.
     *
     * @return the stored id
     */
    protected synchronized String getDuplexNetworkConnectorId() {
        return duplexNetworkConnectorId;
    }
    
    /**
     * @return the latch that is counted down once shutdown has finished or
     *         could not be scheduled
     */
    protected CountDownLatch getStopped() {
        return this.stopped;
    }
}

Other ActiveMQ examples (source code examples)

Here is a short list of links related to this ActiveMQ TransportConnection.java source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.