alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

ActiveMQ example source code file (ActiveMQEndpointWorker.java)

This example ActiveMQ source code file (ActiveMQEndpointWorker.java) is included in the DevDaily.com "Java Source Code Warehouse" project. The intent of this project is to help you "Learn Java by Example" TM.

Java - ActiveMQ tags/keywords

activemqendpointactivationkey, closing, connection, endpoint, initial_reconnect_delay, initial_reconnect_delay, jmsexception, jmsexception, max_reconnect_delay, max_reconnect_delay, messageactivationspec, reflection, resourceexception, resourceexception, work

The ActiveMQ ActiveMQEndpointWorker.java source code

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.ra;

import java.lang.reflect.Method;

import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
import javax.jms.ConnectionConsumer;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.Topic;
import javax.resource.ResourceException;
import javax.resource.spi.endpoint.MessageEndpointFactory;
import javax.resource.spi.work.Work;
import javax.resource.spi.work.WorkException;
import javax.resource.spi.work.WorkManager;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 *  $Date$
 */
public class ActiveMQEndpointWorker {

    public static final Method ON_MESSAGE_METHOD;
    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQEndpointWorker.class);

    private static final long INITIAL_RECONNECT_DELAY = 1000; // 1 second.
    private static final long MAX_RECONNECT_DELAY = 1000 * 30; // 30 seconds.
    private static final ThreadLocal<Session> THREAD_LOCAL = new ThreadLocal();

    static {
        try {
            ON_MESSAGE_METHOD = MessageListener.class.getMethod("onMessage", new Class[] {
                Message.class
            });
        } catch (Exception e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    protected final ActiveMQEndpointActivationKey endpointActivationKey;
    protected final MessageEndpointFactory endpointFactory;
    protected final WorkManager workManager;
    protected final boolean transacted;

    private final ActiveMQDestination dest;
    private final Work connectWork;
    private final AtomicBoolean connecting = new AtomicBoolean(false);    
    private final Object shutdownMutex = new String("shutdownMutex");
    
    private ActiveMQConnection connection;
    private ConnectionConsumer consumer;
    private ServerSessionPoolImpl serverSessionPool;
    private boolean running;

    protected ActiveMQEndpointWorker(final MessageResourceAdapter adapter, ActiveMQEndpointActivationKey key) throws ResourceException {
        this.endpointActivationKey = key;
        this.endpointFactory = endpointActivationKey.getMessageEndpointFactory();
        this.workManager = adapter.getBootstrapContext().getWorkManager();
        try {
            this.transacted = endpointFactory.isDeliveryTransacted(ON_MESSAGE_METHOD);
        } catch (NoSuchMethodException e) {
            throw new ResourceException("Endpoint does not implement the onMessage method.");
        }

        connectWork = new Work() {
            long currentReconnectDelay = INITIAL_RECONNECT_DELAY;

            public void release() {
                //
            }

            public synchronized void run() {
                currentReconnectDelay = INITIAL_RECONNECT_DELAY;
                MessageActivationSpec activationSpec = endpointActivationKey.getActivationSpec();
                if ( LOG.isInfoEnabled() ) {
                    LOG.info("Establishing connection to broker [" + adapter.getInfo().getServerUrl() + "]");
                }

                while ( connecting.get() && running ) {
                try {
                    connection = adapter.makeConnection(activationSpec);
                    connection.setExceptionListener(new ExceptionListener() {
                        public void onException(JMSException error) {
                            if (!serverSessionPool.isClosing()) {
                                    // initiate reconnection only once, i.e. on initial exception
                                    // and only if not already trying to connect
                                    LOG.error("Connection to broker failed: " + error.getMessage(), error);
                                    if ( connecting.compareAndSet(false, true) ) {
                                        synchronized ( connectWork ) {
                                            disconnect();
                                            serverSessionPool.closeIdleSessions();
                                            connect();
                            }
                                    } else {
                                        // connection attempt has already been initiated
                                        LOG.info("Connection attempt already in progress, ignoring connection exception");
                        }
                                }
                            }
                    });
                        connection.start();

                        int prefetchSize = activationSpec.getMaxMessagesPerSessionsIntValue() * activationSpec.getMaxSessionsIntValue();
                    if (activationSpec.isDurableSubscription()) {
                            consumer = connection.createDurableConnectionConsumer(
                                    (Topic) dest,
                                    activationSpec.getSubscriptionName(), 
                                    emptyToNull(activationSpec.getMessageSelector()),
                                    serverSessionPool, 
                                    prefetchSize,
                                    activationSpec.getNoLocalBooleanValue());
                    } else {
                            consumer = connection.createConnectionConsumer(
                                    dest, 
                                    emptyToNull(activationSpec.getMessageSelector()), 
                                    serverSessionPool, 
                                    prefetchSize,
                                                                       activationSpec.getNoLocalBooleanValue());
                    }


                        if ( connecting.compareAndSet(true, false) ) {
                            if ( LOG.isInfoEnabled() ) {
                                LOG.info("Successfully established connection to broker [" + adapter.getInfo().getServerUrl() + "]");
                            }
                        } else {
                            LOG.error("Could not release connection lock");
                        }
                } catch (JMSException error) {
                        if ( LOG.isDebugEnabled() ) {
                            LOG.debug("Failed to connect: " + error.getMessage(), error);
                }
                        disconnect();
                        pause(error);
            }
                }
            }
            
            private void pause(JMSException error) {
                if (currentReconnectDelay == MAX_RECONNECT_DELAY) {
                    LOG.error("Failed to connect to broker [" + adapter.getInfo().getServerUrl() + "]: " 
                            + error.getMessage(), error);
                    LOG.error("Endpoint will try to reconnect to the JMS broker in " + (MAX_RECONNECT_DELAY / 1000) + " seconds");
                }
                try {
                    synchronized ( shutdownMutex ) {
                        // shutdownMutex will be notified by stop() method in
                        // order to accelerate shutdown of endpoint
                        shutdownMutex.wait(currentReconnectDelay);
                    }
                } catch ( InterruptedException e ) {
                    Thread.interrupted();
                }
                currentReconnectDelay *= 2;
                if (currentReconnectDelay > MAX_RECONNECT_DELAY) {
                    currentReconnectDelay = MAX_RECONNECT_DELAY;
                }                
            }
        };

        MessageActivationSpec activationSpec = endpointActivationKey.getActivationSpec();
        if ("javax.jms.Queue".equals(activationSpec.getDestinationType())) {
            dest = new ActiveMQQueue(activationSpec.getDestination());
        } else if ("javax.jms.Topic".equals(activationSpec.getDestinationType())) {
            dest = new ActiveMQTopic(activationSpec.getDestination());
        } else {
            throw new ResourceException("Unknown destination type: " + activationSpec.getDestinationType());
        }

    }

    /**
     * @param c
     */
    public static void safeClose(Connection c) {
        try {
            if (c != null) {
                LOG.debug("Closing connection to broker");
                c.close();
            }
        } catch (JMSException e) {
            //
        }
    }

    /**
     * @param cc
     */
    public static void safeClose(ConnectionConsumer cc) {
        try {
            if (cc != null) {
                LOG.debug("Closing ConnectionConsumer");
                cc.close();
            }
        } catch (JMSException e) {
            //
        }
    }

    /**
     * 
     */
    public void start() throws ResourceException {
        synchronized (connectWork) {
            if (running)
            return;
        running = true;

            if ( connecting.compareAndSet(false, true) ) {
                LOG.info("Starting");
        serverSessionPool = new ServerSessionPoolImpl(this, endpointActivationKey.getActivationSpec().getMaxSessionsIntValue());
        connect();
            } else {
                LOG.warn("Ignoring start command, EndpointWorker is already trying to connect");
    }
        }
    }

    /**
     * 
     */
    public void stop() throws InterruptedException {
        synchronized (shutdownMutex) {
            if (!running)
                return;
            running = false;
            LOG.info("Stopping");
            // wake up pausing reconnect attempt
            shutdownMutex.notifyAll();
            serverSessionPool.close();
        }
        disconnect();
    }

    private boolean isRunning() {
        return running;
    }

    private void connect() {
        synchronized ( connectWork ) {
        if (!running) {
            return;
        }

        try {
            workManager.scheduleWork(connectWork, WorkManager.INDEFINITE, null, null);
        } catch (WorkException e) {
            running = false;
            LOG.error("Work Manager did not accept work: ", e);
        }
    }
    }

    /**
     * 
     */
    private void disconnect() {
        synchronized ( connectWork ) {
        safeClose(consumer);
        consumer = null;
        safeClose(connection);
        connection = null;
    }
            }

    protected void registerThreadSession(Session session) {
        THREAD_LOCAL.set(session);
    }

    protected void unregisterThreadSession(Session session) {
        THREAD_LOCAL.set(null);
    }

    protected ActiveMQConnection getConnection() {
        // make sure we only return a working connection
        // in particular make sure that we do not return null
        // after the resource adapter got disconnected from
        // the broker via the disconnect() method
        synchronized ( connectWork ) {
            return connection;
        }
    }

    private String emptyToNull(String value) {
        if (value == null || value.length() == 0) {
            return null;
        }
        return value;
    }

}

Other ActiveMQ examples (source code examples)

Here is a short list of links related to this ActiveMQ ActiveMQEndpointWorker.java source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.