alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Tomcat example source code file (NonBlockingCoordinator.java)

This example Tomcat source code file (NonBlockingCoordinator.java) is included in the DevDaily.com "Java Source Code Warehouse" project. The intent of this project is to help you "Learn Java by Example" TM.

Java - Tomcat tags/keywords

channelexception, channelexception, coordinationevent, coordinationmessage, coordinationmessage, election, member, member, memberimpl, memberimpl, membership, string, unable, uniqueid

The Tomcat NonBlockingCoordinator.java source code

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 * 
 *      http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.catalina.tribes.group.interceptors;

import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.catalina.tribes.Channel;
import org.apache.catalina.tribes.ChannelException;
import org.apache.catalina.tribes.ChannelInterceptor;
import org.apache.catalina.tribes.ChannelMessage;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.UniqueId;
import org.apache.catalina.tribes.group.AbsoluteOrder;
import org.apache.catalina.tribes.group.ChannelInterceptorBase;
import org.apache.catalina.tribes.group.InterceptorPayload;
import org.apache.catalina.tribes.io.ChannelData;
import org.apache.catalina.tribes.io.XByteBuffer;
import org.apache.catalina.tribes.membership.MemberImpl;
import org.apache.catalina.tribes.membership.Membership;
import org.apache.catalina.tribes.util.Arrays;
import org.apache.catalina.tribes.util.UUIDGenerator;

/**
 * <p>Title: Auto merging leader election algorithm

* * <p>Description: Implementation of a simple coordinator algorithm that not only selects a coordinator, * it also merges groups automatically when members are discovered that werent part of the * </p> * <p>This algorithm is non blocking meaning it allows for transactions while the coordination phase is going on * </p> * <p>This implementation is based on a home brewed algorithm that uses the AbsoluteOrder of a membership * to pass a token ring of the current membership.<br> * This is not the same as just using AbsoluteOrder! Consider the following scenario:<br> * Nodes, A,B,C,D,E on a network, in that priority. AbsoluteOrder will only work if all * nodes are receiving pings from all the other nodes. * meaning, that node{i} receives pings from node{all}-node{i}<br> * but the following could happen if a multicast problem occurs. * A has members {B,C,D}<br> * B has members {A,C}<br> * C has members {D,E}<br> * D has members {A,B,C,E}<br> * E has members {A,C,D}<br> * Because the default Tribes membership implementation, relies on the multicast packets to * arrive at all nodes correctly, there is nothing guaranteeing that it will.<br> * <br> * To best explain how this algorithm works, lets take the above example: * For simplicity we assume that a send operation is O(1) for all nodes, although this algorithm will work * where messages overlap, as they all depend on absolute order<br> * Scenario 1: A,B,C,D,E all come online at the same time * Eval phase, A thinks of itself as leader, B thinks of A as leader, * C thinks of itself as leader, D,E think of A as leader<br> * Token phase:<br> * (1) A sends out a message X{A-ldr, A-src, mbrs-A,B,C,D} to B where X is the id for the message(and the view)<br> * (1) C sends out a message Y{C-ldr, C-src, mbrs-C,D,E} to D where Y is the id for the message(and the view)<br> * (2) B receives X{A-ldr, A-src, mbrs-A,B,C,D}, sends X{A-ldr, A-src, mbrs-A,B,C,D} to C <br> * (2) D receives Y{C-ldr, C-src, mbrs-C,D,E} D is aware of A,B, 
sends Y{A-ldr, C-src, mbrs-A,B,C,D,E} to E<br> * (3) C receives X{A-ldr, A-src, mbrs-A,B,C,D}, sends X{A-ldr, A-src, mbrs-A,B,C,D,E} to D<br> * (3) E receives Y{A-ldr, C-src, mbrs-A,B,C,D,E} sends Y{A-ldr, C-src, mbrs-A,B,C,D,E} to A<br> * (4) D receives X{A-ldr, A-src, mbrs-A,B,C,D,E} sends sends X{A-ldr, A-src, mbrs-A,B,C,D,E} to A<br> * (4) A receives Y{A-ldr, C-src, mbrs-A,B,C,D,E}, holds the message, add E to its list of members<br> * (5) A receives X{A-ldr, A-src, mbrs-A,B,C,D,E} <br> * At this point, the state looks like<br> * A - {A-ldr, mbrs-A,B,C,D,E, id=X}<br> * B - {A-ldr, mbrs-A,B,C,D, id=X}<br> * C - {A-ldr, mbrs-A,B,C,D,E, id=X}<br> * D - {A-ldr, mbrs-A,B,C,D,E, id=X}<br> * E - {A-ldr, mbrs-A,B,C,D,E, id=Y}<br> * <br> * A message doesn't stop until it reaches its original sender, unless its dropped by a higher leader. * As you can see, E still thinks the viewId=Y, which is not correct. But at this point we have * arrived at the same membership and all nodes are informed of each other.<br> * To synchronize the rest we simply perform the following check at A when A receives X:<br> * Original X{A-ldr, A-src, mbrs-A,B,C,D} == Arrived X{A-ldr, A-src, mbrs-A,B,C,D,E}<br> * Since the condition is false, A, will resend the token, and A sends X{A-ldr, A-src, mbrs-A,B,C,D,E} to B * When A receives X again, the token is complete. <br> * Optionally, A can send a message X{A-ldr, A-src, mbrs-A,B,C,D,E confirmed} to A,B,C,D,E who then * install and accept the view. * </p> * <p> * Lets assume that C1 arrives, C1 has lower priority than C, but higher priority than D.<br> * Lets also assume that C1 sees the following view {B,D,E}<br> * C1 waits for a token to arrive. 
When the token arrives, the same scenario as above will happen.<br> * In the scenario where C1 sees {D,E} and A,B,C can not see C1, no token will ever arrive.<br> * In this case, C1 sends a Z{C1-ldr, C1-src, mbrs-C1,D,E} to D<br> * D receives Z{C1-ldr, C1-src, mbrs-C1,D,E} and sends Z{A-ldr, C1-src, mbrs-A,B,C,C1,D,E} to E<br> * E receives Z{A-ldr, C1-src, mbrs-A,B,C,C1,D,E} and sends it to A<br> * A sends Z{A-ldr, A-src, mbrs-A,B,C,C1,D,E} to B and the chain continues until A receives the token again. * At that time A optionally sends out Z{A-ldr, A-src, mbrs-A,B,C,C1,D,E, confirmed} to A,B,C,C1,D,E * </p> * <p>To ensure that the view gets implemented at all nodes at the same time, * A will send out a VIEW_CONF message, this is the 'confirmed' message that is optional above. * <p>Ideally, the interceptor below this one would be the TcpFailureDetector to ensure correct memberships

* * <p>The example above, of course can be simplified with a finite statemachine:
* But I suck at writing state machines, my head gets all confused. One day I will document this algorithm though.<br> * Maybe I'll do a state diagram :) * </p> * <h2>State Diagrams * <a href="http://people.apache.org/~fhanik/tribes/docs/leader-election-initiate-election.jpg">Initiate an election

* <a href="http://people.apache.org/~fhanik/tribes/docs/leader-election-message-arrives.jpg">Receive an election message

* * @author Filip Hanik * @version 1.0 * * * */ public class NonBlockingCoordinator extends ChannelInterceptorBase { /** * header for a coordination message */ protected static final byte[] COORD_HEADER = new byte[] {-86, 38, -34, -29, -98, 90, 65, 63, -81, -122, -6, -110, 99, -54, 13, 63}; /** * Coordination request */ protected static final byte[] COORD_REQUEST = new byte[] {104, -95, -92, -42, 114, -36, 71, -19, -79, 20, 122, 101, -1, -48, -49, 30}; /** * Coordination confirmation, for blocking installations */ protected static final byte[] COORD_CONF = new byte[] {67, 88, 107, -86, 69, 23, 76, -70, -91, -23, -87, -25, -125, 86, 75, 20}; /** * Alive message */ protected static final byte[] COORD_ALIVE = new byte[] {79, -121, -25, -15, -59, 5, 64, 94, -77, 113, -119, -88, 52, 114, -56, -46, -18, 102, 10, 34, -127, -9, 71, 115, -70, 72, -101, 88, 72, -124, 127, 111, 74, 76, -116, 50, 111, 103, 65, 3, -77, 51, -35, 0, 119, 117, 9, -26, 119, 50, -75, -105, -102, 36, 79, 37, -68, -84, -123, 15, -22, -109, 106, -55}; /** * Time to wait for coordination timeout */ protected long waitForCoordMsgTimeout = 15000; /** * Our current view */ protected Membership view = null; /** * Out current viewId */ protected UniqueId viewId; /** * Our nonblocking membership */ protected Membership membership = null; /** * indicates that we are running an election * and this is the one we are running */ protected UniqueId suggestedviewId; protected Membership suggestedView; protected boolean started = false; protected final int startsvc = 0xFFFF; protected Object electionMutex = new Object(); protected AtomicBoolean coordMsgReceived = new AtomicBoolean(false); public NonBlockingCoordinator() { super(); } //============================================================================================================ // COORDINATION HANDLING //============================================================================================================ public void startElection(boolean force) 
throws ChannelException { synchronized (electionMutex) { MemberImpl local = (MemberImpl)getLocalMember(false); MemberImpl[] others = (MemberImpl[])membership.getMembers(); fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_START_ELECT,this,"Election initated")); if ( others.length == 0 ) { this.viewId = new UniqueId(UUIDGenerator.randomUUID(false)); this.view = new Membership(local,AbsoluteOrder.comp, true); this.handleViewConf(this.createElectionMsg(local,others,local),local,view); return; //the only member, no need for an election } if ( suggestedviewId != null ) { if ( view != null && Arrays.diff(view,suggestedView,local).length == 0 && Arrays.diff(suggestedView,view,local).length == 0) { suggestedviewId = null; suggestedView = null; fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election abandoned, running election matches view")); } else { fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election abandoned, election running")); } return; //election already running, I'm not allowed to have two of them } if ( view != null && Arrays.diff(view,membership,local).length == 0 && Arrays.diff(membership,view,local).length == 0) { fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election abandoned, view matches membership")); return; //already have this view installed } int prio = AbsoluteOrder.comp.compare(local,others[0]); MemberImpl leader = ( prio < 0 )?local:others[0];//am I the leader in my view? 
if ( local.equals(leader) || force ) { CoordinationMessage msg = createElectionMsg(local, others, leader); suggestedviewId = msg.getId(); suggestedView = new Membership(local,AbsoluteOrder.comp,true); Arrays.fill(suggestedView,msg.getMembers()); fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_PROCESS_ELECT,this,"Election, sending request")); sendElectionMsg(local,others[0],msg); } else { try { coordMsgReceived.set(false); fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_WAIT_FOR_MSG,this,"Election, waiting for request")); electionMutex.wait(waitForCoordMsgTimeout); }catch ( InterruptedException x ) { Thread.currentThread().interrupted(); } if ( suggestedviewId == null && (!coordMsgReceived.get())) { //no message arrived, send the coord msg // fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_WAIT_FOR_MSG,this,"Election, waiting timed out.")); // startElection(true); fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election abandoned, waiting timed out.")); } else { fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election abandoned, received a message")); } }//end if } } private CoordinationMessage createElectionMsg(MemberImpl local, MemberImpl[] others, MemberImpl leader) { Membership m = new Membership(local,AbsoluteOrder.comp,true); Arrays.fill(m,others); MemberImpl[] mbrs = m.getMembers(); m.reset(); CoordinationMessage msg = new CoordinationMessage(leader, local, mbrs,new UniqueId(UUIDGenerator.randomUUID(true)), this.COORD_REQUEST); return msg; } protected void sendElectionMsg(MemberImpl local, MemberImpl next, CoordinationMessage msg) throws ChannelException { fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_SEND_MSG,this,"Sending election message to("+next.getName()+")")); super.sendMessage(new Member[] {next}, createData(msg, local), null); } protected void sendElectionMsgToNextInline(MemberImpl local, 
CoordinationMessage msg) throws ChannelException { int next = Arrays.nextIndex(local,msg.getMembers()); int current = next; msg.leader = msg.getMembers()[0]; boolean sent = false; while ( !sent && current >= 0 ) { try { sendElectionMsg(local, (MemberImpl) msg.getMembers()[current], msg); sent = true; }catch ( ChannelException x ) { log.warn("Unable to send election message to:"+msg.getMembers()[current]); current = Arrays.nextIndex(msg.getMembers()[current],msg.getMembers()); if ( current == next ) throw x; } } } public Member getNextInLine(MemberImpl local, MemberImpl[] others) { MemberImpl result = null; for ( int i=0; i<others.length; i++ ) { } return result; } public ChannelData createData(CoordinationMessage msg, MemberImpl local) { msg.write(); ChannelData data = new ChannelData(true); data.setAddress(local); data.setMessage(msg.getBuffer()); data.setOptions(Channel.SEND_OPTIONS_USE_ACK); data.setTimestamp(System.currentTimeMillis()); return data; } protected void viewChange(UniqueId viewId, Member[] view) { //invoke any listeners } protected boolean alive(Member mbr) { return TcpFailureDetector.memberAlive(mbr, COORD_ALIVE, false, false, waitForCoordMsgTimeout, waitForCoordMsgTimeout, getOptionFlag()); } protected Membership mergeOnArrive(CoordinationMessage msg, Member sender) { fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_PRE_MERGE,this,"Pre merge")); MemberImpl local = (MemberImpl)getLocalMember(false); Membership merged = new Membership(local,AbsoluteOrder.comp,true); Arrays.fill(merged,msg.getMembers()); Arrays.fill(merged,getMembers()); Member[] diff = Arrays.diff(merged,membership,local); for ( int i=0; i<diff.length; i++ ) { if (!alive(diff[i])) merged.removeMember((MemberImpl)diff[i]); else memberAdded(diff[i],false); } fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_POST_MERGE,this,"Post merge")); return merged; } protected void processCoordMessage(CoordinationMessage msg, Member sender) throws ChannelException 
{ if ( !coordMsgReceived.get() ) { coordMsgReceived.set(true); synchronized (electionMutex) { electionMutex.notifyAll();} } msg.timestamp = System.currentTimeMillis(); Membership merged = mergeOnArrive(msg, sender); if (isViewConf(msg)) handleViewConf(msg, sender, merged); else handleToken(msg, sender, merged); ClassLoader loader; } protected void handleToken(CoordinationMessage msg, Member sender,Membership merged) throws ChannelException { MemberImpl local = (MemberImpl)getLocalMember(false); if ( local.equals(msg.getSource()) ) { //my message msg.src=local handleMyToken(local, msg, sender,merged); } else { handleOtherToken(local, msg, sender,merged); } } protected void handleMyToken(MemberImpl local, CoordinationMessage msg, Member sender,Membership merged) throws ChannelException { if ( local.equals(msg.getLeader()) ) { //no leadership change if ( Arrays.sameMembers(msg.getMembers(),merged.getMembers()) ) { msg.type = COORD_CONF; super.sendMessage(Arrays.remove(msg.getMembers(),local),createData(msg,local),null); handleViewConf(msg,local,merged); } else { //membership change suggestedView = new Membership(local,AbsoluteOrder.comp,true); suggestedviewId = msg.getId(); Arrays.fill(suggestedView,merged.getMembers()); msg.view = (MemberImpl[])merged.getMembers(); sendElectionMsgToNextInline(local,msg); } } else { //leadership change suggestedView = null; suggestedviewId = null; msg.view = (MemberImpl[])merged.getMembers(); sendElectionMsgToNextInline(local,msg); } } protected void handleOtherToken(MemberImpl local, CoordinationMessage msg, Member sender,Membership merged) throws ChannelException { if ( local.equals(msg.getLeader()) ) { //I am the new leader //startElection(false); } else { msg.view = (MemberImpl[])merged.getMembers(); sendElectionMsgToNextInline(local,msg); } } protected void handleViewConf(CoordinationMessage msg, Member sender,Membership merged) throws ChannelException { if ( viewId != null && msg.getId().equals(viewId) ) return;//we already have 
this view view = new Membership((MemberImpl)getLocalMember(false),AbsoluteOrder.comp,true); Arrays.fill(view,msg.getMembers()); viewId = msg.getId(); if ( viewId.equals(suggestedviewId) ) { suggestedView = null; suggestedviewId = null; } if (suggestedView != null && AbsoluteOrder.comp.compare(suggestedView.getMembers()[0],merged.getMembers()[0])<0 ) { suggestedView = null; suggestedviewId = null; } viewChange(viewId,view.getMembers()); fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_CONF_RX,this,"Accepted View")); if ( suggestedviewId == null && hasHigherPriority(merged.getMembers(),membership.getMembers()) ) { startElection(false); } } protected boolean isViewConf(CoordinationMessage msg) { return Arrays.contains(msg.getType(),0,COORD_CONF,0,COORD_CONF.length); } protected boolean hasHigherPriority(Member[] complete, Member[] local) { if ( local == null || local.length == 0 ) return false; if ( complete == null || complete.length == 0 ) return true; AbsoluteOrder.absoluteOrder(complete); AbsoluteOrder.absoluteOrder(local); return (AbsoluteOrder.comp.compare(complete[0],local[0]) > 0); } /** * Returns coordinator if one is available * @return Member */ public Member getCoordinator() { return (view != null && view.hasMembers()) ? view.getMembers()[0] : null; } public Member[] getView() { return (view != null && view.hasMembers()) ? 
view.getMembers() : new Member[0]; } public UniqueId getViewId() { return viewId; } /** * Block in/out messages while a election is going on */ protected void halt() { } /** * Release lock for in/out messages election is completed */ protected void release() { } /** * Wait for an election to end */ protected void waitForRelease() { } //============================================================================================================ // OVERRIDDEN METHODS FROM CHANNEL INTERCEPTOR BASE //============================================================================================================ public void start(int svc) throws ChannelException { if (membership == null) setupMembership(); if (started)return; fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_START, this, "Before start")); super.start(startsvc); started = true; if (view == null) view = new Membership( (MemberImpl)super.getLocalMember(true), AbsoluteOrder.comp, true); fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_START, this, "After start")); startElection(false); } public void stop(int svc) throws ChannelException { try { halt(); synchronized (electionMutex) { if (!started)return; started = false; fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_STOP, this, "Before stop")); super.stop(startsvc); this.view = null; this.viewId = null; this.suggestedView = null; this.suggestedviewId = null; this.membership.reset(); fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_STOP, this, "After stop")); } }finally { release(); } } public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException { waitForRelease(); super.sendMessage(destination, msg, payload); } public void messageReceived(ChannelMessage msg) { if ( Arrays.contains(msg.getMessage().getBytesDirect(),0,COORD_ALIVE,0,COORD_ALIVE.length) ) { //ignore message, its an alive message fireInterceptorEvent(new 
CoordinationEvent(CoordinationEvent.EVT_MSG_ARRIVE,this,"Alive Message")); } else if ( Arrays.contains(msg.getMessage().getBytesDirect(),0,COORD_HEADER,0,COORD_HEADER.length) ) { try { CoordinationMessage cmsg = new CoordinationMessage(msg.getMessage()); Member[] cmbr = cmsg.getMembers(); fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_MSG_ARRIVE,this,"Coord Msg Arrived("+Arrays.toNameString(cmbr)+")")); processCoordMessage(cmsg, msg.getAddress()); }catch ( ChannelException x ) { log.error("Error processing coordination message. Could be fatal.",x); } } else { super.messageReceived(msg); } } public boolean accept(ChannelMessage msg) { return super.accept(msg); } public void memberAdded(Member member) { memberAdded(member,true); } public void memberAdded(Member member,boolean elect) { try { if ( membership == null ) setupMembership(); if ( membership.memberAlive((MemberImpl)member) ) super.memberAdded(member); try { fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_MBR_ADD,this,"Member add("+member.getName()+")")); if (started && elect) startElection(false); }catch ( ChannelException x ) { log.error("Unable to start election when member was added.",x); } }finally { } } public void memberDisappeared(Member member) { try { membership.removeMember((MemberImpl)member); super.memberDisappeared(member); try { fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_MBR_DEL,this,"Member remove("+member.getName()+")")); if ( started && (isCoordinator() || isHighest()) ) startElection(true); //to do, if a member disappears, only the coordinator can start }catch ( ChannelException x ) { log.error("Unable to start election when member was removed.",x); } }finally { } } public boolean isHighest() { Member local = getLocalMember(false); if ( membership.getMembers().length == 0 ) return true; else return AbsoluteOrder.comp.compare(local,membership.getMembers()[0])<=0; } public boolean isCoordinator() { Member coord = getCoordinator(); 
return coord != null && getLocalMember(false).equals(coord); } public void heartbeat() { try { MemberImpl local = (MemberImpl)getLocalMember(false); if ( view != null && (Arrays.diff(view,membership,local).length != 0 || Arrays.diff(membership,view,local).length != 0) ) { if ( isHighest() ) { fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_START_ELECT, this, "Heartbeat found inconsistency, restart election")); startElection(true); } } } catch ( Exception x ){ log.error("Unable to perform heartbeat.",x); } finally { super.heartbeat(); } } /** * has members */ public boolean hasMembers() { return membership.hasMembers(); } /** * Get all current cluster members * @return all members or empty array */ public Member[] getMembers() { return membership.getMembers(); } /** * * @param mbr Member * @return Member */ public Member getMember(Member mbr) { return membership.getMember(mbr); } /** * Return the member that represents this node. * * @return Member */ public Member getLocalMember(boolean incAlive) { Member local = super.getLocalMember(incAlive); if ( view == null && (local != null)) setupMembership(); return local; } protected synchronized void setupMembership() { if ( membership == null ) { membership = new Membership((MemberImpl)super.getLocalMember(true),AbsoluteOrder.comp,false); } } //============================================================================================================ // HELPER CLASSES FOR COORDINATION //============================================================================================================ public static class CoordinationMessage { //X{A-ldr, A-src, mbrs-A,B,C,D} protected XByteBuffer buf; protected MemberImpl leader; protected MemberImpl source; protected MemberImpl[] view; protected UniqueId id; protected byte[] type; protected long timestamp = System.currentTimeMillis(); public CoordinationMessage(XByteBuffer buf) { this.buf = buf; parse(); } public CoordinationMessage(MemberImpl leader, 
MemberImpl source, MemberImpl[] view, UniqueId id, byte[] type) { this.buf = new XByteBuffer(4096,false); this.leader = leader; this.source = source; this.view = view; this.id = id; this.type = type; this.write(); } public byte[] getHeader() { return NonBlockingCoordinator.COORD_HEADER; } public MemberImpl getLeader() { if ( leader == null ) parse(); return leader; } public MemberImpl getSource() { if ( source == null ) parse(); return source; } public UniqueId getId() { if ( id == null ) parse(); return id; } public MemberImpl[] getMembers() { if ( view == null ) parse(); return view; } public byte[] getType() { if (type == null ) parse(); return type; } public XByteBuffer getBuffer() { return this.buf; } public void parse() { //header int offset = 16; //leader int ldrLen = buf.toInt(buf.getBytesDirect(),offset); offset += 4; byte[] ldr = new byte[ldrLen]; System.arraycopy(buf.getBytesDirect(),offset,ldr,0,ldrLen); leader = MemberImpl.getMember(ldr); offset += ldrLen; //source int srcLen = buf.toInt(buf.getBytesDirect(),offset); offset += 4; byte[] src = new byte[srcLen]; System.arraycopy(buf.getBytesDirect(),offset,src,0,srcLen); source = MemberImpl.getMember(src); offset += srcLen; //view int mbrCount = buf.toInt(buf.getBytesDirect(),offset); offset += 4; view = new MemberImpl[mbrCount]; for (int i=0; i<view.length; i++ ) { int mbrLen = buf.toInt(buf.getBytesDirect(),offset); offset += 4; byte[] mbr = new byte[mbrLen]; System.arraycopy(buf.getBytesDirect(), offset, mbr, 0, mbrLen); view[i] = MemberImpl.getMember(mbr); offset += mbrLen; } //id this.id = new UniqueId(buf.getBytesDirect(),offset,16); offset += 16; type = new byte[16]; System.arraycopy(buf.getBytesDirect(), offset, type, 0, type.length); offset += 16; } public void write() { buf.reset(); //header buf.append(COORD_HEADER,0,COORD_HEADER.length); //leader byte[] ldr = leader.getData(false,false); buf.append(ldr.length); buf.append(ldr,0,ldr.length); ldr = null; //source byte[] src = 
source.getData(false,false); buf.append(src.length); buf.append(src,0,src.length); src = null; //view buf.append(view.length); for (int i=0; i<view.length; i++ ) { byte[] mbr = view[i].getData(false,false); buf.append(mbr.length); buf.append(mbr,0,mbr.length); } //id buf.append(id.getBytes(),0,id.getBytes().length); buf.append(type,0,type.length); } } public void fireInterceptorEvent(InterceptorEvent event) { if (event instanceof CoordinationEvent && ((CoordinationEvent)event).type == CoordinationEvent.EVT_CONF_RX) log.info(event); } public static class CoordinationEvent implements InterceptorEvent { public static final int EVT_START = 1; public static final int EVT_MBR_ADD = 2; public static final int EVT_MBR_DEL = 3; public static final int EVT_START_ELECT = 4; public static final int EVT_PROCESS_ELECT = 5; public static final int EVT_MSG_ARRIVE = 6; public static final int EVT_PRE_MERGE = 7; public static final int EVT_POST_MERGE = 8; public static final int EVT_WAIT_FOR_MSG = 9; public static final int EVT_SEND_MSG = 10; public static final int EVT_STOP = 11; public static final int EVT_CONF_RX = 12; public static final int EVT_ELECT_ABANDONED = 13; int type; ChannelInterceptor interceptor; Member coord; Member[] mbrs; String info; Membership view; Membership suggestedView; public CoordinationEvent(int type,ChannelInterceptor interceptor, String info) { this.type = type; this.interceptor = interceptor; this.coord = ((NonBlockingCoordinator)interceptor).getCoordinator(); this.mbrs = ((NonBlockingCoordinator)interceptor).membership.getMembers(); this.info = info; this.view = ((NonBlockingCoordinator)interceptor).view; this.suggestedView = ((NonBlockingCoordinator)interceptor).suggestedView; } public int getEventType() { return type; } public String getEventTypeDesc() { switch (type) { case EVT_START: return "EVT_START:"+info; case EVT_MBR_ADD: return "EVT_MBR_ADD:"+info; case EVT_MBR_DEL: return "EVT_MBR_DEL:"+info; case EVT_START_ELECT: return 
"EVT_START_ELECT:"+info; case EVT_PROCESS_ELECT: return "EVT_PROCESS_ELECT:"+info; case EVT_MSG_ARRIVE: return "EVT_MSG_ARRIVE:"+info; case EVT_PRE_MERGE: return "EVT_PRE_MERGE:"+info; case EVT_POST_MERGE: return "EVT_POST_MERGE:"+info; case EVT_WAIT_FOR_MSG: return "EVT_WAIT_FOR_MSG:"+info; case EVT_SEND_MSG: return "EVT_SEND_MSG:"+info; case EVT_STOP: return "EVT_STOP:"+info; case EVT_CONF_RX: return "EVT_CONF_RX:"+info; case EVT_ELECT_ABANDONED: return "EVT_ELECT_ABANDONED:"+info; default: return "Unknown"; } } public ChannelInterceptor getInterceptor() { return interceptor; } public String toString() { StringBuffer buf = new StringBuffer("CoordinationEvent[type="); buf.append(type).append("\n\tLocal:"); Member local = interceptor.getLocalMember(false); buf.append(local!=null?local.getName():"").append("\n\tCoord:"); buf.append(coord!=null?coord.getName():"").append("\n\tView:"); buf.append(Arrays.toNameString(view!=null?view.getMembers():null)).append("\n\tSuggested View:"); buf.append(Arrays.toNameString(suggestedView!=null?suggestedView.getMembers():null)).append("\n\tMembers:"); buf.append(Arrays.toNameString(mbrs)).append("\n\tInfo:"); buf.append(info).append("]"); return buf.toString(); } } }

Other Tomcat examples (source code examples)

Here is a short list of links related to this Tomcat NonBlockingCoordinator.java source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.