|
ActiveMQ example source code file (AmqMessagesQueryFilter.java)
The ActiveMQ AmqMessagesQueryFilter.java source code/** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.activemq.console.filter; import java.net.URI; import java.util.Collections; import java.util.Iterator; import java.util.List; import javax.jms.Connection; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.QueueBrowser; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; public class AmqMessagesQueryFilter extends AbstractQueryFilter { private URI brokerUrl; private Destination destination; /** * Create a JMS message query filter * * @param brokerUrl - broker url to connect to * @param destination - JMS destination to query */ public AmqMessagesQueryFilter(URI brokerUrl, Destination destination) { super(null); this.brokerUrl = brokerUrl; this.destination = destination; } /** * Queries the specified destination using the message selector format query * * @param queries - message selector queries * @return list messages that matches the selector * @throws Exception */ public List query(List queries) throws Exception { String selector = ""; // Convert to message selector for (Iterator i = queries.iterator(); i.hasNext();) { selector = selector + "(" + i.next().toString() + ") AND "; } // Remove last AND if (!selector.equals("")) { selector = selector.substring(0, selector.length() - 5); } if (destination instanceof ActiveMQQueue) { return queryMessages((ActiveMQQueue)destination, selector); } else { return queryMessages((ActiveMQTopic)destination, selector); } } /** * Query the messages of a queue destination using a queue browser * * @param queue - queue destination * @param selector - message selector * @return list of messages that matches the selector * @throws Exception */ protected List queryMessages(ActiveMQQueue queue, String selector) throws Exception { Connection conn = createConnection(getBrokerUrl()); Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); QueueBrowser browser = sess.createBrowser(queue, selector); List messages = Collections.list(browser.getEnumeration()); conn.close(); return messages; } /** * Query the messages of a topic destination using a message consumer * * @param topic - topic destination * @param selector - message selector * @return list of messages that matches the selector * @throws Exception */ protected List queryMessages(ActiveMQTopic topic, String selector) throws Exception { // TODO: should we use a durable subscriber or a retroactive non-durable // subscriber? // TODO: if a durable subscriber is used, how do we manage it? // subscribe/unsubscribe tasks? return null; } /** * Create and start a JMS connection * * @param brokerUrl - broker url to connect to. * @return JMS connection * @throws JMSException */ protected Connection createConnection(URI brokerUrl) throws JMSException { Connection conn = (new ActiveMQConnectionFactory(brokerUrl)).createConnection(); conn.start(); return conn; } /** * Get the broker url being used. * * @return broker url */ public URI getBrokerUrl() { return brokerUrl; } /** * Set the broker url to use. * * @param brokerUrl - broker url */ public void setBrokerUrl(URI brokerUrl) { this.brokerUrl = brokerUrl; } /** * Get the destination being used. * * @return - JMS destination */ public Destination getDestination() { return destination; } /** * Set the destination to use. * * @param destination - JMS destination */ public void setDestination(Destination destination) { this.destination = destination; } } Other ActiveMQ examples (source code examples)Here is a short list of links related to this ActiveMQ AmqMessagesQueryFilter.java source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.