|
ActiveMQ example source code file (LoggingBrokerPlugin.java)
The ActiveMQ LoggingBrokerPlugin.java source code/** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.activemq.broker.util; import java.util.Set; import javax.annotation.PostConstruct; import org.apache.activemq.broker.BrokerPluginSupport; import org.apache.activemq.broker.Connection; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConsumerBrokerExchange; import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.ConnectionInfo; import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.DestinationInfo; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.MessageDispatchNotification; import org.apache.activemq.command.MessagePull; import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.RemoveSubscriptionInfo; import org.apache.activemq.command.Response; import org.apache.activemq.command.SessionInfo; import org.apache.activemq.command.TransactionId; import org.apache.activemq.usage.Usage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * A simple Broker intercepter which allows you to enable/disable logging. * * @org.apache.xbean.XBean */ public class LoggingBrokerPlugin extends BrokerPluginSupport { private static final Logger LOG = LoggerFactory.getLogger(LoggingBrokerPlugin.class); private boolean logAll = false; private boolean logMessageEvents = false; private boolean logConnectionEvents = true; private boolean logTransactionEvents = false; private boolean logConsumerEvents = false; private boolean logProducerEvents = false; private boolean logInternalEvents = false; /** * * @throws Exception * @org.apache.xbean.InitMethod */ @PostConstruct public void afterPropertiesSet() throws Exception { LOG.info("Created LoggingBrokerPlugin: " + this.toString()); } public boolean isLogAll() { return logAll; } /** * Logger all Events that go through the Plugin */ public void setLogAll(boolean logAll) { this.logAll = logAll; } public boolean isLogMessageEvents() { return logMessageEvents; } /** * Logger Events that are related to message processing */ public void setLogMessageEvents(boolean logMessageEvents) { this.logMessageEvents = logMessageEvents; } public boolean isLogConnectionEvents() { return logConnectionEvents; } /** * Logger Events that are related to connections and sessions */ public void setLogConnectionEvents(boolean logConnectionEvents) { this.logConnectionEvents = logConnectionEvents; } public boolean isLogTransactionEvents() { return logTransactionEvents; } /** * Logger Events that are related to transaction processing */ public void setLogTransactionEvents(boolean logTransactionEvents) { this.logTransactionEvents = logTransactionEvents; } public boolean isLogConsumerEvents() { return logConsumerEvents; } /** * Logger Events that are related to Consumers */ public void setLogConsumerEvents(boolean logConsumerEvents) { this.logConsumerEvents = logConsumerEvents; } public boolean isLogProducerEvents() { return logProducerEvents; } /** * Logger Events that are related to Producers */ public void setLogProducerEvents(boolean logProducerEvents) { this.logProducerEvents = logProducerEvents; } public boolean isLogInternalEvents() { return logInternalEvents; } /** * Logger Events that are normally internal to the broker */ public void setLogInternalEvents(boolean logInternalEvents) { this.logInternalEvents = logInternalEvents; } @Override public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception { if (isLogAll() || isLogConsumerEvents()) { LOG.info("Acknowledging message for client ID : " + consumerExchange.getConnectionContext().getClientId() + (ack.getMessageCount() == 1 ? ", " + ack.getLastMessageId() : "")); if (LOG.isTraceEnabled() && ack.getMessageCount() > 1) { LOG.trace("Message count: " + ack.getMessageCount() + ", First Message Id: " + ack.getFirstMessageId() + ", Last Message Id: " + ack.getLastMessageId()); } } super.acknowledge(consumerExchange, ack); } @Override public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception { if (isLogAll() || isLogConsumerEvents()) { LOG.info("Message Pull from : " + context.getClientId() + " on " + pull.getDestination().getPhysicalName()); } return super.messagePull(context, pull); } @Override public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { if (isLogAll() || isLogConnectionEvents()) { LOG.info("Adding Connection : " + info); } super.addConnection(context, info); } @Override public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { if (isLogAll() || isLogConsumerEvents()) { LOG.info("Adding Consumer : " + info); } return super.addConsumer(context, info); } @Override public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { if (isLogAll() || isLogProducerEvents()) { LOG.info("Adding Producer :" + info); } super.addProducer(context, info); } @Override public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception { if (isLogAll() || isLogTransactionEvents()) { LOG.info("Commiting transaction : " + xid.getTransactionKey()); } super.commitTransaction(context, xid, onePhase); } @Override public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception { if (isLogAll() || isLogConsumerEvents()) { LOG.info("Removing subscription : " + info); } super.removeSubscription(context, info); } @Override public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception { TransactionId[] result = super.getPreparedTransactions(context); if ((isLogAll() || isLogTransactionEvents()) && result != null) { StringBuffer tids = new StringBuffer(); for (TransactionId tid : result) { if (tids.length() > 0) { tids.append(", "); } tids.append(tid.getTransactionKey()); } LOG.info("Prepared transactions : " + tids); } return result; } @Override public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception { if (isLogAll() || isLogTransactionEvents()) { LOG.info("Preparing transaction : " + xid.getTransactionKey()); } return super.prepareTransaction(context, xid); } @Override public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception { if (isLogAll() || isLogConnectionEvents()) { LOG.info("Removing Connection : " + info); } super.removeConnection(context, info, error); } @Override public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { if (isLogAll() || isLogConsumerEvents()) { LOG.info("Removing Consumer : " + info); } super.removeConsumer(context, info); } @Override public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { if (isLogAll() || isLogProducerEvents()) { LOG.info("Removing Producer : " + info); } super.removeProducer(context, info); } @Override public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception { if (isLogAll() || isLogTransactionEvents()) { LOG.info("Rolling back Transaction : " + xid.getTransactionKey()); } super.rollbackTransaction(context, xid); } @Override public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception { if (isLogAll() || isLogProducerEvents()) { LOG.info("Sending message : " + messageSend.copy()); } super.send(producerExchange, messageSend); } @Override public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception { if (isLogAll() || isLogTransactionEvents()) { LOG.info("Beginning transaction : " + xid.getTransactionKey()); } super.beginTransaction(context, xid); } @Override public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception { if (isLogAll() || isLogTransactionEvents()) { LOG.info("Forgetting transaction : " + transactionId.getTransactionKey()); } super.forgetTransaction(context, transactionId); } @Override public Connection[] getClients() throws Exception { Connection[] result = super.getClients(); if (isLogAll() || isLogInternalEvents()) { if (result == null) { LOG.info("Get Clients returned empty list."); } else { StringBuffer cids = new StringBuffer(); for (Connection c : result) { cids.append(cids.length() > 0 ? ", " : ""); cids.append(c.getConnectionId()); } LOG.info("Connected clients : " + cids); } } return super.getClients(); } @Override public org.apache.activemq.broker.region.Destination addDestination(ConnectionContext context, ActiveMQDestination destination, boolean create) throws Exception { if (isLogAll() || isLogInternalEvents()) { LOG.info("Adding destination : " + destination.getDestinationTypeAsString() + ":" + destination.getPhysicalName()); } return super.addDestination(context, destination, create); } @Override public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception { if (isLogAll() || isLogInternalEvents()) { LOG.info("Removing destination : " + destination.getDestinationTypeAsString() + ":" + destination.getPhysicalName()); } super.removeDestination(context, destination, timeout); } @Override public ActiveMQDestination[] getDestinations() throws Exception { ActiveMQDestination[] result = super.getDestinations(); if (isLogAll() || isLogInternalEvents()) { if (result == null) { LOG.info("Get Destinations returned empty list."); } else { StringBuffer destinations = new StringBuffer(); for (ActiveMQDestination dest : result) { destinations.append(destinations.length() > 0 ? ", " : ""); destinations.append(dest.getPhysicalName()); } LOG.info("Get Destinations : " + destinations); } } return result; } @Override public void start() throws Exception { if (isLogAll() || isLogInternalEvents()) { LOG.info("Starting " + getBrokerName()); } super.start(); } @Override public void stop() throws Exception { if (isLogAll() || isLogInternalEvents()) { LOG.info("Stopping " + getBrokerName()); } super.stop(); } @Override public void addSession(ConnectionContext context, SessionInfo info) throws Exception { if (isLogAll() || isLogConnectionEvents()) { LOG.info("Adding Session : " + info); } super.addSession(context, info); } @Override public void removeSession(ConnectionContext context, SessionInfo info) throws Exception { if (isLogAll() || isLogConnectionEvents()) { LOG.info("Removing Session : " + info); } super.removeSession(context, info); } @Override public void addBroker(Connection connection, BrokerInfo info) { if (isLogAll() || isLogInternalEvents()) { LOG.info("Adding Broker " + info.getBrokerName()); } super.addBroker(connection, info); } @Override public void removeBroker(Connection connection, BrokerInfo info) { if (isLogAll() || isLogInternalEvents()) { LOG.info("Removing Broker " + info.getBrokerName()); } super.removeBroker(connection, info); } @Override public BrokerInfo[] getPeerBrokerInfos() { BrokerInfo[] result = super.getPeerBrokerInfos(); if (isLogAll() || isLogInternalEvents()) { if (result == null) { LOG.info("Get Peer Broker Infos returned empty list."); } else { StringBuffer peers = new StringBuffer(); for (BrokerInfo bi : result) { peers.append(peers.length() > 0 ? ", " : ""); peers.append(bi.getBrokerName()); } LOG.info("Get Peer Broker Infos : " + peers); } } return result; } @Override public void preProcessDispatch(MessageDispatch messageDispatch) { if (isLogAll() || isLogInternalEvents() || isLogConsumerEvents()) { LOG.info("preProcessDispatch :" + messageDispatch); } super.preProcessDispatch(messageDispatch); } @Override public void postProcessDispatch(MessageDispatch messageDispatch) { if (isLogAll() || isLogInternalEvents() || isLogConsumerEvents()) { LOG.info("postProcessDispatch :" + messageDispatch); } super.postProcessDispatch(messageDispatch); } @Override public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { if (isLogAll() || isLogInternalEvents() || isLogConsumerEvents()) { LOG.info("ProcessDispatchNotification :" + messageDispatchNotification); } super.processDispatchNotification(messageDispatchNotification); } @Override public Set<ActiveMQDestination> getDurableDestinations() { Set<ActiveMQDestination> result = super.getDurableDestinations(); if (isLogAll() || isLogInternalEvents()) { if (result == null) { LOG.info("Get Durable Destinations returned empty list."); } else { StringBuffer destinations = new StringBuffer(); for (ActiveMQDestination dest : result) { destinations.append(destinations.length() > 0 ? ", " : ""); destinations.append(dest.getPhysicalName()); } LOG.info("Get Durable Destinations : " + destinations); } } return result; } @Override public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception { if (isLogAll() || isLogInternalEvents()) { LOG.info("Adding destination info : " + info); } super.addDestinationInfo(context, info); } @Override public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception { if (isLogAll() || isLogInternalEvents()) { LOG.info("Removing destination info : " + info); } super.removeDestinationInfo(context, info); } @Override public void messageExpired(ConnectionContext context, MessageReference message, Subscription subscription) { if (isLogAll() || isLogInternalEvents()) { String msg = "Unable to display message."; msg = message.getMessage().toString(); LOG.info("Message has expired : " + msg); } super.messageExpired(context, message, subscription); } @Override public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, Subscription subscription) { if (isLogAll() || isLogInternalEvents()) { String msg = "Unable to display message."; msg = messageReference.getMessage().toString(); LOG.info("Sending to DLQ : " + msg); } super.sendToDeadLetterQueue(context, messageReference, subscription); } @Override public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) { if (isLogAll() || isLogProducerEvents() || isLogInternalEvents()) { LOG.info("Fast Producer : " + producerInfo); } super.fastProducer(context, producerInfo); } @Override public void isFull(ConnectionContext context, Destination destination, Usage usage) { if (isLogAll() || isLogProducerEvents() || isLogInternalEvents()) { LOG.info("Destination is full : " + destination.getName()); } super.isFull(context, destination, usage); } @Override public void messageConsumed(ConnectionContext context, MessageReference messageReference) { if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) { String msg = "Unable to display message."; msg = messageReference.getMessage().toString(); LOG.info("Message consumed : " + msg); } super.messageConsumed(context, messageReference); } @Override public void messageDelivered(ConnectionContext context, MessageReference messageReference) { if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) { String msg = "Unable to display message."; msg = messageReference.getMessage().toString(); LOG.info("Message delivered : " + msg); } super.messageDelivered(context, messageReference); } @Override public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) { if (isLogAll() || isLogInternalEvents()) { String msg = "Unable to display message."; msg = messageReference.getMessage().toString(); LOG.info("Message discarded : " + msg); } super.messageDiscarded(context, sub, messageReference); } @Override public void slowConsumer(ConnectionContext context, Destination destination, Subscription subs) { if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) { LOG.info("Detected slow consumer on " + destination.getName()); StringBuffer buf = new StringBuffer("Connection("); buf.append(subs.getConsumerInfo().getConsumerId().getConnectionId()); buf.append(") Session("); buf.append(subs.getConsumerInfo().getConsumerId().getSessionId()); buf.append(")"); LOG.info(buf.toString()); } super.slowConsumer(context, destination, subs); } @Override public void nowMasterBroker() { if (isLogAll() || isLogInternalEvents()) { LOG.info("Is now the master broker : " + getBrokerName()); } super.nowMasterBroker(); } @Override public String toString() { StringBuffer buf = new StringBuffer(); buf.append("LoggingBrokerPlugin("); buf.append("logAll="); buf.append(isLogAll()); buf.append(", logConnectionEvents="); buf.append(isLogConnectionEvents()); buf.append(", logConsumerEvents="); buf.append(isLogConsumerEvents()); buf.append(", logProducerEvents="); buf.append(isLogProducerEvents()); buf.append(", logMessageEvents="); buf.append(isLogMessageEvents()); buf.append(", logTransactionEvents="); buf.append(isLogTransactionEvents()); buf.append(", logInternalEvents="); buf.append(isLogInternalEvents()); buf.append(")"); return buf.toString(); } } Other ActiveMQ examples (source code examples)Here is a short list of links related to this ActiveMQ LoggingBrokerPlugin.java source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.