alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

ActiveMQ example source code file (ActiveMQResourceAdapter.java)

This example ActiveMQ source code file (ActiveMQResourceAdapter.java) is included in the DevDaily.com "Java Source Code Warehouse" project. The intent of this project is to help you "Learn Java by Example" TM.

Java - ActiveMQ tags/keywords

activemqconnection, activemqendpointactivationkey, activemqendpointactivationkey, activemqendpointworker, activemqendpointworker, jmsexception, messageactivationspec, messageactivationspec, messageresourceadapter, net, network, override, resourceexception, string, string, util, xaresource

The ActiveMQ ActiveMQResourceAdapter.java source code

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.ra;

import java.net.URI;
import java.util.HashMap;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.XAConnection;
import javax.jms.XASession;
import javax.resource.NotSupportedException;
import javax.resource.ResourceException;
import javax.resource.spi.ActivationSpec;
import javax.resource.spi.BootstrapContext;
import javax.resource.spi.ResourceAdapterInternalException;
import javax.resource.spi.endpoint.MessageEndpointFactory;
import javax.transaction.xa.XAResource;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.util.ServiceSupport;

/**
 * Knows how to connect to one ActiveMQ server. It can then activate endpoints
 * and deliver messages to those end points using the connection configure in
 * the resource adapter. <p/>Must override equals and hashCode (JCA spec 16.4)
 * 
 * @org.apache.xbean.XBean element="resourceAdapter" rootElement="true"
 *                         description="The JCA Resource Adaptor for ActiveMQ"
 * 
 */
public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implements MessageResourceAdapter {

    private final HashMap<ActiveMQEndpointActivationKey, ActiveMQEndpointWorker> endpointWorkers = new HashMap();

    private BootstrapContext bootstrapContext;
    private String brokerXmlConfig;
    private BrokerService broker;
    private Thread brokerStartThread;
    private ActiveMQConnectionFactory connectionFactory;
    
    /**
     * 
     */
    public ActiveMQResourceAdapter() {
        super();
    }

    /**
     * @see javax.resource.spi.ResourceAdapter#start(javax.resource.spi.BootstrapContext)
     */
    public void start(BootstrapContext bootstrapContext) throws ResourceAdapterInternalException {
        this.bootstrapContext = bootstrapContext;
        if (brokerXmlConfig != null && brokerXmlConfig.trim().length() > 0) {
            brokerStartThread = new Thread("Starting ActiveMQ Broker") {
                @Override
                public void run () {
                    try {
                        // ensure RAR resources are available to xbean (needed for weblogic)
                        log.debug("original thread context classLoader: " + Thread.currentThread().getContextClassLoader());
                        Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
                        log.debug("current (from getClass()) thread context classLoader: " + Thread.currentThread().getContextClassLoader());
                        
                        synchronized( ActiveMQResourceAdapter.this ) {
                            broker = BrokerFactory.createBroker(new URI(brokerXmlConfig));
                        }
                        broker.start();
                    } catch (Throwable e) {
                        log.warn("Could not start up embeded ActiveMQ Broker '"+brokerXmlConfig+"': "+e.getMessage());
                        log.debug("Reason for: "+e.getMessage(), e);
                    }
                }
            };
            brokerStartThread.setDaemon(true);
            brokerStartThread.start();
            
            // Wait up to 5 seconds for the broker to start up in the async thread.. otherwise keep going without it..
            try {
                brokerStartThread.join(1000*5);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }                
        }
    }

    /**
     * @see org.apache.activemq.ra.MessageResourceAdapter#makeConnection()
     */
    public ActiveMQConnection makeConnection() throws JMSException {
        if( connectionFactory == null ) {
            return makeConnection(getInfo());
        } else {
            return makeConnection(getInfo(), connectionFactory);
        }
    }

    /**
     * @param activationSpec
     */
    public ActiveMQConnection makeConnection(MessageActivationSpec activationSpec) throws JMSException {
        ActiveMQConnectionFactory cf = getConnectionFactory();
        if (cf == null) {
            cf = createConnectionFactory(getInfo());
        }
        String userName = defaultValue(activationSpec.getUserName(), getInfo().getUserName());
        String password = defaultValue(activationSpec.getPassword(), getInfo().getPassword());
        String clientId = activationSpec.getClientId();
        if (clientId != null) {
            cf.setClientID(clientId);
        } else {
            if (activationSpec.isDurableSubscription()) {
                log.warn("No clientID specified for durable subscription: " + activationSpec);
            }
        }
        ActiveMQConnection physicalConnection = (ActiveMQConnection) cf.createConnection(userName, password);

        // have we configured a redelivery policy
        RedeliveryPolicy redeliveryPolicy = activationSpec.redeliveryPolicy();
        if (redeliveryPolicy != null) {
            physicalConnection.setRedeliveryPolicy(redeliveryPolicy);
        }
        return physicalConnection;
    }

    /**
     * @see javax.resource.spi.ResourceAdapter#stop()
     */
    public void stop() {
        while (endpointWorkers.size() > 0) {
            ActiveMQEndpointActivationKey key = endpointWorkers.keySet().iterator().next();
            endpointDeactivation(key.getMessageEndpointFactory(), key.getActivationSpec());
        }
        
        synchronized( this ) {
            if (broker != null) {
                if( brokerStartThread.isAlive() ) {
                    brokerStartThread.interrupt();
                }
                ServiceSupport.dispose(broker);
                broker = null;
            }
        }
        
        this.bootstrapContext = null;
    }

    /**
     * @see org.apache.activemq.ra.MessageResourceAdapter#getBootstrapContext()
     */
    public BootstrapContext getBootstrapContext() {
        return bootstrapContext;
    }

    /**
     * @see javax.resource.spi.ResourceAdapter#endpointActivation(javax.resource.spi.endpoint.MessageEndpointFactory,
     *      javax.resource.spi.ActivationSpec)
     */
    public void endpointActivation(MessageEndpointFactory endpointFactory, ActivationSpec activationSpec) throws ResourceException {

        // spec section 5.3.3
        if (!equals(activationSpec.getResourceAdapter())) {
            throw new ResourceException("Activation spec not initialized with this ResourceAdapter instance (" + activationSpec.getResourceAdapter() + " != " + this + ")");
        }

        if (!(activationSpec instanceof MessageActivationSpec)) {
            throw new NotSupportedException("That type of ActicationSpec not supported: " + activationSpec.getClass());
        }

        ActiveMQEndpointActivationKey key = new ActiveMQEndpointActivationKey(endpointFactory, (MessageActivationSpec)activationSpec);
        // This is weird.. the same endpoint activated twice.. must be a
        // container error.
        if (endpointWorkers.containsKey(key)) {
            throw new IllegalStateException("Endpoint previously activated");
        }

        ActiveMQEndpointWorker worker = new ActiveMQEndpointWorker(this, key);

        endpointWorkers.put(key, worker);
        worker.start();
    }

    /**
     * @see javax.resource.spi.ResourceAdapter#endpointDeactivation(javax.resource.spi.endpoint.MessageEndpointFactory,
     *      javax.resource.spi.ActivationSpec)
     */
    public void endpointDeactivation(MessageEndpointFactory endpointFactory, ActivationSpec activationSpec) {

        if (activationSpec instanceof MessageActivationSpec) {
            ActiveMQEndpointActivationKey key = new ActiveMQEndpointActivationKey(endpointFactory, (MessageActivationSpec)activationSpec);
            ActiveMQEndpointWorker worker = endpointWorkers.remove(key);
            if (worker == null) {
                // This is weird.. that endpoint was not activated.. oh well..
                // this method
                // does not throw exceptions so just return.
                return;
            }
            try {
                worker.stop();
            } catch (InterruptedException e) {
                // We interrupted.. we won't throw an exception but will stop
                // waiting for the worker
                // to stop.. we tried our best. Keep trying to interrupt the
                // thread.
                Thread.currentThread().interrupt();
            }

        }

    }

    /**
     * We only connect to one resource manager per ResourceAdapter instance, so
     * any ActivationSpec will return the same XAResource.
     * 
     * @see javax.resource.spi.ResourceAdapter#getXAResources(javax.resource.spi.ActivationSpec[])
     */
    public XAResource[] getXAResources(ActivationSpec[] activationSpecs) throws ResourceException {
        Connection connection = null;
        try {
            connection = makeConnection();
            if (connection instanceof XAConnection) {
                XASession session = ((XAConnection)connection).createXASession();
                XAResource xaResource = session.getXAResource();
                return new XAResource[] {
                    xaResource
                };
            }
            return new XAResource[] {};
        } catch (JMSException e) {
            throw new ResourceException(e);
        } finally {
            try {
                connection.close();
            } catch (Throwable ignore) {
                //
            }
        }
    }

    // ///////////////////////////////////////////////////////////////////////
    //
    // Java Bean getters and setters for this ResourceAdapter class.
    //
    // ///////////////////////////////////////////////////////////////////////

    /**
     * @see org.apache.activemq.ra.MessageResourceAdapter#getBrokerXmlConfig()
     */
    public String getBrokerXmlConfig() {
        return brokerXmlConfig;
    }

    /**
     * Sets the <a href="http://activemq.org/Xml+Configuration">XML
     * configuration file </a> used to configure the ActiveMQ broker via Spring
     * if using embedded mode.
     * 
     * @param brokerXmlConfig is the filename which is assumed to be on the
     *                classpath unless a URL is specified. So a value of
     *                <code>foo/bar.xml would be assumed to be on the
     *                classpath whereas <code>file:dir/file.xml would
     *                use the file system. Any valid URL string is supported.
     */
    public void setBrokerXmlConfig(String brokerXmlConfig) {
        this.brokerXmlConfig = brokerXmlConfig;
    }

    /**
     * @see java.lang.Object#equals(java.lang.Object)
     */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof MessageResourceAdapter)) {
            return false;
        }

        final MessageResourceAdapter activeMQResourceAdapter = (MessageResourceAdapter)o;

        if (!getInfo().equals(activeMQResourceAdapter.getInfo())) {
            return false;
        }
        if (notEqual(brokerXmlConfig, activeMQResourceAdapter.getBrokerXmlConfig())) {
            return false;
        }

        return true;
    }

    /**
     * @see java.lang.Object#hashCode()
     */
    @Override
    public int hashCode() {
        int result;
        result = getInfo().hashCode();
        if (brokerXmlConfig != null) {
            result ^= brokerXmlConfig.hashCode();
        }
        return result;
    }

    public ActiveMQConnectionFactory getConnectionFactory() {
        return connectionFactory;
    }

    public void setConnectionFactory(ActiveMQConnectionFactory aConnectionFactory) {
        this.connectionFactory = aConnectionFactory;
    }


    }

Other ActiveMQ examples (source code examples)

Here is a short list of links related to this ActiveMQ ActiveMQResourceAdapter.java source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.