|
ActiveMQ example source code file (BaseDestination.java)
The ActiveMQ BaseDestination.java source code/** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.activemq.broker.region; import java.io.IOException; import javax.jms.ResourceAllocationException; import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.region.policy.DeadLetterStrategy; import org.apache.activemq.broker.region.policy.SlowConsumerStrategy; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageDispatchNotification; import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.state.ProducerState; import org.apache.activemq.store.MessageStore; import org.apache.activemq.usage.MemoryUsage; import org.apache.activemq.usage.SystemUsage; import org.apache.activemq.usage.Usage; import org.slf4j.Logger; /** * */ public abstract class BaseDestination implements Destination { /** * The maximum number of messages to page in to the destination from * persistent storage */ public static final int MAX_PAGE_SIZE = 200; public static final int MAX_BROWSE_PAGE_SIZE = MAX_PAGE_SIZE * 2; public static final long EXPIRE_MESSAGE_PERIOD = 30 * 1000; public static final long DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC = 60 * 1000; public static final int MAX_PRODUCERS_TO_AUDIT = 64; public static final int MAX_AUDIT_DEPTH = 2048; protected final ActiveMQDestination destination; protected final Broker broker; protected final MessageStore store; protected SystemUsage systemUsage; protected MemoryUsage memoryUsage; private boolean producerFlowControl = true; protected boolean warnOnProducerFlowControl = true; protected long blockedProducerWarningInterval = DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL; private int maxProducersToAudit = 1024; private int maxAuditDepth = 2048; private boolean enableAudit = true; private int maxPageSize = MAX_PAGE_SIZE; private int maxBrowsePageSize = MAX_BROWSE_PAGE_SIZE; private boolean useCache = true; private int minimumMessageSize = 1024; private boolean lazyDispatch = false; private boolean advisoryForSlowConsumers; private boolean advisdoryForFastProducers; private boolean advisoryForDiscardingMessages; private boolean advisoryWhenFull; private boolean advisoryForDelivery; private boolean advisoryForConsumed; private boolean sendAdvisoryIfNoConsumers; protected final DestinationStatistics destinationStatistics = new DestinationStatistics(); protected final BrokerService brokerService; protected final Broker regionBroker; protected DeadLetterStrategy deadLetterStrategy = DEFAULT_DEAD_LETTER_STRATEGY; protected long expireMessagesPeriod = EXPIRE_MESSAGE_PERIOD; private int maxExpirePageSize = MAX_BROWSE_PAGE_SIZE; protected int cursorMemoryHighWaterMark = 70; protected int storeUsageHighWaterMark = 100; private SlowConsumerStrategy slowConsumerStrategy; private boolean prioritizedMessages; private long inactiveTimoutBeforeGC = DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC; private boolean gcIfInactive; private long lastActiveTime=0l; private boolean reduceMemoryFootprint = false; /** * @param broker * @param store * @param destination * @param parentStats * @throws Exception */ public BaseDestination(BrokerService brokerService, MessageStore store, ActiveMQDestination destination, DestinationStatistics parentStats) throws Exception { this.brokerService = brokerService; this.broker = brokerService.getBroker(); this.store = store; this.destination = destination; // let's copy the enabled property from the parent DestinationStatistics this.destinationStatistics.setEnabled(parentStats.isEnabled()); this.destinationStatistics.setParent(parentStats); this.systemUsage = new SystemUsage(brokerService.getProducerSystemUsage(), destination.toString()); this.memoryUsage = this.systemUsage.getMemoryUsage(); this.memoryUsage.setUsagePortion(1.0f); this.regionBroker = brokerService.getRegionBroker(); } /** * initialize the destination * * @throws Exception */ public void initialize() throws Exception { // Let the store know what usage manager we are using so that he can // flush messages to disk when usage gets high. if (store != null) { store.setMemoryUsage(this.memoryUsage); } } /** * @return the producerFlowControl */ public boolean isProducerFlowControl() { return producerFlowControl; } /** * @param producerFlowControl the producerFlowControl to set */ public void setProducerFlowControl(boolean producerFlowControl) { this.producerFlowControl = producerFlowControl; } /** * Set's the interval at which warnings about producers being blocked by * resource usage will be triggered. Values of 0 or less will disable * warnings * * @param blockedProducerWarningInterval the interval at which warning about * blocked producers will be triggered. */ public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) { this.blockedProducerWarningInterval = blockedProducerWarningInterval; } /** * * @return the interval at which warning about blocked producers will be * triggered. */ public long getBlockedProducerWarningInterval() { return blockedProducerWarningInterval; } /** * @return the maxProducersToAudit */ public int getMaxProducersToAudit() { return maxProducersToAudit; } /** * @param maxProducersToAudit the maxProducersToAudit to set */ public void setMaxProducersToAudit(int maxProducersToAudit) { this.maxProducersToAudit = maxProducersToAudit; } /** * @return the maxAuditDepth */ public int getMaxAuditDepth() { return maxAuditDepth; } /** * @param maxAuditDepth the maxAuditDepth to set */ public void setMaxAuditDepth(int maxAuditDepth) { this.maxAuditDepth = maxAuditDepth; } /** * @return the enableAudit */ public boolean isEnableAudit() { return enableAudit; } /** * @param enableAudit the enableAudit to set */ public void setEnableAudit(boolean enableAudit) { this.enableAudit = enableAudit; } public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { destinationStatistics.getProducers().increment(); this.lastActiveTime=0l; } public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { destinationStatistics.getProducers().decrement(); } public void addSubscription(ConnectionContext context, Subscription sub) throws Exception{ destinationStatistics.getConsumers().increment(); this.lastActiveTime=0l; } public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception{ destinationStatistics.getConsumers().decrement(); } public final MemoryUsage getMemoryUsage() { return memoryUsage; } public DestinationStatistics getDestinationStatistics() { return destinationStatistics; } public ActiveMQDestination getActiveMQDestination() { return destination; } public final String getName() { return getActiveMQDestination().getPhysicalName(); } public final MessageStore getMessageStore() { return store; } public final boolean isActive() { return destinationStatistics.getConsumers().getCount() != 0 || destinationStatistics.getProducers().getCount() != 0; } public int getMaxPageSize() { return maxPageSize; } public void setMaxPageSize(int maxPageSize) { this.maxPageSize = maxPageSize; } public int getMaxBrowsePageSize() { return this.maxBrowsePageSize; } public void setMaxBrowsePageSize(int maxPageSize) { this.maxBrowsePageSize = maxPageSize; } public int getMaxExpirePageSize() { return this.maxExpirePageSize; } public void setMaxExpirePageSize(int maxPageSize) { this.maxExpirePageSize = maxPageSize; } public void setExpireMessagesPeriod(long expireMessagesPeriod) { this.expireMessagesPeriod = expireMessagesPeriod; } public long getExpireMessagesPeriod() { return expireMessagesPeriod; } public boolean isUseCache() { return useCache; } public void setUseCache(boolean useCache) { this.useCache = useCache; } public int getMinimumMessageSize() { return minimumMessageSize; } public void setMinimumMessageSize(int minimumMessageSize) { this.minimumMessageSize = minimumMessageSize; } public boolean isLazyDispatch() { return lazyDispatch; } public void setLazyDispatch(boolean lazyDispatch) { this.lazyDispatch = lazyDispatch; } protected long getDestinationSequenceId() { return regionBroker.getBrokerSequenceId(); } /** * @return the advisoryForSlowConsumers */ public boolean isAdvisoryForSlowConsumers() { return advisoryForSlowConsumers; } /** * @param advisoryForSlowConsumers the advisoryForSlowConsumers to set */ public void setAdvisoryForSlowConsumers(boolean advisoryForSlowConsumers) { this.advisoryForSlowConsumers = advisoryForSlowConsumers; } /** * @return the advisoryForDiscardingMessages */ public boolean isAdvisoryForDiscardingMessages() { return advisoryForDiscardingMessages; } /** * @param advisoryForDiscardingMessages the advisoryForDiscardingMessages to * set */ public void setAdvisoryForDiscardingMessages(boolean advisoryForDiscardingMessages) { this.advisoryForDiscardingMessages = advisoryForDiscardingMessages; } /** * @return the advisoryWhenFull */ public boolean isAdvisoryWhenFull() { return advisoryWhenFull; } /** * @param advisoryWhenFull the advisoryWhenFull to set */ public void setAdvisoryWhenFull(boolean advisoryWhenFull) { this.advisoryWhenFull = advisoryWhenFull; } /** * @return the advisoryForDelivery */ public boolean isAdvisoryForDelivery() { return advisoryForDelivery; } /** * @param advisoryForDelivery the advisoryForDelivery to set */ public void setAdvisoryForDelivery(boolean advisoryForDelivery) { this.advisoryForDelivery = advisoryForDelivery; } /** * @return the advisoryForConsumed */ public boolean isAdvisoryForConsumed() { return advisoryForConsumed; } /** * @param advisoryForConsumed the advisoryForConsumed to set */ public void setAdvisoryForConsumed(boolean advisoryForConsumed) { this.advisoryForConsumed = advisoryForConsumed; } /** * @return the advisdoryForFastProducers */ public boolean isAdvisdoryForFastProducers() { return advisdoryForFastProducers; } /** * @param advisdoryForFastProducers the advisdoryForFastProducers to set */ public void setAdvisdoryForFastProducers(boolean advisdoryForFastProducers) { this.advisdoryForFastProducers = advisdoryForFastProducers; } public boolean isSendAdvisoryIfNoConsumers() { return sendAdvisoryIfNoConsumers; } public void setSendAdvisoryIfNoConsumers(boolean sendAdvisoryIfNoConsumers) { this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers; } /** * @return the dead letter strategy */ public DeadLetterStrategy getDeadLetterStrategy() { return deadLetterStrategy; } /** * set the dead letter strategy * * @param deadLetterStrategy */ public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) { this.deadLetterStrategy = deadLetterStrategy; } public int getCursorMemoryHighWaterMark() { return this.cursorMemoryHighWaterMark; } public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) { this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark; } /** * called when message is consumed * * @param context * @param messageReference */ public void messageConsumed(ConnectionContext context, MessageReference messageReference) { if (advisoryForConsumed) { broker.messageConsumed(context, messageReference); } } /** * Called when message is delivered to the broker * * @param context * @param messageReference */ public void messageDelivered(ConnectionContext context, MessageReference messageReference) { if (advisoryForDelivery) { broker.messageDelivered(context, messageReference); } } /** * Called when a message is discarded - e.g. running low on memory This will * happen only if the policy is enabled - e.g. non durable topics * * @param context * @param messageReference */ public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) { if (advisoryForDiscardingMessages) { broker.messageDiscarded(context, sub, messageReference); } } /** * Called when there is a slow consumer * * @param context * @param subs */ public void slowConsumer(ConnectionContext context, Subscription subs) { if (advisoryForSlowConsumers) { broker.slowConsumer(context, this, subs); } if (slowConsumerStrategy != null) { slowConsumerStrategy.slowConsumer(context, subs); } } /** * Called to notify a producer is too fast * * @param context * @param producerInfo */ public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) { if (advisdoryForFastProducers) { broker.fastProducer(context, producerInfo); } } /** * Called when a Usage reaches a limit * * @param context * @param usage */ public void isFull(ConnectionContext context, Usage usage) { if (advisoryWhenFull) { broker.isFull(context, this, usage); } } public void dispose(ConnectionContext context) throws IOException { if (this.store != null) { this.store.removeAllMessages(context); this.store.dispose(context); } this.destinationStatistics.setParent(null); this.memoryUsage.stop(); } /** * Provides a hook to allow messages with no consumer to be processed in * some way - such as to send to a dead letter queue or something.. */ protected void onMessageWithNoConsumers(ConnectionContext context, Message msg) throws Exception { if (!msg.isPersistent()) { if (isSendAdvisoryIfNoConsumers()) { // allow messages with no consumers to be dispatched to a dead // letter queue if (destination.isQueue() || !AdvisorySupport.isAdvisoryTopic(destination)) { Message message = msg.copy(); // The original destination and transaction id do not get // filled when the message is first sent, // it is only populated if the message is routed to another // destination like the DLQ if (message.getOriginalDestination() != null) { message.setOriginalDestination(message.getDestination()); } if (message.getOriginalTransactionId() != null) { message.setOriginalTransactionId(message.getTransactionId()); } ActiveMQTopic advisoryTopic; if (destination.isQueue()) { advisoryTopic = AdvisorySupport.getNoQueueConsumersAdvisoryTopic(destination); } else { advisoryTopic = AdvisorySupport.getNoTopicConsumersAdvisoryTopic(destination); } message.setDestination(advisoryTopic); message.setTransactionId(null); // Disable flow control for this since since we don't want // to block. boolean originalFlowControl = context.isProducerFlowControl(); try { context.setProducerFlowControl(false); ProducerBrokerExchange producerExchange = new ProducerBrokerExchange(); producerExchange.setMutable(false); producerExchange.setConnectionContext(context); producerExchange.setProducerState(new ProducerState(new ProducerInfo())); context.getBroker().send(producerExchange, message); } finally { context.setProducerFlowControl(originalFlowControl); } } } } } public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { } public final int getStoreUsageHighWaterMark() { return this.storeUsageHighWaterMark; } public void setStoreUsageHighWaterMark(int storeUsageHighWaterMark) { this.storeUsageHighWaterMark = storeUsageHighWaterMark; } protected final void waitForSpace(ConnectionContext context, Usage<?> usage, String warning) throws IOException, InterruptedException, ResourceAllocationException { waitForSpace(context, usage, 100, warning); } protected final void waitForSpace(ConnectionContext context, Usage<?> usage, int highWaterMark, String warning) throws IOException, InterruptedException, ResourceAllocationException { if (systemUsage.isSendFailIfNoSpace()) { getLog().debug("sendFailIfNoSpace, forcing exception on send: " + warning); throw new ResourceAllocationException(warning); } if (systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) { if (!usage.waitForSpace(systemUsage.getSendFailIfNoSpaceAfterTimeout(), highWaterMark)) { getLog().debug("sendFailIfNoSpaceAfterTimeout expired, forcing exception on send: " + warning); throw new ResourceAllocationException(warning); } } else { long start = System.currentTimeMillis(); long nextWarn = start; while (!usage.waitForSpace(1000, highWaterMark)) { if (context.getStopping().get()) { throw new IOException("Connection closed, send aborted."); } long now = System.currentTimeMillis(); if (now >= nextWarn) { getLog().info(warning + " (blocking for: " + (now - start) / 1000 + "s)"); nextWarn = now + blockedProducerWarningInterval; } } } } protected abstract Logger getLog(); public void setSlowConsumerStrategy(SlowConsumerStrategy slowConsumerStrategy) { this.slowConsumerStrategy = slowConsumerStrategy; } public SlowConsumerStrategy getSlowConsumerStrategy() { return this.slowConsumerStrategy; } public boolean isPrioritizedMessages() { return this.prioritizedMessages; } public void setPrioritizedMessages(boolean prioritizedMessages) { this.prioritizedMessages = prioritizedMessages; if (store != null) { store.setPrioritizedMessages(prioritizedMessages); } } /** * @return the inactiveTimoutBeforeGC */ public long getInactiveTimoutBeforeGC() { return this.inactiveTimoutBeforeGC; } /** * @param inactiveTimoutBeforeGC the inactiveTimoutBeforeGC to set */ public void setInactiveTimoutBeforeGC(long inactiveTimoutBeforeGC) { this.inactiveTimoutBeforeGC = inactiveTimoutBeforeGC; } /** * @return the gcIfInactive */ public boolean isGcIfInactive() { return this.gcIfInactive; } /** * @param gcIfInactive the gcIfInactive to set */ public void setGcIfInactive(boolean gcIfInactive) { this.gcIfInactive = gcIfInactive; } public void markForGC(long timeStamp) { if (isGcIfInactive() && this.lastActiveTime == 0 && isActive() == false && destinationStatistics.messages.getCount() == 0 && getInactiveTimoutBeforeGC() > 0l) { this.lastActiveTime = timeStamp; } } public boolean canGC() { boolean result = false; if (isGcIfInactive()&& this.lastActiveTime != 0l) { if ((System.currentTimeMillis() - this.lastActiveTime) >= getInactiveTimoutBeforeGC()) { result = true; } } return result; } public void setReduceMemoryFootprint(boolean reduceMemoryFootprint) { this.reduceMemoryFootprint = reduceMemoryFootprint; } protected boolean isReduceMemoryFootprint() { return this.reduceMemoryFootprint; } } Other ActiveMQ examples (source code examples)Here is a short list of links related to this ActiveMQ BaseDestination.java source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.