alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

ActiveMQ example source code file (AbstractPendingMessageCursor.java)

This example ActiveMQ source code file (AbstractPendingMessageCursor.java) is included in the DevDaily.com "Java Source Code Warehouse" project. The intent of this project is to help you "Learn Java by Example" TM.

Java - ActiveMQ tags/keywords

abstractpendingmessagecursor, abstractpendingmessagecursor, activemqmessageaudit, activemqmessageaudit, destination, destination, exception, exception, list, messagereference, not, pendingmessagecursor, runtimeexception, systemusage, util

The ActiveMQ AbstractPendingMessageCursor.java source code

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.broker.region.cursors;

import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.BaseDestination;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.usage.SystemUsage;

/**
 * Abstract method holder for pending message (messages awaiting disptach to a
 * consumer) cursor
 * 
 * 
 */
public abstract class AbstractPendingMessageCursor implements PendingMessageCursor {
    protected int memoryUsageHighWaterMark = 70;
    protected int maxBatchSize = BaseDestination.MAX_PAGE_SIZE;
    protected SystemUsage systemUsage;
    protected int maxProducersToAudit = BaseDestination.MAX_PRODUCERS_TO_AUDIT;
    protected int maxAuditDepth = BaseDestination.MAX_AUDIT_DEPTH;
    protected boolean enableAudit=true;
    protected ActiveMQMessageAudit audit;
    protected boolean useCache=true;
    private boolean cacheEnabled=true;
    private boolean started=false;
    protected MessageReference last = null;
    protected final boolean prioritizedMessages;
    
    public AbstractPendingMessageCursor(boolean prioritizedMessages) {
        this.prioritizedMessages=prioritizedMessages;
    }
  

    public synchronized void start() throws Exception  {
        if (!started && enableAudit && audit==null) {
            audit= new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
        }
        started=true;
    }

    public synchronized void stop() throws Exception  {
        started=false;
        audit=null;
        gc();
    }

    public void add(ConnectionContext context, Destination destination) throws Exception {
    }

    @SuppressWarnings("unchecked")
    public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
        return Collections.EMPTY_LIST;
    }

    public boolean isRecoveryRequired() {
        return true;
    }

    public void addMessageFirst(MessageReference node) throws Exception {
    }

    public void addMessageLast(MessageReference node) throws Exception {
    }
    
    public boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception {
        addMessageLast(node);
        return true;
    }

    public void addRecoveredMessage(MessageReference node) throws Exception {
        addMessageLast(node);
    }

    public void clear() {
    }

    public boolean hasNext() {
        return false;
    }

    public boolean isEmpty() {
        return false;
    }

    public boolean isEmpty(Destination destination) {
        return isEmpty();
    }

    public MessageReference next() {
        return null;
    }

    public void remove() {
    }

    public void reset() {
    }

    public int size() {
        return 0;
    }

    public int getMaxBatchSize() {
        return maxBatchSize;
    }

    public void setMaxBatchSize(int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
    }

    protected void fillBatch() throws Exception {
    }

    public void resetForGC() {
        reset();
    }

    public void remove(MessageReference node) {
    }

    public void gc() {
    }

    public void setSystemUsage(SystemUsage usageManager) {
        this.systemUsage = usageManager;
    }

    public boolean hasSpace() {
        return systemUsage != null ? (systemUsage.getMemoryUsage().getPercentUsage() < memoryUsageHighWaterMark) : true;
    }

    public boolean isFull() {
        return systemUsage != null ? systemUsage.getMemoryUsage().isFull() : false;
    }

    public void release() {
    }

    public boolean hasMessagesBufferedToDeliver() {
        return false;
    }

    /**
     * @return the memoryUsageHighWaterMark
     */
    public int getMemoryUsageHighWaterMark() {
        return memoryUsageHighWaterMark;
    }

    /**
     * @param memoryUsageHighWaterMark the memoryUsageHighWaterMark to set
     */
    public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
        this.memoryUsageHighWaterMark = memoryUsageHighWaterMark;
    }

    /**
     * @return the usageManager
     */
    public SystemUsage getSystemUsage() {
        return this.systemUsage;
    }

    /**
     * destroy the cursor
     * 
     * @throws Exception
     */
    public void destroy() throws Exception {
        stop();
    }

    /**
     * Page in a restricted number of messages
     * 
     * @param maxItems maximum number of messages to return
     * @return a list of paged in messages
     */
    public LinkedList<MessageReference> pageInList(int maxItems) {
        throw new RuntimeException("Not supported");
    }

    /**
     * @return the maxProducersToAudit
     */
    public int getMaxProducersToAudit() {
        return maxProducersToAudit;
    }

    /**
     * @param maxProducersToAudit the maxProducersToAudit to set
     */
    public synchronized void setMaxProducersToAudit(int maxProducersToAudit) {
        this.maxProducersToAudit = maxProducersToAudit;
        if (audit != null) {
            audit.setMaximumNumberOfProducersToTrack(maxProducersToAudit);
        }
    }

    /**
     * @return the maxAuditDepth
     */
    public int getMaxAuditDepth() {
        return maxAuditDepth;
    }
    

    /**
     * @param maxAuditDepth the maxAuditDepth to set
     */
    public synchronized void setMaxAuditDepth(int maxAuditDepth) {
        this.maxAuditDepth = maxAuditDepth;
        if (audit != null) {
            audit.setAuditDepth(maxAuditDepth);
        }
    }
    
    
    /**
     * @return the enableAudit
     */
    public boolean isEnableAudit() {
        return enableAudit;
    }

    /**
     * @param enableAudit the enableAudit to set
     */
    public synchronized void setEnableAudit(boolean enableAudit) {
        this.enableAudit = enableAudit;
        if (enableAudit && started && audit==null) {
            audit= new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
        }
    }
    
    public boolean isTransient() {
        return false;
    }
    
       
    /**
     * set the audit
     * @param audit new audit component
     */
    public void setMessageAudit(ActiveMQMessageAudit audit) {
    	this.audit=audit;
    }
    
    
    /**
     * @return the audit
     */
    public ActiveMQMessageAudit getMessageAudit() {
    	return audit;
    }
    
    public boolean isUseCache() {
        return useCache;
    }

    public void setUseCache(boolean useCache) {
        this.useCache = useCache;
    }

    public synchronized boolean isDuplicate(MessageId messageId) {
        boolean unique = recordUniqueId(messageId);
        rollback(messageId);
        return !unique;
    }
    
    /**
     * records a message id and checks if it is a duplicate
     * @param messageId
     * @return true if id is unique, false otherwise.
     */
    public synchronized boolean recordUniqueId(MessageId messageId) {
        if (!enableAudit || audit==null) {
            return true;
        }
        return !audit.isDuplicate(messageId);
    }
    
    public synchronized void rollback(MessageId id) {
        if (audit != null) {
            audit.rollback(id);
        }
    }
    
    protected synchronized boolean isStarted() {
        return started;
    }
    
    public static boolean isPrioritizedMessageSubscriber(Broker broker,Subscription sub) {
        boolean result = false;
        Set<Destination> destinations = broker.getDestinations(sub.getActiveMQDestination());
        if (destinations != null) {
            for (Destination dest:destinations) {
                if (dest.isPrioritizedMessages()) {
                    result = true;
                    break;
                }
            }
        }
        return result;

    }

    public synchronized boolean isCacheEnabled() {
        return cacheEnabled;
    }

    public synchronized void setCacheEnabled(boolean val) {
        cacheEnabled = val;
    }
}

Other ActiveMQ examples (source code examples)

Here is a short list of links related to this ActiveMQ AbstractPendingMessageCursor.java source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.