alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Axis 2 example source code file (JMSListener.java)

This example Axis 2 source code file (JMSListener.java) is included in the DevDaily.com "Java Source Code Warehouse" project. The intent of this project is to help you "Learn Java by Example" TM.

Java - Axis 2 tags/keywords

axisfault, axisfault, endpointreference, error, iterator, jms, jms, jmsconnectionfactory, jmsconnectionfactory, jmsexception, naming, parameter, string, string, this, util

The Axis 2 JMSListener.java source code

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.axis2.transport.jms;

import edu.emory.mathcs.backport.java.util.concurrent.ExecutorService;
import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue;
import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor;
import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
import org.apache.axiom.om.OMElement;
import org.apache.axis2.AxisFault;
import org.apache.axis2.addressing.EndpointReference;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.context.SessionContext;
import org.apache.axis2.description.*;
import org.apache.axis2.engine.AxisConfiguration;
import org.apache.axis2.engine.AxisEvent;
import org.apache.axis2.engine.AxisObserver;
import org.apache.axis2.transport.TransportListener;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import javax.jms.JMSException;
import javax.naming.Context;
import javax.naming.NamingException;
import java.util.*;

/**
 * The JMS Transport listener implementation. A JMS Listner will hold one or
 * more JMS connection factories, which would be created at initialization
 * time. This implementation does not support the creation of connection
 * factories at runtime. This JMS Listener registers with Axis to be notified
 * of service deployment/undeployment/start and stop, and enables or disables
 * listening for messages on the destinations as appropriate.
 * <p/>
 * A Service could state the JMS connection factory name and the destination
 * name for use as Parameters in its services.xml as shown in the example
 * below. If the connection name was not specified, it will use the connection
 * factory named "default" (JMSConstants.DEFAULT_CONFAC_NAME) - if such a
 * factory is defined in the Axis2.xml. If the destination name is not specified
 * it will default to a JMS queue by the name of the service. If the destination
 * should be a Topic, it should be created on the JMS implementation, and
 * specified in the services.xml of the service.
 * <p/>
 * <parameter name="transport.jms.ConnectionFactory" locked="true">
 * myTopicConnectionFactory</parameter>
 * <parameter name="transport.jms.Destination" locked="true">
 * dynamicTopics/something.TestTopic</parameter>
 */
public class JMSListener implements TransportListener {

    private static final Log log = LogFactory.getLog(JMSListener.class);

    /**
     * The maximum number of threads used for the worker thread pool
     */
    private static final int WORKERS_MAX_THREADS = 100;
    /**
     * The keep alive time of an idle worker thread
     */
    private static final long WORKER_KEEP_ALIVE = 60L;
    /**
     * The worker thread timeout time unit
     */
    private static final TimeUnit TIME_UNIT = TimeUnit.SECONDS;

    /**
     * A Map containing the connection factories managed by this, keyed by name
     */
    private Map connectionFactories = new HashMap();
    /**
     * A Map of service name to the JMS EPR addresses
     */
    private Map serviceNameToEprMap = new HashMap();
    /**
     * The Axis2 Configuration context
     */
    private ConfigurationContext configCtx = null;

    /**
     * This is the TransportListener initialization method invoked by Axis2
     *
     * @param axisConf   the Axis configuration context
     * @param transprtIn the TransportIn description
     */
    public void init(ConfigurationContext axisConf,
                     TransportInDescription transprtIn) {

        // save reference to the configuration context
        this.configCtx = axisConf;

        // initialize the defined connection factories
        initializeConnectionFactories(transprtIn);

        // if no connection factories are defined, we cannot listen
        if (connectionFactories.isEmpty()) {
            log.warn("No JMS connection factories are defined." +
                     "Will not listen for any JMS messages");
            return;
        }

        // iterate through deployed services and validate connection factory
        // names, and mark services as faulty where appropriate.
        Iterator services =
                axisConf.getAxisConfiguration().getServices().values().iterator();

        while (services.hasNext()) {
            AxisService service = (AxisService) services.next();
            if (JMSUtils.isJMSService(service)) {
                processService(service);
            }
        }

        // register to receive updates on services for lifetime management
        axisConf.getAxisConfiguration().addObservers(new JMSAxisObserver());

        log.info("JMS Transport Receiver (Listener) initialized...");
    }


    /**
     * Prepare to listen for JMS messages on behalf of this service
     *
     * @param service
     */
    private void processService(AxisService service) {
        JMSConnectionFactory cf = getConnectionFactory(service);
        if (cf == null) {
            String msg = "Service " + service.getName() + " does not specify" +
                         "a JMS connection factory or refers to an invalid factory. " +
                         "This service is being marked as faulty and will not be " +
                         "available over the JMS transport";
            log.warn(msg);
            JMSUtils.markServiceAsFaulty(
                    service.getName(), msg, service.getAxisConfiguration());
            return;
        }

        String destination = JMSUtils.getDestination(service);

        // compute service EPR and keep for later use
        serviceNameToEprMap.put(service.getName(), getEPR(cf, destination));

        // add the specified or implicit destination of this service
        // to its connection factory
        cf.addDestination(destination, service.getName());
    }

    /**
     * Return the connection factory name for this service. If this service
     * refers to an invalid factory or defaults to a non-existent default
     * factory, this returns null
     *
     * @param service the AxisService
     * @return the JMSConnectionFactory to be used, or null if reference is invalid
     */
    private JMSConnectionFactory getConnectionFactory(AxisService service) {
        Parameter conFacParam = service.getParameter(JMSConstants.CONFAC_PARAM);

        // validate connection factory name (specified or default)
        if (conFacParam != null) {
            String conFac = (String) conFacParam.getValue();
            if (connectionFactories.containsKey(conFac)) {
                return (JMSConnectionFactory) connectionFactories.get(conFac);
            } else {
                return null;
            }

        } else if (connectionFactories.containsKey(JMSConstants.DEFAULT_CONFAC_NAME)) {
            return (JMSConnectionFactory) connectionFactories.
                    get(JMSConstants.DEFAULT_CONFAC_NAME);

        } else {
            return null;
        }
    }

    /**
     * Initialize the defined connection factories, parsing the TransportIn
     * descriptions
     *
     * @param transprtIn The Axis2 Transport in for the JMS
     */
    private void initializeConnectionFactories(TransportInDescription transprtIn) {
        // iterate through all defined connection factories
        Iterator conFacIter = transprtIn.getParameters().iterator();

        while (conFacIter.hasNext()) {

            Parameter param = (Parameter) conFacIter.next();
            JMSConnectionFactory jmsConFactory =
                    new JMSConnectionFactory(param.getName());

            ParameterIncludeImpl pi = new ParameterIncludeImpl();
            try {
                pi.deserializeParameters((OMElement) param.getValue());
            } catch (AxisFault axisFault) {
                handleException("Error reading Parameters for JMS connection " +
                                "factory" + jmsConFactory.getName(), axisFault);
            }

            // read connection facotry properties
            Iterator params = pi.getParameters().iterator();

            while (params.hasNext()) {
                Parameter p = (Parameter) params.next();

                if (Context.INITIAL_CONTEXT_FACTORY.equals(p.getName())) {
                    jmsConFactory.addProperty(
                            Context.INITIAL_CONTEXT_FACTORY, (String) p.getValue());
                } else if (Context.PROVIDER_URL.equals(p.getName())) {
                    jmsConFactory.addProperty(
                            Context.PROVIDER_URL, (String) p.getValue());
                } else if (Context.SECURITY_PRINCIPAL.equals(p.getName())) {
                    jmsConFactory.addProperty(
                            Context.SECURITY_PRINCIPAL, (String) p.getValue());
                } else if (Context.SECURITY_CREDENTIALS.equals(p.getName())) {
                    jmsConFactory.addProperty(
                            Context.SECURITY_CREDENTIALS, (String) p.getValue());
                } else if (JMSConstants.CONFAC_JNDI_NAME_PARAM.equals(p.getName())) {
                    jmsConFactory.setJndiName((String) p.getValue());
                } else if (JMSConstants.CONFAC_JNDI_NAME_USER.equals(p.getName())) {
                    jmsConFactory.setJndiUser((String) p.getValue());
                } else if (JMSConstants.CONFAC_JNDI_NAME_PASS.equals(p.getName())) {
                    jmsConFactory.setJndiPass((String) p.getValue());
                } else if (JMSConstants.DEST_PARAM.equals(p.getName())) {
                    StringTokenizer st =
                            new StringTokenizer((String) p.getValue(), " ,");
                    while (st.hasMoreTokens()) {
                        jmsConFactory.addDestination(st.nextToken(), null);
                    }
                }
            }

            // connect to the actual connection factory
            try {
                jmsConFactory.connect();
                connectionFactories.put(jmsConFactory.getName(), jmsConFactory);
            } catch (NamingException e) {
                handleException("Error connecting to JMS connection factory : " +
                                jmsConFactory.getJndiName(), e);
            }
        }
    }

    /**
     * Get the EPR for the given JMS connection factory and destination
     * the form of the URL is
     * jms:/<destination>?[=&]*
     *
     * @param cf          the Axis2 JMS connection factory
     * @param destination the JNDI name of the destination
     * @return the EPR as a String
     */
    private static String getEPR(JMSConnectionFactory cf, String destination) {
        StringBuffer sb = new StringBuffer();
        sb.append(JMSConstants.JMS_PREFIX).append(destination);
        sb.append("?").append(JMSConstants.CONFAC_JNDI_NAME_PARAM).
                append("=").append(cf.getJndiName());
        Iterator props = cf.getProperties().keySet().iterator();
        while (props.hasNext()) {
            String key = (String) props.next();
            String value = (String) cf.getProperties().get(key);
            sb.append("&").append(key).append("=").append(value);
        }
        return sb.toString();
    }

    /**
     * Start this JMS Listener (Transport Listener)
     *
     * @throws AxisFault
     */
    public void start() throws AxisFault {
        // create thread pool of workers
        ExecutorService workerPool = new ThreadPoolExecutor(
                1,
                WORKERS_MAX_THREADS, WORKER_KEEP_ALIVE, TIME_UNIT,
                new LinkedBlockingQueue(),
                new org.apache.axis2.util.threadpool.DefaultThreadFactory(
                        new ThreadGroup("JMS Worker thread group"),
                        "JMSWorker"));

        Iterator iter = connectionFactories.values().iterator();
        while (iter.hasNext()) {
            JMSConnectionFactory conFac = (JMSConnectionFactory) iter.next();
            JMSMessageReceiver msgRcvr =
                    new JMSMessageReceiver(conFac, workerPool, configCtx);

            try {
                conFac.listen(msgRcvr);
            } catch (JMSException e) {
                handleException("Error starting connection factory : " +
                                conFac.getName(), e);
            }
        }
    }

    /**
     * Stop this transport listener and shutdown all of the connection factories
     */
    public void stop() {
        Iterator iter = connectionFactories.values().iterator();
        while (iter.hasNext()) {
            ((JMSConnectionFactory) iter.next()).stop();
        }
    }

    /**
     * Returns EPRs for the given service and IP. (Picks up precomputed EPR)
     *
     * @param serviceName service name
     * @param ip          ignored
     * @return the EPR for the service
     * @throws AxisFault not used
     */
    public EndpointReference[] getEPRsForService(String serviceName, String ip) throws AxisFault {
        //Strip out the operation name
        if (serviceName.indexOf('/') != -1) {
            serviceName = serviceName.substring(0, serviceName.indexOf('/'));
        }
        return new EndpointReference[]{
                new EndpointReference((String) serviceNameToEprMap.get(serviceName))};
    }

    /**
     * Returns the EPR for the given service and IP. (Picks up precomputed EPR)
     *
     * @param serviceName service name
     * @param ip          ignored
     * @return the EPR for the service
     * @throws AxisFault not used
     */
    public EndpointReference getEPRForService(String serviceName, String ip) throws AxisFault {
        return getEPRsForService(serviceName, ip)[0];
    }

    /**
     * Starts listening for messages on this service
     *
     * @param service the AxisService just deployed
     */
    private void startListeningForService(AxisService service) {
        processService(service);
        JMSConnectionFactory cf = getConnectionFactory(service);
        if (cf == null) {
            String msg = "Service " + service.getName() + " does not specify" +
                         "a JMS connection factory or refers to an invalid factory." +
                         "This service is being marked as faulty and will not be " +
                         "available over the JMS transport";
            log.warn(msg);
            JMSUtils.markServiceAsFaulty(
                    service.getName(), msg, service.getAxisConfiguration());
            return;
        }

        String destination = JMSUtils.getDestination(service);
        try {
            cf.listenOnDestination(destination);
            log.info("Started listening on destination : " + destination +
                     " for service " + service.getName());

        } catch (JMSException e) {
            handleException(
                    "Could not listen on JMS for service " + service.getName(), e);
            JMSUtils.markServiceAsFaulty(
                    service.getName(), e.getMessage(), service.getAxisConfiguration());
        }
    }

    /**
     * Stops listening for messages for the service undeployed
     *
     * @param service the AxisService just undeployed
     */
    private void stopListeningForService(AxisService service) {

        JMSConnectionFactory cf = getConnectionFactory(service);
        if (cf == null) {
            String msg = "Service " + service.getName() + " does not specify" +
                         "a JMS connection factory or refers to an invalid factory." +
                         "This service is being marked as faulty and will not be " +
                         "available over the JMS transport";
            log.warn(msg);
            JMSUtils.markServiceAsFaulty(
                    service.getName(), msg, service.getAxisConfiguration());
            return;
        }

        // remove from the serviceNameToEprMap
        serviceNameToEprMap.remove(service.getName());

        String destination = JMSUtils.getDestination(service);
        try {
            cf.removeDestination(destination);
        } catch (JMSException e) {
            handleException(
                    "Error while terminating listening on JMS destination : " + destination, e);
        }
    }

    private void handleException(String msg, Exception e) {
        log.error(msg, e);
        throw new AxisJMSException(msg, e);
    }

    /**
     * An AxisObserver which will start listening for newly deployed services,
     * and stop listening when services are undeployed.
     */
    class JMSAxisObserver implements AxisObserver {

        // The initilization code will go here
        public void init(AxisConfiguration axisConfig) {
        }

        public void serviceUpdate(AxisEvent event, AxisService service) {

            if (JMSUtils.isJMSService(service)) {
                switch (event.getEventType()) {
                    case AxisEvent.SERVICE_DEPLOY :
                        startListeningForService(service);
                        break;
                    case AxisEvent.SERVICE_REMOVE :
                        stopListeningForService(service);
                        break;
                    case AxisEvent.SERVICE_START  :
                        startListeningForService(service);
                        break;
                    case AxisEvent.SERVICE_STOP   :
                        stopListeningForService(service);
                        break;
                }
            }
        }

        public void moduleUpdate(AxisEvent event, AxisModule module) {
        }

        //--------------------------------------------------------
        public void addParameter(Parameter param) throws AxisFault {
        }

        public void removeParameter(Parameter param) throws AxisFault {
        }

        public void deserializeParameters(OMElement parameterElement) throws AxisFault {
        }

        public Parameter getParameter(String name) {
            return null;
        }

        public ArrayList getParameters() {
            return null;
        }

        public boolean isParameterLocked(String parameterName) {
            return false;
        }

        public void serviceGroupUpdate(AxisEvent event, AxisServiceGroup serviceGroup) {
        }
    }

    public ConfigurationContext getConfigurationContext() {
        return this.configCtx;
    }


    public SessionContext getSessionContext(MessageContext messageContext) {
        return null;
    }

    public void destroy() {
        this.configCtx = null;
    }
}

Other Axis 2 examples (source code examples)

Here is a short list of links related to this Axis 2 JMSListener.java source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.