alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

ActiveMQ example source code file (WebClient.java)

This example ActiveMQ source code file (WebClient.java) is included in the DevDaily.com "Java Source Code Warehouse" project. The intent of this project is to help you "Learn Java by Example"™.

Java - ActiveMQ tags/keywords

broker_url_init_param, exception, hashmap, http, io, ioexception, jmsexception, jmsexception, messageconsumer, messageconsumer, request, response, semaphore, servlet, session, string, string, util, webclient, webclient

The ActiveMQ WebClient.java source code

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.activemq.web;

import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Semaphore;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.servlet.ServletContext;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpSession;
import javax.servlet.http.HttpSessionActivationListener;
import javax.servlet.http.HttpSessionBindingEvent;
import javax.servlet.http.HttpSessionBindingListener;
import javax.servlet.http.HttpSessionEvent;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.MessageAvailableConsumer;
import org.apache.activemq.camel.component.ActiveMQComponent;
import org.apache.activemq.camel.component.ActiveMQConfiguration;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.impl.DefaultCamelContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Represents a messaging client used from inside a web container, typically
 * stored inside a {@link HttpSession} under {@link #WEB_CLIENT_ATTRIBUTE}.
 * <p>
 * The client lazily creates a JMS connection, session, producer and one
 * consumer per destination. It releases them in {@link #close()}, which is
 * invoked when the session passivates or when the client is unbound from the
 * session. Once closed, {@link #isClosed()} returns {@code true} and
 * {@link #getWebClient(HttpServletRequest)} replaces the instance with a new
 * one.
 * <p>
 * {@link #initContext(ServletContext)} must be called before any instance is
 * constructed; the constructor throws otherwise.
 * <p>
 * TODO controls to prevent DOS attacks with users requesting many consumers
 * <p>
 * TODO configure consumers with small prefetch.
 */
public class WebClient implements HttpSessionActivationListener, HttpSessionBindingListener, Externalizable {

    public static final String WEB_CLIENT_ATTRIBUTE = "org.apache.activemq.webclient";
    public static final String CONNECTION_FACTORY_ATTRIBUTE = "org.apache.activemq.connectionFactory";
    public static final String CONNECTION_FACTORY_PREFETCH_PARAM = "org.apache.activemq.connectionFactory.prefetch";
    public static final String CONNECTION_FACTORY_OPTIMIZE_ACK_PARAM = "org.apache.activemq.connectionFactory.optimizeAck";
    public static final String BROKER_URL_INIT_PARAM = "org.apache.activemq.brokerURL";
    public static final String SELECTOR_NAME = "org.apache.activemq.selectorName";

    private static final Logger LOG = LoggerFactory.getLogger(WebClient.class);

    /** Connection factory shared by every client; set by {@link #initConnectionFactory(ServletContext)}. */
    private static ConnectionFactory factory;

    /** Open consumers keyed by destination; {@code null} once the client has been closed. */
    private transient Map<Destination, MessageConsumer> consumers = new HashMap<Destination, MessageConsumer>();
    private transient Connection connection;
    private transient Session session;
    private transient MessageProducer producer;
    private int deliveryMode = DeliveryMode.NON_PERSISTENT;
    public static String selectorName;

    private final Semaphore semaphore = new Semaphore(1);

    private CamelContext camelContext;
    private ProducerTemplate producerTemplate;

    /**
     * Creates a client.
     *
     * @throws IllegalStateException if no connection factory has been set up
     *         yet, i.e. {@link #initContext(ServletContext)} was not called
     */
    public WebClient() {
        if (factory == null) {
            throw new IllegalStateException("initContext(ServletContext) not called");
        }
    }

    /**
     * Helper method to get the client for the current session, lazily creating
     * a client if there is none currently or if the stored one is closed.
     *
     * @param request is the current HTTP request
     * @return the current client or a newly created one
     */
    public static WebClient getWebClient(HttpServletRequest request) {
        HttpSession session = request.getSession(true);
        WebClient client = getWebClient(session);
        if (client == null || client.isClosed()) {
            client = WebClient.createWebClient(request);
            session.setAttribute(WEB_CLIENT_ATTRIBUTE, client);
        }

        return client;
    }

    /**
     * @param session the HTTP session to look in
     * @return the web client for the current HTTP session or null if there is
     *         not a web client created yet
     */
    public static WebClient getWebClient(HttpSession session) {
        return (WebClient)session.getAttribute(WEB_CLIENT_ATTRIBUTE);
    }

    /**
     * Initialises the shared connection factory, publishes an empty
     * {@code "webClients"} map as a context attribute and resolves
     * {@link #selectorName} (init parameter {@link #SELECTOR_NAME}, falling
     * back to {@code "selector"}) unless it has already been set.
     *
     * @param context the servlet context to read configuration from
     */
    public static void initContext(ServletContext context) {
        initConnectionFactory(context);
        context.setAttribute("webClients", new HashMap<String, WebClient>());
        if (selectorName == null) {
            selectorName = context.getInitParameter(SELECTOR_NAME);
        }
        if (selectorName == null) {
            selectorName = "selector";
        }
    }

    /**
     * @return the delivery mode applied to the producer when it is created
     */
    public int getDeliveryMode() {
        return deliveryMode;
    }

    /**
     * @param deliveryMode the delivery mode to apply when the producer is
     *        created; an already created producer is not updated
     */
    public void setDeliveryMode(int deliveryMode) {
        this.deliveryMode = deliveryMode;
    }

    /**
     * Closes and forgets every consumer. Failures to close an individual
     * consumer are logged at debug level and do not stop the remaining
     * consumers from being closed. Does nothing if the client is closed.
     */
    public synchronized void closeConsumers() {
        if (consumers == null) {
            return;
        }
        for (Iterator<MessageConsumer> it = consumers.values().iterator(); it.hasNext();) {
            MessageConsumer consumer = it.next();
            // Remove first so the consumer is forgotten even if closing it fails.
            it.remove();
            try {
                detachAndClose(consumer);
            } catch (JMSException e) {
                LOG.debug("caught exception closing consumer", e);
            }
        }
    }

    /**
     * Releases the consumers, the connection and the producer template, then
     * marks this client as closed. Exceptions are logged at debug level and
     * never propagated; a failure while closing the JMS resources does not
     * prevent the producer template from being stopped.
     */
    public synchronized void close() {
        try {
            try {
                if (consumers != null) {
                    closeConsumers();
                }
                if (connection != null) {
                    connection.close();
                }
            } catch (Exception e) {
                LOG.debug("caught exception closing consumer", e);
            }
            try {
                if (producerTemplate != null) {
                    producerTemplate.stop();
                }
            } catch (Exception e) {
                LOG.debug("caught exception stopping producer template", e);
            }
            // NOTE(review): camelContext is neither stopped nor cleared here,
            // matching the previous behaviour - confirm whether it should be.
        } finally {
            producer = null;
            session = null;
            connection = null;
            producerTemplate = null;
            if (consumers != null) {
                consumers.clear();
            }
            // A null map is what isClosed() reports on.
            consumers = null;
        }
    }

    /**
     * @return {@code true} once {@link #close()} has run
     */
    public boolean isClosed() {
        return consumers == null;
    }

    /**
     * Writes the number of consumers as an int followed by the string form of
     * each destination, or {@code -1} if the client is closed.
     *
     * @param out the stream to write to
     * @throws IOException if writing fails
     */
    public void writeExternal(ObjectOutput out) throws IOException {
        if (consumers != null) {
            // writeInt (4 bytes) pairs with the readInt in readExternal;
            // write(int) would emit a single byte only.
            out.writeInt(consumers.size());
            for (Destination destination : consumers.keySet()) {
                out.writeObject(destination.toString());
            }
        } else {
            out.writeInt(-1);
        }
    }

    /**
     * Reads the state written by {@link #writeExternal(ObjectOutput)} and
     * re-creates a consumer for every destination name found. Names starting
     * with {@code "topic://"} are re-created as topics, all others as queues.
     *
     * @param in the stream to read from
     * @throws IOException if reading fails or a consumer cannot be re-created
     * @throws ClassNotFoundException if a stored object's class is unknown
     */
    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
        int size = in.readInt();
        if (size >= 0) {
            consumers = new HashMap<Destination, MessageConsumer>();
            for (int i = 0; i < size; i++) {
                String destinationName = in.readObject().toString();

                try {
                    Session jmsSession = getSession();
                    Destination destination;
                    if (destinationName.startsWith("topic://")) {
                        destination = jmsSession.createTopic(destinationName);
                    } else {
                        destination = jmsSession.createQueue(destinationName);
                    }
                    // getConsumer registers the new consumer in the map itself.
                    getConsumer(destination, null, true);
                } catch (JMSException e) {
                    LOG.debug("Caought Exception ", e);
                    IOException ex = new IOException(e.getMessage());
                    ex.initCause(e.getCause() != null ? e.getCause() : e);
                    throw ex;
                }
            }
        }
    }

    /**
     * Sends a message using the producer's defaults.
     *
     * @param destination where to send the message
     * @param message the message to send
     * @throws JMSException if the producer cannot be created or the send fails
     */
    public void send(Destination destination, Message message) throws JMSException {
        getProducer().send(destination, message);
        LOG.debug("Sent! to destination: {} message: {}", destination, message);
    }

    /**
     * Sends a message with explicit delivery options.
     *
     * @param destination where to send the message
     * @param message the message to send
     * @param persistent {@code true} for persistent delivery, {@code false}
     *        for non-persistent
     * @param priority the message priority
     * @param timeToLive the message time to live
     * @throws JMSException if the producer cannot be created or the send fails
     */
    public void send(Destination destination, Message message, boolean persistent, int priority, long timeToLive) throws JMSException {
        int mode = persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT;
        getProducer().send(destination, message, mode, priority, timeToLive);
        LOG.debug("Sent! to destination: {} message: {}", destination, message);
    }

    /**
     * @return the JMS session, created on first use
     * @throws JMSException if the session cannot be created
     */
    public Session getSession() throws JMSException {
        if (session == null) {
            session = createSession();
        }
        return session;
    }

    /**
     * @return the JMS connection, created and started on first use
     * @throws JMSException if the connection cannot be created or started
     */
    public Connection getConnection() throws JMSException {
        if (connection == null) {
            connection = factory.createConnection();
            connection.start();
        }
        return connection;
    }

    /**
     * Sets up the shared connection factory if there is none yet. The factory
     * is taken from the {@link #CONNECTION_FACTORY_ATTRIBUTE} context
     * attribute when present; otherwise one is built from the
     * {@link #BROKER_URL_INIT_PARAM} init parameter, configured from the
     * optional prefetch and optimize-acknowledge init parameters, and stored
     * back into the context attribute.
     *
     * @param servletContext the context to read configuration from
     * @throws IllegalStateException if a factory must be built but no broker
     *         URL is configured
     */
    protected static synchronized void initConnectionFactory(ServletContext servletContext) {
        if (factory == null) {
            factory = (ConnectionFactory)servletContext.getAttribute(CONNECTION_FACTORY_ATTRIBUTE);
        }
        if (factory == null) {
            String brokerURL = servletContext.getInitParameter(BROKER_URL_INIT_PARAM);

            LOG.debug("Value of: {} is: {}", BROKER_URL_INIT_PARAM, brokerURL);

            if (brokerURL == null) {
                throw new IllegalStateException("missing brokerURL (specified via " + BROKER_URL_INIT_PARAM + " init-Param");
            }

            ActiveMQConnectionFactory amqfactory = new ActiveMQConnectionFactory(brokerURL);

            // Set prefetch policy for factory
            String prefetch = servletContext.getInitParameter(CONNECTION_FACTORY_PREFETCH_PARAM);
            if (prefetch != null) {
                amqfactory.getPrefetchPolicy().setAll(Integer.parseInt(prefetch));
            }

            // Set optimize acknowledge setting
            String optimizeAck = servletContext.getInitParameter(CONNECTION_FACTORY_OPTIMIZE_ACK_PARAM);
            if (optimizeAck != null) {
                amqfactory.setOptimizeAcknowledge(Boolean.parseBoolean(optimizeAck));
            }

            factory = amqfactory;

            servletContext.setAttribute(CONNECTION_FACTORY_ATTRIBUTE, factory);
        }
    }

    /**
     * @return the Camel context, created on first use with an
     *         {@code "activemq"} component backed by a pooled wrapper around
     *         the shared connection factory
     */
    public synchronized CamelContext getCamelContext() {
        if (camelContext == null) {
            LOG.debug("Creating camel context");
            camelContext = new DefaultCamelContext();
            ActiveMQConfiguration conf = new ActiveMQConfiguration();
            // The cast requires the shared factory to be an ActiveMQConnectionFactory.
            conf.setConnectionFactory(new PooledConnectionFactory((ActiveMQConnectionFactory)factory));
            ActiveMQComponent component = new ActiveMQComponent(conf);
            camelContext.addComponent("activemq", component);
        }
        return camelContext;
    }

    /**
     * @return the Camel producer template, created and started on first use
     * @throws Exception if the template cannot be started
     */
    public synchronized ProducerTemplate getProducerTemplate() throws Exception {
        if (producerTemplate == null) {
            LOG.debug("Creating producer template");
            producerTemplate = getCamelContext().createProducerTemplate();
            producerTemplate.start();
        }
        return producerTemplate;
    }

    /**
     * @return the message producer, created on first use without a fixed
     *         destination and with the configured delivery mode
     * @throws JMSException if the producer cannot be created
     */
    public synchronized MessageProducer getProducer() throws JMSException {
        if (producer == null) {
            producer = getSession().createProducer(null);
            producer.setDeliveryMode(deliveryMode);
        }
        return producer;
    }

    /**
     * @param producer the producer to use for subsequent sends
     */
    public void setProducer(MessageProducer producer) {
        this.producer = producer;
    }

    /**
     * Returns the consumer for a destination, creating it if necessary.
     *
     * @param destination the destination to consume from
     * @param selector the message selector used if a consumer is created
     * @return the consumer for the destination
     * @throws JMSException if the consumer cannot be created or the client is
     *         closed
     */
    public synchronized MessageConsumer getConsumer(Destination destination, String selector) throws JMSException {
        return getConsumer(destination, selector, true);
    }

    /**
     * Returns the consumer for a destination, optionally creating it.
     *
     * @param destination the destination to consume from
     * @param selector the message selector used if a consumer is created; it
     *        is ignored when a consumer for the destination already exists
     * @param create whether to create a consumer if there is none
     * @return the consumer, or {@code null} if there is none and
     *         {@code create} is {@code false}
     * @throws JMSException if the consumer cannot be created; a
     *         {@link javax.jms.IllegalStateException} if {@code create} is
     *         {@code true} and the client is closed
     */
    public synchronized MessageConsumer getConsumer(Destination destination, String selector, boolean create) throws JMSException {
        if (consumers == null) {
            if (create) {
                throw new javax.jms.IllegalStateException("WebClient is closed");
            }
            return null;
        }
        MessageConsumer consumer = consumers.get(destination);
        if (create && consumer == null) {
            consumer = getSession().createConsumer(destination, selector);
            consumers.put(destination, consumer);
        }
        return consumer;
    }

    /**
     * Closes and forgets the consumer for a destination, if there is one.
     * Does nothing if the client is closed.
     *
     * @param destination the destination whose consumer should be closed
     * @throws JMSException if the consumer cannot be closed
     */
    public synchronized void closeConsumer(Destination destination) throws JMSException {
        if (consumers == null) {
            return;
        }
        MessageConsumer consumer = consumers.get(destination);
        if (consumer != null) {
            consumers.remove(destination);
            detachAndClose(consumer);
        }
    }

    /**
     * @return a snapshot list of the current consumers; empty if the client
     *         is closed
     */
    public synchronized List<MessageConsumer> getConsumers() {
        if (consumers == null) {
            return new ArrayList<MessageConsumer>();
        }
        return new ArrayList<MessageConsumer>(consumers.values());
    }

    /**
     * @return a new non-transacted, auto-acknowledge session on the connection
     * @throws JMSException if the session cannot be created
     */
    protected Session createSession() throws JMSException {
        return getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
    }

    /**
     * @return the single-permit semaphore owned by this client
     */
    public Semaphore getSemaphore() {
        return semaphore;
    }

    /** Closes the client before the session is passivated. */
    public void sessionWillPassivate(HttpSessionEvent event) {
        close();
    }

    /** No action is taken on activation. */
    public void sessionDidActivate(HttpSessionEvent event) {
    }

    /** No action is taken when the client is bound to a session. */
    public void valueBound(HttpSessionBindingEvent event) {
    }

    /** Closes the client when it is unbound from its session. */
    public void valueUnbound(HttpSessionBindingEvent event) {
        close();
    }

    /**
     * @param request the current HTTP request (unused here)
     * @return a new client
     */
    protected static WebClient createWebClient(HttpServletRequest request) {
        return new WebClient();
    }

    /** Clears the listeners registered on a consumer and closes it. */
    private static void detachAndClose(MessageConsumer consumer) throws JMSException {
        consumer.setMessageListener(null);
        if (consumer instanceof MessageAvailableConsumer) {
            ((MessageAvailableConsumer)consumer).setAvailableListener(null);
        }
        consumer.close();
    }

}

Other ActiveMQ examples (source code examples)

Here is a short list of links related to this ActiveMQ WebClient.java source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.