alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

ActiveMQ example source code file (LoggingBrokerPlugin.java)

This example ActiveMQ source code file (LoggingBrokerPlugin.java) is included in the DevDaily.com "Java Source Code Warehouse" project. The intent of this project is to help you "Learn Java by Example" TM.

Java - ActiveMQ tags/keywords

activemqdestination, adding, annotation, connection, exception, exception, get, message, message, override, override, removing, stringbuffer, stringbuffer, transactionid, util

The ActiveMQ LoggingBrokerPlugin.java source code

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.broker.util;

import java.util.Set;
import javax.annotation.PostConstruct;
import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.Connection;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ConsumerBrokerExchange;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.usage.Usage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A simple Broker intercepter which allows you to enable/disable logging.
 * 
 * @org.apache.xbean.XBean
 */

public class LoggingBrokerPlugin extends BrokerPluginSupport {

    private static final Logger LOG = LoggerFactory.getLogger(LoggingBrokerPlugin.class);

    private boolean logAll = false;
    private boolean logMessageEvents = false;
    private boolean logConnectionEvents = true;
    private boolean logTransactionEvents = false;
    private boolean logConsumerEvents = false;
    private boolean logProducerEvents = false;
    private boolean logInternalEvents = false;

    /**
     * 
     * @throws Exception
     * @org.apache.xbean.InitMethod
     */
    @PostConstruct
    public void afterPropertiesSet() throws Exception {
        LOG.info("Created LoggingBrokerPlugin: " + this.toString());
    }

    public boolean isLogAll() {
        return logAll;
    }

    /**
     * Logger all Events that go through the Plugin
     */
    public void setLogAll(boolean logAll) {
        this.logAll = logAll;
    }

    public boolean isLogMessageEvents() {
        return logMessageEvents;
    }

    /**
     * Logger Events that are related to message processing
     */
    public void setLogMessageEvents(boolean logMessageEvents) {
        this.logMessageEvents = logMessageEvents;
    }

    public boolean isLogConnectionEvents() {
        return logConnectionEvents;
    }

    /**
     * Logger Events that are related to connections and sessions
     */
    public void setLogConnectionEvents(boolean logConnectionEvents) {
        this.logConnectionEvents = logConnectionEvents;
    }

    public boolean isLogTransactionEvents() {
        return logTransactionEvents;
    }

    /**
     * Logger Events that are related to transaction processing
     */
    public void setLogTransactionEvents(boolean logTransactionEvents) {
        this.logTransactionEvents = logTransactionEvents;
    }

    public boolean isLogConsumerEvents() {
        return logConsumerEvents;
    }

    /**
     * Logger Events that are related to Consumers
     */
    public void setLogConsumerEvents(boolean logConsumerEvents) {
        this.logConsumerEvents = logConsumerEvents;
    }

    public boolean isLogProducerEvents() {
        return logProducerEvents;
    }

    /**
     * Logger Events that are related to Producers
     */
    public void setLogProducerEvents(boolean logProducerEvents) {
        this.logProducerEvents = logProducerEvents;
    }

    public boolean isLogInternalEvents() {
        return logInternalEvents;
    }

    /**
     * Logger Events that are normally internal to the broker
     */
    public void setLogInternalEvents(boolean logInternalEvents) {
        this.logInternalEvents = logInternalEvents;
    }

    @Override
    public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
        if (isLogAll() || isLogConsumerEvents()) {
            LOG.info("Acknowledging message for client ID : " + consumerExchange.getConnectionContext().getClientId()
                    + (ack.getMessageCount() == 1 ? ", " + ack.getLastMessageId() : ""));
            if (LOG.isTraceEnabled() && ack.getMessageCount() > 1) {
                LOG.trace("Message count: " + ack.getMessageCount() + ", First Message Id: " + ack.getFirstMessageId()
                        + ", Last Message Id: " + ack.getLastMessageId());
            }
        }
        super.acknowledge(consumerExchange, ack);
    }

    @Override
    public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
        if (isLogAll() || isLogConsumerEvents()) {
            LOG.info("Message Pull from : " + context.getClientId() + " on " + pull.getDestination().getPhysicalName());
        }
        return super.messagePull(context, pull);
    }

    @Override
    public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
        if (isLogAll() || isLogConnectionEvents()) {
            LOG.info("Adding Connection : " + info);
        }
        super.addConnection(context, info);
    }

    @Override
    public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
        if (isLogAll() || isLogConsumerEvents()) {
            LOG.info("Adding Consumer : " + info);
        }
        return super.addConsumer(context, info);
    }

    @Override
    public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
        if (isLogAll() || isLogProducerEvents()) {
            LOG.info("Adding Producer :" + info);
        }
        super.addProducer(context, info);
    }

    @Override
    public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
        if (isLogAll() || isLogTransactionEvents()) {
            LOG.info("Commiting transaction : " + xid.getTransactionKey());
        }
        super.commitTransaction(context, xid, onePhase);
    }

    @Override
    public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
        if (isLogAll() || isLogConsumerEvents()) {
            LOG.info("Removing subscription : " + info);
        }
        super.removeSubscription(context, info);
    }

    @Override
    public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception {

        TransactionId[] result = super.getPreparedTransactions(context);
        if ((isLogAll() || isLogTransactionEvents()) && result != null) {
            StringBuffer tids = new StringBuffer();
            for (TransactionId tid : result) {
                if (tids.length() > 0) {
                    tids.append(", ");
                }
                tids.append(tid.getTransactionKey());
            }
            LOG.info("Prepared transactions : " + tids);
        }
        return result;
    }

    @Override
    public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
        if (isLogAll() || isLogTransactionEvents()) {
            LOG.info("Preparing transaction : " + xid.getTransactionKey());
        }
        return super.prepareTransaction(context, xid);
    }

    @Override
    public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
        if (isLogAll() || isLogConnectionEvents()) {
            LOG.info("Removing Connection : " + info);
        }
        super.removeConnection(context, info, error);
    }

    @Override
    public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
        if (isLogAll() || isLogConsumerEvents()) {
            LOG.info("Removing Consumer : " + info);
        }
        super.removeConsumer(context, info);
    }

    @Override
    public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
        if (isLogAll() || isLogProducerEvents()) {
            LOG.info("Removing Producer : " + info);
        }
        super.removeProducer(context, info);
    }

    @Override
    public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
        if (isLogAll() || isLogTransactionEvents()) {
            LOG.info("Rolling back Transaction : " + xid.getTransactionKey());
        }
        super.rollbackTransaction(context, xid);
    }

    @Override
    public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
        if (isLogAll() || isLogProducerEvents()) {
            LOG.info("Sending message : " + messageSend.copy());
        }
        super.send(producerExchange, messageSend);
    }

    @Override
    public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
        if (isLogAll() || isLogTransactionEvents()) {
            LOG.info("Beginning transaction : " + xid.getTransactionKey());
        }
        super.beginTransaction(context, xid);
    }

    @Override
    public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception {
        if (isLogAll() || isLogTransactionEvents()) {
            LOG.info("Forgetting transaction : " + transactionId.getTransactionKey());
        }
        super.forgetTransaction(context, transactionId);
    }

    @Override
    public Connection[] getClients() throws Exception {
        Connection[] result = super.getClients();

        if (isLogAll() || isLogInternalEvents()) {
            if (result == null) {
                LOG.info("Get Clients returned empty list.");
            } else {
                StringBuffer cids = new StringBuffer();
                for (Connection c : result) {
                    cids.append(cids.length() > 0 ? ", " : "");
                    cids.append(c.getConnectionId());
                }
                LOG.info("Connected clients : " + cids);
            }
        }
        return super.getClients();
    }

    @Override
    public org.apache.activemq.broker.region.Destination addDestination(ConnectionContext context,
            ActiveMQDestination destination, boolean create) throws Exception {
        if (isLogAll() || isLogInternalEvents()) {
            LOG.info("Adding destination : " + destination.getDestinationTypeAsString() + ":"
                    + destination.getPhysicalName());
        }
        return super.addDestination(context, destination, create);
    }

    @Override
    public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout)
            throws Exception {
        if (isLogAll() || isLogInternalEvents()) {
            LOG.info("Removing destination : " + destination.getDestinationTypeAsString() + ":"
                    + destination.getPhysicalName());
        }
        super.removeDestination(context, destination, timeout);
    }

    @Override
    public ActiveMQDestination[] getDestinations() throws Exception {
        ActiveMQDestination[] result = super.getDestinations();
        if (isLogAll() || isLogInternalEvents()) {
            if (result == null) {
                LOG.info("Get Destinations returned empty list.");
            } else {
                StringBuffer destinations = new StringBuffer();
                for (ActiveMQDestination dest : result) {
                    destinations.append(destinations.length() > 0 ? ", " : "");
                    destinations.append(dest.getPhysicalName());
                }
                LOG.info("Get Destinations : " + destinations);
            }
        }
        return result;
    }

    @Override
    public void start() throws Exception {
        if (isLogAll() || isLogInternalEvents()) {
            LOG.info("Starting " + getBrokerName());
        }
        super.start();
    }

    @Override
    public void stop() throws Exception {
        if (isLogAll() || isLogInternalEvents()) {
            LOG.info("Stopping " + getBrokerName());
        }
        super.stop();
    }

    @Override
    public void addSession(ConnectionContext context, SessionInfo info) throws Exception {
        if (isLogAll() || isLogConnectionEvents()) {
            LOG.info("Adding Session : " + info);
        }
        super.addSession(context, info);
    }

    @Override
    public void removeSession(ConnectionContext context, SessionInfo info) throws Exception {
        if (isLogAll() || isLogConnectionEvents()) {
            LOG.info("Removing Session : " + info);
        }
        super.removeSession(context, info);
    }

    @Override
    public void addBroker(Connection connection, BrokerInfo info) {
        if (isLogAll() || isLogInternalEvents()) {
            LOG.info("Adding Broker " + info.getBrokerName());
        }
        super.addBroker(connection, info);
    }

    @Override
    public void removeBroker(Connection connection, BrokerInfo info) {
        if (isLogAll() || isLogInternalEvents()) {
            LOG.info("Removing Broker " + info.getBrokerName());
        }
        super.removeBroker(connection, info);
    }

    @Override
    public BrokerInfo[] getPeerBrokerInfos() {
        BrokerInfo[] result = super.getPeerBrokerInfos();
        if (isLogAll() || isLogInternalEvents()) {
            if (result == null) {
                LOG.info("Get Peer Broker Infos returned empty list.");
            } else {
                StringBuffer peers = new StringBuffer();
                for (BrokerInfo bi : result) {
                    peers.append(peers.length() > 0 ? ", " : "");
                    peers.append(bi.getBrokerName());
                }
                LOG.info("Get Peer Broker Infos : " + peers);
            }
        }
        return result;
    }

    @Override
    public void preProcessDispatch(MessageDispatch messageDispatch) {
        if (isLogAll() || isLogInternalEvents() || isLogConsumerEvents()) {
            LOG.info("preProcessDispatch :" + messageDispatch);
        }
        super.preProcessDispatch(messageDispatch);
    }

    @Override
    public void postProcessDispatch(MessageDispatch messageDispatch) {
        if (isLogAll() || isLogInternalEvents() || isLogConsumerEvents()) {
            LOG.info("postProcessDispatch :" + messageDispatch);
        }
        super.postProcessDispatch(messageDispatch);
    }

    @Override
    public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
        if (isLogAll() || isLogInternalEvents() || isLogConsumerEvents()) {
            LOG.info("ProcessDispatchNotification :" + messageDispatchNotification);
        }
        super.processDispatchNotification(messageDispatchNotification);
    }

    @Override
    public Set<ActiveMQDestination> getDurableDestinations() {
        Set<ActiveMQDestination> result = super.getDurableDestinations();
        if (isLogAll() || isLogInternalEvents()) {
            if (result == null) {
                LOG.info("Get Durable Destinations returned empty list.");
            } else {
                StringBuffer destinations = new StringBuffer();
                for (ActiveMQDestination dest : result) {
                    destinations.append(destinations.length() > 0 ? ", " : "");
                    destinations.append(dest.getPhysicalName());
                }
                LOG.info("Get Durable Destinations : " + destinations);
            }
        }
        return result;
    }

    @Override
    public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
        if (isLogAll() || isLogInternalEvents()) {
            LOG.info("Adding destination info : " + info);
        }
        super.addDestinationInfo(context, info);
    }

    @Override
    public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
        if (isLogAll() || isLogInternalEvents()) {
            LOG.info("Removing destination info : " + info);
        }
        super.removeDestinationInfo(context, info);
    }

    @Override
    public void messageExpired(ConnectionContext context, MessageReference message, Subscription subscription) {
        if (isLogAll() || isLogInternalEvents()) {
            String msg = "Unable to display message.";

            msg = message.getMessage().toString();

            LOG.info("Message has expired : " + msg);
        }
        super.messageExpired(context, message, subscription);
    }

    @Override
    public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
                                      Subscription subscription) {
        if (isLogAll() || isLogInternalEvents()) {
            String msg = "Unable to display message.";

            msg = messageReference.getMessage().toString();

            LOG.info("Sending to DLQ : " + msg);
        }
        super.sendToDeadLetterQueue(context, messageReference, subscription);
    }

    @Override
    public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) {
        if (isLogAll() || isLogProducerEvents() || isLogInternalEvents()) {
            LOG.info("Fast Producer : " + producerInfo);
        }
        super.fastProducer(context, producerInfo);
    }

    @Override
    public void isFull(ConnectionContext context, Destination destination, Usage usage) {
        if (isLogAll() || isLogProducerEvents() || isLogInternalEvents()) {
            LOG.info("Destination is full : " + destination.getName());
        }
        super.isFull(context, destination, usage);
    }

    @Override
    public void messageConsumed(ConnectionContext context, MessageReference messageReference) {
        if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) {
            String msg = "Unable to display message.";

            msg = messageReference.getMessage().toString();

            LOG.info("Message consumed : " + msg);
        }
        super.messageConsumed(context, messageReference);
    }

    @Override
    public void messageDelivered(ConnectionContext context, MessageReference messageReference) {
        if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) {
            String msg = "Unable to display message.";

            msg = messageReference.getMessage().toString();

            LOG.info("Message delivered : " + msg);
        }
        super.messageDelivered(context, messageReference);
    }

    @Override
    public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) {
        if (isLogAll() || isLogInternalEvents()) {
            String msg = "Unable to display message.";

            msg = messageReference.getMessage().toString();

            LOG.info("Message discarded : " + msg);
        }
        super.messageDiscarded(context, sub, messageReference);
    }

    @Override
    public void slowConsumer(ConnectionContext context, Destination destination, Subscription subs) {
        if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) {
            LOG.info("Detected slow consumer on " + destination.getName());
            StringBuffer buf = new StringBuffer("Connection(");
            buf.append(subs.getConsumerInfo().getConsumerId().getConnectionId());
            buf.append(") Session(");
            buf.append(subs.getConsumerInfo().getConsumerId().getSessionId());
            buf.append(")");
            LOG.info(buf.toString());
        }
        super.slowConsumer(context, destination, subs);
    }

    @Override
    public void nowMasterBroker() {
        if (isLogAll() || isLogInternalEvents()) {
            LOG.info("Is now the master broker : " + getBrokerName());
        }
        super.nowMasterBroker();
    }

    @Override
    public String toString() {
        StringBuffer buf = new StringBuffer();
        buf.append("LoggingBrokerPlugin(");
        buf.append("logAll=");
        buf.append(isLogAll());
        buf.append(", logConnectionEvents=");
        buf.append(isLogConnectionEvents());
        buf.append(", logConsumerEvents=");
        buf.append(isLogConsumerEvents());
        buf.append(", logProducerEvents=");
        buf.append(isLogProducerEvents());
        buf.append(", logMessageEvents=");
        buf.append(isLogMessageEvents());
        buf.append(", logTransactionEvents=");
        buf.append(isLogTransactionEvents());
        buf.append(", logInternalEvents=");
        buf.append(isLogInternalEvents());
        buf.append(")");
        return buf.toString();
    }
}

Other ActiveMQ examples (source code examples)

Here is a short list of links related to this ActiveMQ LoggingBrokerPlugin.java source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.