alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

ActiveMQ example source code file (ActiveMQManagedConnection.java)

This example ActiveMQ source code file (ActiveMQManagedConnection.java) is included in the DevDaily.com "Java Source Code Warehouse" project. The intent of this project is to help you "Learn Java by Example"™.

Java - ActiveMQ tags/keywords

activemqconnectionrequestinfo, connectionevent, error, io, jmsexception, jmsexception, managedconnectionproxy, managedconnectionproxy, not, not, printwriter, resourceexception, resourceexception, string, threading, threads, transactioncontext, util

The ActiveMQ ActiveMQManagedConnection.java source code

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.ra;

import java.io.PrintWriter;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import javax.jms.Connection;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.resource.ResourceException;
import javax.resource.spi.ConnectionEvent;
import javax.resource.spi.ConnectionEventListener;
import javax.resource.spi.ConnectionRequestInfo;
import javax.resource.spi.LocalTransaction;
import javax.resource.spi.ManagedConnection;
import javax.resource.spi.ManagedConnectionMetaData;
import javax.security.auth.Subject;
import javax.transaction.xa.XAResource;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.LocalTransactionEventListener;
import org.apache.activemq.TransactionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * ActiveMQManagedConnection maps to a real physical connection to the server.
 * Since a ManagedConnection has to provide a transaction management interface
 * to the physical connection, and sessions are the objects that implement the
 * transaction management interfaces in the JMS API, this object also maps to a
 * single physical JMS session.
 * <p/>
 * The side effect is that the JMS connection the application gets will always
 * create the same session object. This is good if running in an app server,
 * since the sessions are enlisted in the context transaction. This is bad if
 * used outside of an app server, since the user may be trying to create two
 * different sessions to coordinate two different units of work.
 * <p/>
 * Connection handles handed out to the application are
 * {@link ManagedConnectionProxy} instances; this class tracks them so they can
 * be cleaned up, notified of connection failures and switched in and out of
 * the shared transaction context.
 */
// TODO: also implement DissociatableManagedConnection.
public class ActiveMQManagedConnection implements ManagedConnection, ExceptionListener {

    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQManagedConnection.class);

    private PrintWriter logWriter;

    private final ActiveMQConnection physicalConnection;
    private final TransactionContext transactionContext;
    // Copy-on-write lists: both are iterated while handles/listeners may be
    // added or removed from inside the callbacks being dispatched.
    private final List<ManagedConnectionProxy> proxyConnections = new CopyOnWriteArrayList<ManagedConnectionProxy>();
    private final List<ConnectionEventListener> listeners = new CopyOnWriteArrayList<ConnectionEventListener>();
    private final LocalAndXATransaction localAndXATransaction;

    private Subject subject;
    private ActiveMQConnectionRequestInfo info;
    private boolean destroyed;

    /**
     * Wraps the given physical connection, creates its transaction context and
     * registers this object as the connection's JMS exception listener.
     *
     * @param subject the security subject this connection is associated with
     * @param physicalConnection the underlying ActiveMQ connection
     * @param info the request info the connection was created for
     * @throws ResourceException if registering the exception listener on the
     *             physical connection fails with a JMSException
     */
    public ActiveMQManagedConnection(Subject subject, ActiveMQConnection physicalConnection, ActiveMQConnectionRequestInfo info) throws ResourceException {
        try {
            this.subject = subject;
            this.info = info;
            this.physicalConnection = physicalConnection;
            this.transactionContext = new TransactionContext(physicalConnection);

            // Propagate every managed-tx state change to all open handles.
            this.localAndXATransaction = new LocalAndXATransaction(transactionContext) {
                public void setInManagedTx(boolean inManagedTx) throws JMSException {
                    super.setInManagedTx(inManagedTx);
                    for (ManagedConnectionProxy proxy : proxyConnections) {
                        proxy.setUseSharedTxContext(inManagedTx);
                    }
                }
            };

            // Translate local transaction callbacks into JCA connection events.
            this.transactionContext.setLocalTransactionEventListener(new LocalTransactionEventListener() {
                public void beginEvent() {
                    fireBeginEvent();
                }

                public void commitEvent() {
                    fireCommitEvent();
                }

                public void rollbackEvent() {
                    fireRollbackEvent();
                }
            });

            physicalConnection.setExceptionListener(this);
        } catch (JMSException e) {
            throw new ResourceException("Could not create a new connection: " + e.getMessage(), e);
        }
    }

    /**
     * @return whether the underlying {@link LocalAndXATransaction} reports
     *         that it is in a managed transaction
     */
    public boolean isInManagedTx() {
        return localAndXATransaction.isInManagedTx();
    }

    /**
     * Null-safe equality check.
     *
     * @param x first value, may be null
     * @param y second value, may be null
     * @return true if both are null, or both are non-null and
     *         {@code x.equals(y)}; false otherwise
     */
    public static boolean matches(Object x, Object y) {
        if (x == null || y == null) {
            // Equal only when both are null.
            return x == y;
        }
        return x.equals(y);
    }

    /**
     * Re-associates this connection with a new subject and request info,
     * updating the credentials and client id on the physical connection where
     * they differ.
     *
     * @param subject the new subject
     * @param info the new request info
     * @throws JMSException if the physical connection rejects the new user
     *             info or client id
     */
    public void associate(Subject subject, ActiveMQConnectionRequestInfo info) throws JMSException {

        // Do we need to change the associated userid/password
        boolean sameUser = matches(info.getUserName(), this.info.getUserName());
        boolean samePassword = matches(info.getPassword(), this.info.getPassword());
        if (!sameUser || !samePassword) {
            physicalConnection.changeUserInfo(info.getUserName(), info.getPassword());
        }

        // Do we need to set the clientId?
        String clientId = info.getClientid();
        if (clientId != null && clientId.length() > 0) {
            physicalConnection.setClientID(clientId);
        }

        this.subject = subject;
        this.info = info;
    }

    /**
     * @return the underlying physical JMS connection
     */
    public Connection getPhysicalConnection() {
        return physicalConnection;
    }

    /** Notifies all listeners that a local transaction was started. */
    private void fireBeginEvent() {
        ConnectionEvent event = new ConnectionEvent(this, ConnectionEvent.LOCAL_TRANSACTION_STARTED);
        for (ConnectionEventListener listener : listeners) {
            listener.localTransactionStarted(event);
        }
    }

    /** Notifies all listeners that a local transaction was committed. */
    private void fireCommitEvent() {
        ConnectionEvent event = new ConnectionEvent(this, ConnectionEvent.LOCAL_TRANSACTION_COMMITTED);
        for (ConnectionEventListener listener : listeners) {
            listener.localTransactionCommitted(event);
        }
    }

    /** Notifies all listeners that a local transaction was rolled back. */
    private void fireRollbackEvent() {
        ConnectionEvent event = new ConnectionEvent(this, ConnectionEvent.LOCAL_TRANSACTION_ROLLEDBACK);
        for (ConnectionEventListener listener : listeners) {
            listener.localTransactionRolledback(event);
        }
    }

    /**
     * Notifies all listeners that the given handle was closed.
     *
     * @param proxy the handle that was closed; set as the event's connection handle
     */
    private void fireCloseEvent(ManagedConnectionProxy proxy) {
        ConnectionEvent event = new ConnectionEvent(this, ConnectionEvent.CONNECTION_CLOSED);
        event.setConnectionHandle(proxy);

        for (ConnectionEventListener listener : listeners) {
            listener.connectionClosed(event);
        }
    }

    /**
     * Notifies all listeners that an error occurred on this connection.
     *
     * @param error the exception carried by the event
     */
    private void fireErrorOccurredEvent(Exception error) {
        ConnectionEvent event = new ConnectionEvent(this, ConnectionEvent.CONNECTION_ERROR_OCCURRED, error);
        for (ConnectionEventListener listener : listeners) {
            listener.connectionErrorOccurred(event);
        }
    }

    /**
     * Creates a new handle for this managed connection and starts tracking it.
     * The {@code subject} and {@code info} arguments are not used here.
     *
     * @see javax.resource.spi.ManagedConnection#getConnection(javax.security.auth.Subject,
     *      javax.resource.spi.ConnectionRequestInfo)
     */
    public Object getConnection(Subject subject, ConnectionRequestInfo info) throws ResourceException {
        ManagedConnectionProxy proxy = new ManagedConnectionProxy(this);
        proxyConnections.add(proxy);
        return proxy;
    }

    private boolean isDestroyed() {
        return destroyed;
    }

    /**
     * Close down the physical connection to the server. Does nothing if the
     * connection has already been destroyed. A failure while closing is
     * logged and not propagated; in that case the connection is not marked as
     * destroyed.
     *
     * @see javax.resource.spi.ManagedConnection#destroy()
     */
    public void destroy() throws ResourceException {
        // Have we already been destroyed??
        if (isDestroyed()) {
            return;
        }

        cleanup();

        try {
            physicalConnection.close();
            destroyed = true;
        } catch (JMSException e) {
            LOG.info("Error occured during close of a JMS connection.", e);
        }
    }

    /**
     * Cleans up all proxy handles attached to this physical connection so that
     * they cannot be used anymore. Does nothing if the connection has already
     * been destroyed.
     *
     * @throws ResourceException if cleaning up the physical connection fails
     * @see javax.resource.spi.ManagedConnection#cleanup()
     */
    public void cleanup() throws ResourceException {

        // Have we already been destroyed??
        if (isDestroyed()) {
            return;
        }

        for (ManagedConnectionProxy proxy : proxyConnections) {
            proxy.cleanup();
        }
        proxyConnections.clear();

        try {
            physicalConnection.cleanup();
        } catch (JMSException e) {
            throw new ResourceException("Could not cleanup the ActiveMQ connection: " + e, e);
        }
        // defer transaction cleanup till after close so that close is aware of the current tx
        localAndXATransaction.cleanup();

    }

    /**
     * Starts tracking an existing handle. Only {@link ManagedConnectionProxy}
     * handles are supported.
     *
     * @throws ResourceException if {@code connection} is null or is not a
     *             {@link ManagedConnectionProxy}
     * @see javax.resource.spi.ManagedConnection#associateConnection(java.lang.Object)
     */
    public void associateConnection(Object connection) throws ResourceException {
        if (connection instanceof ManagedConnectionProxy) {
            ManagedConnectionProxy proxy = (ManagedConnectionProxy)connection;
            proxyConnections.add(proxy);
        } else {
            // Guard against null so the caller gets the declared
            // ResourceException rather than a NullPointerException.
            String typeName = connection == null ? "null" : connection.getClass().getName();
            throw new ResourceException("Not supported : associating connection instance of " + typeName);
        }
    }

    /**
     * @see javax.resource.spi.ManagedConnection#addConnectionEventListener(javax.resource.spi.ConnectionEventListener)
     */
    public void addConnectionEventListener(ConnectionEventListener listener) {
        listeners.add(listener);
    }

    /**
     * @see javax.resource.spi.ManagedConnection#removeConnectionEventListener(javax.resource.spi.ConnectionEventListener)
     */
    public void removeConnectionEventListener(ConnectionEventListener listener) {
        listeners.remove(listener);
    }

    /**
     * Returns the same object that {@link #getLocalTransaction()} returns.
     *
     * @see javax.resource.spi.ManagedConnection#getXAResource()
     */
    public XAResource getXAResource() throws ResourceException {
        return localAndXATransaction;
    }

    /**
     * Returns the same object that {@link #getXAResource()} returns.
     *
     * @see javax.resource.spi.ManagedConnection#getLocalTransaction()
     */
    public LocalTransaction getLocalTransaction() throws ResourceException {
        return localAndXATransaction;
    }

    /**
     * Returns a metadata view backed by the physical connection. Each call
     * creates a new metadata object.
     *
     * @see javax.resource.spi.ManagedConnection#getMetaData()
     */
    public ManagedConnectionMetaData getMetaData() throws ResourceException {
        return new ManagedConnectionMetaData() {

            /** Fails with "Not connected." when there is no physical connection. */
            private void checkConnected() throws ResourceException {
                if (physicalConnection == null) {
                    throw new ResourceException("Not connected.");
                }
            }

            public String getEISProductName() throws ResourceException {
                checkConnected();
                try {
                    return physicalConnection.getMetaData().getJMSProviderName();
                } catch (JMSException e) {
                    throw new ResourceException("Error accessing provider.", e);
                }
            }

            public String getEISProductVersion() throws ResourceException {
                checkConnected();
                try {
                    return physicalConnection.getMetaData().getProviderVersion();
                } catch (JMSException e) {
                    throw new ResourceException("Error accessing provider.", e);
                }
            }

            public int getMaxConnections() throws ResourceException {
                checkConnected();
                return Integer.MAX_VALUE;
            }

            public String getUserName() throws ResourceException {
                checkConnected();
                try {
                    // NOTE(review): this returns the JMS client ID, not the
                    // user name held in the request info - confirm whether
                    // that is intended before changing it.
                    return physicalConnection.getClientID();
                } catch (JMSException e) {
                    throw new ResourceException("Error accessing provider.", e);
                }
            }
        };
    }

    /**
     * @see javax.resource.spi.ManagedConnection#setLogWriter(java.io.PrintWriter)
     */
    public void setLogWriter(PrintWriter logWriter) throws ResourceException {
        this.logWriter = logWriter;
    }

    /**
     * @see javax.resource.spi.ManagedConnection#getLogWriter()
     */
    public PrintWriter getLogWriter() throws ResourceException {
        return logWriter;
    }

    /**
     * @param subject subject to match
     * @param info cri to match
     * @return whether the subject and cri match sufficiently to allow using this connection under the new circumstances
     */
    public boolean matches(Subject subject, ConnectionRequestInfo info) {
        // Check to see if it is our info class (exact class, not a subclass)
        if (info == null || info.getClass() != ActiveMQConnectionRequestInfo.class) {
            return false;
        }

        // Do the subjects match? Casts select the static null-safe helper.
        if (!matches((Object)subject, (Object)this.subject)) {
            return false;
        }

        // Does the info match?
        return info.equals(this.info);
    }

    /**
     * When a proxy is closed this cleans up the proxy and notifies the
     * ConnectionEventListeners that a connection closed.
     *
     * @param proxy the handle that was closed
     */
    public void proxyClosedEvent(ManagedConnectionProxy proxy) {
        proxyConnections.remove(proxy);
        proxy.cleanup();
        fireCloseEvent(proxy);
    }

    /**
     * JMS exception callback for the physical connection: logs the failure,
     * forwards it to every open handle and then reports it to the registered
     * connection event listeners.
     *
     * @param e the exception reported by the JMS connection
     */
    public void onException(JMSException e) {
        // Concatenation is deliberate: the warning carries only the
        // exception's string form; the stack trace goes to debug.
        LOG.warn("Connection failed: " + e);
        LOG.debug("Cause: ", e);

        for (ManagedConnectionProxy proxy : proxyConnections) {
            proxy.onException(e);
        }
        // Let the container know that the error occurred.
        fireErrorOccurredEvent(e);
    }

    /**
     * @return Returns the transactionContext.
     */
    public TransactionContext getTransactionContext() {
        return transactionContext;
    }

}

Other ActiveMQ examples (source code examples)

Here is a short list of links related to this ActiveMQ ActiveMQManagedConnection.java source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.