alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Tomcat example source code file (OrderInterceptor.java)

This example Tomcat source code file (OrderInterceptor.java) is included in the DevDaily.com "Java Source Code Warehouse" project. The intent of this project is to help you "Learn Java by Example" TM.

Java - Tomcat tags/keywords

atomicinteger, channelexception, channelinterceptorbase, channelmessage, counter, counter, hashmap, hashmap, member, message, messageorder, messageorder, reentrantreadwritelock, reentrantreadwritelock, util

The Tomcat OrderInterceptor.java source code

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 * 
 *      http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 */

package org.apache.catalina.tribes.group.interceptors;

import java.util.HashMap;

import org.apache.catalina.tribes.ChannelException;
import org.apache.catalina.tribes.ChannelMessage;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.group.ChannelInterceptorBase;
import org.apache.catalina.tribes.group.InterceptorPayload;
import org.apache.catalina.tribes.io.XByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;



/**
 *
 * The order interceptor guarantees that messages are received in the same order they were 
 * sent.
 * This interceptor works best with the ack=true setting. <br>
 * There is no point in 
 * using this with the replicationMode="fastasynchqueue" as this mode guarantees ordering.<BR>
 * If you are using the mode ack=false replicationMode=pooled, and have a lot of concurrent threads,
 * this interceptor can really slow you down, as many messages will be completely out of order
 * and the queue might become rather large. If this is the case, then you might want to set 
 * the value OrderInterceptor.maxQueue = 25 (meaning that we will never keep more than 25 messages in our queue)
 * <br>Configuration Options
* OrderInteceptor.expire=<milliseconds> - if a message arrives out of order, how long before we act on it default=3000ms
* OrderInteceptor.maxQueue=<max queue size> - how much can the queue grow to ensure ordering. * This setting is useful to avoid OutOfMemoryErrors<b>default=Integer.MAX_VALUE
* OrderInterceptor.forwardExpired=<boolean> - this flag tells the interceptor what to * do when a message has expired or the queue has grown larger than the maxQueue value. * true means that the message is sent up the stack to the receiver that will receive and out of order message * false means, forget the message and reset the message counter. <b>default=true * * * @author Filip Hanik * @version 1.1 */ public class OrderInterceptor extends ChannelInterceptorBase { private HashMap outcounter = new HashMap(); private HashMap incounter = new HashMap(); private HashMap incoming = new HashMap(); private long expire = 3000; private boolean forwardExpired = true; private int maxQueue = Integer.MAX_VALUE; ReentrantReadWriteLock inLock = new ReentrantReadWriteLock(true); ReentrantReadWriteLock outLock= new ReentrantReadWriteLock(true); public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException { if ( !okToProcess(msg.getOptions()) ) { super.sendMessage(destination, msg, payload); return; } ChannelException cx = null; for (int i=0; i<destination.length; i++ ) { try { int nr = 0; try { outLock.writeLock().lock(); nr = incCounter(destination[i]); } finally { outLock.writeLock().unlock(); } //reduce byte copy msg.getMessage().append(nr); try { getNext().sendMessage(new Member[] {destination[i]}, msg, payload); } finally { msg.getMessage().trim(4); } }catch ( ChannelException x ) { if ( cx == null ) cx = x; cx.addFaultyMember(x.getFaultyMembers()); } }//for if ( cx != null ) throw cx; } public void messageReceived(ChannelMessage msg) { if ( !okToProcess(msg.getOptions()) ) { super.messageReceived(msg); return; } int msgnr = XByteBuffer.toInt(msg.getMessage().getBytesDirect(),msg.getMessage().getLength()-4); msg.getMessage().trim(4); MessageOrder order = new MessageOrder(msgnr,(ChannelMessage)msg.deepclone()); try { inLock.writeLock().lock(); if ( processIncoming(order) ) processLeftOvers(msg.getAddress(),false); }finally { inLock.writeLock().unlock(); } } protected void processLeftOvers(Member member, boolean force) { MessageOrder tmp = (MessageOrder)incoming.get(member); if ( force ) { Counter cnt = getInCounter(member); cnt.setCounter(Integer.MAX_VALUE); } if ( tmp!= null ) processIncoming(tmp); } /** * * @param order MessageOrder * @return boolean - true if a message expired and was processed */ protected boolean processIncoming(MessageOrder order) { boolean result = false; Member member = order.getMessage().getAddress(); Counter cnt = getInCounter(member); MessageOrder tmp = (MessageOrder)incoming.get(member); if ( tmp != null ) { order = MessageOrder.add(tmp,order); } while ( (order!=null) && (order.getMsgNr() <= cnt.getCounter()) ) { //we are right on target. process orders if ( order.getMsgNr() == cnt.getCounter() ) cnt.inc(); else if ( order.getMsgNr() > cnt.getCounter() ) cnt.setCounter(order.getMsgNr()); super.messageReceived(order.getMessage()); order.setMessage(null); order = order.next; } MessageOrder head = order; MessageOrder prev = null; tmp = order; //flag to empty out the queue when it larger than maxQueue boolean empty = order!=null?order.getCount()>=maxQueue:false; while ( tmp != null ) { //process expired messages or empty out the queue if ( tmp.isExpired(expire) || empty ) { //reset the head if ( tmp == head ) head = tmp.next; cnt.setCounter(tmp.getMsgNr()+1); if ( getForwardExpired() ) super.messageReceived(tmp.getMessage()); tmp.setMessage(null); tmp = tmp.next; if ( prev != null ) prev.next = tmp; result = true; } else { prev = tmp; tmp = tmp.next; } } if ( head == null ) incoming.remove(member); else incoming.put(member, head); return result; } public void memberAdded(Member member) { //notify upwards super.memberAdded(member); } public void memberDisappeared(Member member) { //reset counters - lock free incounter.remove(member); outcounter.remove(member); //clear the remaining queue processLeftOvers(member,true); //notify upwards super.memberDisappeared(member); } protected int incCounter(Member mbr) { Counter cnt = getOutCounter(mbr); return cnt.inc(); } protected Counter getInCounter(Member mbr) { Counter cnt = (Counter)incounter.get(mbr); if ( cnt == null ) { cnt = new Counter(); cnt.inc(); //always start at 1 for incoming incounter.put(mbr,cnt); } return cnt; } protected Counter getOutCounter(Member mbr) { Counter cnt = (Counter)outcounter.get(mbr); if ( cnt == null ) { cnt = new Counter(); outcounter.put(mbr,cnt); } return cnt; } protected static class Counter { private AtomicInteger value = new AtomicInteger(0); public int getCounter() { return value.get(); } public void setCounter(int counter) { this.value.set(counter); } public int inc() { return value.addAndGet(1); } } protected static class MessageOrder { private long received = System.currentTimeMillis(); private MessageOrder next; private int msgNr; private ChannelMessage msg = null; public MessageOrder(int msgNr,ChannelMessage msg) { this.msgNr = msgNr; this.msg = msg; } public boolean isExpired(long expireTime) { return (System.currentTimeMillis()-received) > expireTime; } public ChannelMessage getMessage() { return msg; } public void setMessage(ChannelMessage msg) { this.msg = msg; } public void setNext(MessageOrder order) { this.next = order; } public MessageOrder getNext() { return next; } public int getCount() { int counter = 1; MessageOrder tmp = next; while ( tmp != null ) { counter++; tmp = tmp.next; } return counter; } public static MessageOrder add(MessageOrder head, MessageOrder add) { if ( head == null ) return add; if ( add == null ) return head; if ( head == add ) return add; if ( head.getMsgNr() > add.getMsgNr() ) { add.next = head; return add; } MessageOrder iter = head; MessageOrder prev = null; while ( iter.getMsgNr() < add.getMsgNr() && (iter.next !=null ) ) { prev = iter; iter = iter.next; } if ( iter.getMsgNr() < add.getMsgNr() ) { //add after add.next = iter.next; iter.next = add; } else if (iter.getMsgNr() > add.getMsgNr()) { //add before prev.next = add; add.next = iter; } else { throw new ArithmeticException("Message added has the same counter, synchronization bug. Disable the order interceptor"); } return head; } public int getMsgNr() { return msgNr; } } public void setExpire(long expire) { this.expire = expire; } public void setForwardExpired(boolean forwardExpired) { this.forwardExpired = forwardExpired; } public void setMaxQueue(int maxQueue) { this.maxQueue = maxQueue; } public long getExpire() { return expire; } public boolean getForwardExpired() { return forwardExpired; } public int getMaxQueue() { return maxQueue; } }

Other Tomcat examples (source code examples)

Here is a short list of links related to this Tomcat OrderInterceptor.java source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.