alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Tomcat example source code file (DeltaManager.java)

This example Tomcat source code file (DeltaManager.java) is included in the DevDaily.com "Java Source Code Warehouse" project. The intent of this project is to help you "Learn Java by Example" TM.

Java - Tomcat tags/keywords

bean, catalinacluster, context, deltamanager, deltasession, deltasession, io, ioexception, ioexception, javabean, member, member, replicationvalve, session, sessionmessage, sessionmessage, string, util

The Tomcat DeltaManager.java source code

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 * 
 *      http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.catalina.ha.session;

import java.beans.PropertyChangeEvent;
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;

import org.apache.catalina.Cluster;
import org.apache.catalina.Container;
import org.apache.catalina.Context;
import org.apache.catalina.Engine;
import org.apache.catalina.Host;
import org.apache.catalina.LifecycleException;
import org.apache.catalina.LifecycleListener;
import org.apache.catalina.Session;
import org.apache.catalina.Valve;
import org.apache.catalina.core.StandardContext;
import org.apache.catalina.ha.CatalinaCluster;
import org.apache.catalina.ha.ClusterMessage;
import org.apache.catalina.ha.tcp.ReplicationValve;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.io.ReplicationStream;
import org.apache.catalina.util.LifecycleSupport;
import org.apache.catalina.util.StringManager;
import org.apache.catalina.ha.ClusterManager;

/**
 * The DeltaManager manages replicated sessions by only replicating the deltas
 * in data. For applications written to handle this, the DeltaManager is the
 * optimal way of replicating data.
 * 
 * This code is almost identical to StandardManager with a difference in how it
 * persists sessions and some modifications to it.
 * 
 * <b>IMPLEMENTATION NOTE : Correct behavior of session storing and
 * reloading depends upon external calls to the <code>start() and
 * <code>stop() methods of this class at the correct times.
 * 
 * @author Filip Hanik
 * @author Craig R. McClanahan
 * @author Jean-Francois Arcand
 * @author Peter Rossbach
 * @version $Revision: 561326 $ $Date: 2007-07-31 15:35:35 +0200 (mar., 31 juil. 2007) $
 */

public class DeltaManager extends ClusterManagerBase{

    // ---------------------------------------------------- Security Classes
    public static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog(DeltaManager.class);

    /**
     * The string manager for this package.
     */
    protected static StringManager sm = StringManager.getManager(Constants.Package);

    // ----------------------------------------------------- Instance Variables

    /**
     * The descriptive information about this implementation.
     */
    private static final String info = "DeltaManager/2.1";

    /**
     * Has this component been started yet?
     */
    private boolean started = false;

    /**
     * The descriptive name of this Manager implementation (for logging).
     */
    protected static String managerName = "DeltaManager";
    protected String name = null;
    protected boolean defaultMode = false;
    private CatalinaCluster cluster = null;

    /**
     * cached replication valve cluster container!
     */
    private ReplicationValve replicationValve = null ;
    
    /**
     * The lifecycle event support for this component.
     */
    protected LifecycleSupport lifecycle = new LifecycleSupport(this);

    /**
     * The maximum number of active Sessions allowed, or -1 for no limit.
     */
    private int maxActiveSessions = -1;
    private boolean expireSessionsOnShutdown = false;
    private boolean notifyListenersOnReplication = true;
    private boolean notifySessionListenersOnReplication = true;
    private boolean stateTransfered = false ;
    private int stateTransferTimeout = 60;
    private boolean sendAllSessions = true;
    private boolean sendClusterDomainOnly = true ;
    private int sendAllSessionsSize = 1000 ;
    
    /**
     * wait time between send session block (default 2 sec) 
     */
    private int sendAllSessionsWaitTime = 2 * 1000 ; 
    private ArrayList receivedMessageQueue = new ArrayList() ;
    private boolean receiverQueue = false ;
    private boolean stateTimestampDrop = true ;
    private long stateTransferCreateSendTime; 
    
    // ------------------------------------------------------------------ stats attributes
    
    int rejectedSessions = 0;
    private long sessionReplaceCounter = 0 ;
    long processingTime = 0;
    private long counterReceive_EVT_GET_ALL_SESSIONS = 0 ;
    private long counterSend_EVT_ALL_SESSION_DATA = 0 ;
    private long counterReceive_EVT_ALL_SESSION_DATA = 0 ;
    private long counterReceive_EVT_SESSION_CREATED = 0 ;
    private long counterReceive_EVT_SESSION_EXPIRED = 0;
    private long counterReceive_EVT_SESSION_ACCESSED = 0 ;
    private long counterReceive_EVT_SESSION_DELTA = 0;
    private long counterSend_EVT_GET_ALL_SESSIONS = 0 ;
    private long counterSend_EVT_SESSION_CREATED = 0;
    private long counterSend_EVT_SESSION_DELTA = 0 ;
    private long counterSend_EVT_SESSION_ACCESSED = 0;
    private long counterSend_EVT_SESSION_EXPIRED = 0;
    private int counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE = 0 ;
    private int counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE = 0 ;
    private int counterNoStateTransfered = 0 ;


    // ------------------------------------------------------------- Constructor
    public DeltaManager() {
        super();
    }

    // ------------------------------------------------------------- Properties
    
    /**
     * Return descriptive information about this Manager implementation and the
     * corresponding version number, in the format
     * <code><description>/<version>.
     */
    public String getInfo() {
        return info;
    }

    public void setName(String name) {
        this.name = name;
    }

    /**
     * Return the descriptive short name of this Manager implementation.
     */
    public String getName() {
        return name;
    }

    /**
     * @return Returns the counterSend_EVT_GET_ALL_SESSIONS.
     */
    public long getCounterSend_EVT_GET_ALL_SESSIONS() {
        return counterSend_EVT_GET_ALL_SESSIONS;
    }
    
    /**
     * @return Returns the counterSend_EVT_SESSION_ACCESSED.
     */
    public long getCounterSend_EVT_SESSION_ACCESSED() {
        return counterSend_EVT_SESSION_ACCESSED;
    }
    
    /**
     * @return Returns the counterSend_EVT_SESSION_CREATED.
     */
    public long getCounterSend_EVT_SESSION_CREATED() {
        return counterSend_EVT_SESSION_CREATED;
    }

    /**
     * @return Returns the counterSend_EVT_SESSION_DELTA.
     */
    public long getCounterSend_EVT_SESSION_DELTA() {
        return counterSend_EVT_SESSION_DELTA;
    }

    /**
     * @return Returns the counterSend_EVT_SESSION_EXPIRED.
     */
    public long getCounterSend_EVT_SESSION_EXPIRED() {
        return counterSend_EVT_SESSION_EXPIRED;
    }
 
    /**
     * @return Returns the counterSend_EVT_ALL_SESSION_DATA.
     */
    public long getCounterSend_EVT_ALL_SESSION_DATA() {
        return counterSend_EVT_ALL_SESSION_DATA;
    }

    /**
     * @return Returns the counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE.
     */
    public int getCounterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE() {
        return counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE;
    }
 
    /**
     * @return Returns the counterReceive_EVT_ALL_SESSION_DATA.
     */
    public long getCounterReceive_EVT_ALL_SESSION_DATA() {
        return counterReceive_EVT_ALL_SESSION_DATA;
    }
    
    /**
     * @return Returns the counterReceive_EVT_GET_ALL_SESSIONS.
     */
    public long getCounterReceive_EVT_GET_ALL_SESSIONS() {
        return counterReceive_EVT_GET_ALL_SESSIONS;
    }
    
    /**
     * @return Returns the counterReceive_EVT_SESSION_ACCESSED.
     */
    public long getCounterReceive_EVT_SESSION_ACCESSED() {
        return counterReceive_EVT_SESSION_ACCESSED;
    }
    
    /**
     * @return Returns the counterReceive_EVT_SESSION_CREATED.
     */
    public long getCounterReceive_EVT_SESSION_CREATED() {
        return counterReceive_EVT_SESSION_CREATED;
    }
    
    /**
     * @return Returns the counterReceive_EVT_SESSION_DELTA.
     */
    public long getCounterReceive_EVT_SESSION_DELTA() {
        return counterReceive_EVT_SESSION_DELTA;
    }
    
    /**
     * @return Returns the counterReceive_EVT_SESSION_EXPIRED.
     */
    public long getCounterReceive_EVT_SESSION_EXPIRED() {
        return counterReceive_EVT_SESSION_EXPIRED;
    }
    
    
    /**
     * @return Returns the counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE.
     */
    public int getCounterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE() {
        return counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE;
    }
    
    /**
     * @return Returns the processingTime.
     */
    public long getProcessingTime() {
        return processingTime;
    }
 
    /**
     * @return Returns the sessionReplaceCounter.
     */
    public long getSessionReplaceCounter() {
        return sessionReplaceCounter;
    }
    
    /**
     * Number of session creations that failed due to maxActiveSessions
     * 
     * @return The count
     */
    public int getRejectedSessions() {
        return rejectedSessions;
    }

    public void setRejectedSessions(int rejectedSessions) {
        this.rejectedSessions = rejectedSessions;
    }

    /**
     * @return Returns the counterNoStateTransfered.
     */
    public int getCounterNoStateTransfered() {
        return counterNoStateTransfered;
    }
    
    public int getReceivedQueueSize() {
        return receivedMessageQueue.size() ;
    }
    
    /**
     * @return Returns the stateTransferTimeout.
     */
    public int getStateTransferTimeout() {
        return stateTransferTimeout;
    }
    /**
     * @param timeoutAllSession The timeout
     */
    public void setStateTransferTimeout(int timeoutAllSession) {
        this.stateTransferTimeout = timeoutAllSession;
    }

    /**
     * is session state transfered complete?
     * 
     */
    public boolean getStateTransfered() {
        return stateTransfered;
    }

    /**
     * set that state ist complete transfered  
     * @param stateTransfered
     */
    public void setStateTransfered(boolean stateTransfered) {
        this.stateTransfered = stateTransfered;
    }
    
    /**
     * @return Returns the sendAllSessionsWaitTime in msec
     */
    public int getSendAllSessionsWaitTime() {
        return sendAllSessionsWaitTime;
    }
    
    /**
     * @param sendAllSessionsWaitTime The sendAllSessionsWaitTime to set at msec.
     */
    public void setSendAllSessionsWaitTime(int sendAllSessionsWaitTime) {
        this.sendAllSessionsWaitTime = sendAllSessionsWaitTime;
    }
    
    /**
     * @return Returns the sendClusterDomainOnly.
     */
    public boolean doDomainReplication() {
        return sendClusterDomainOnly;
    }
    
    /**
     * @param sendClusterDomainOnly The sendClusterDomainOnly to set.
     */
    public void setDomainReplication(boolean sendClusterDomainOnly) {
        this.sendClusterDomainOnly = sendClusterDomainOnly;
    }

    /**
     * @return Returns the stateTimestampDrop.
     */
    public boolean isStateTimestampDrop() {
        return stateTimestampDrop;
    }
    
    /**
     * @param isTimestampDrop The new flag value
     */
    public void setStateTimestampDrop(boolean isTimestampDrop) {
        this.stateTimestampDrop = isTimestampDrop;
    }
    
    /**
     * Return the maximum number of active Sessions allowed, or -1 for no limit.
     */
    public int getMaxActiveSessions() {
        return (this.maxActiveSessions);
    }

    /**
     * Set the maximum number of actives Sessions allowed, or -1 for no limit.
     * 
     * @param max
     *            The new maximum number of sessions
     */
    public void setMaxActiveSessions(int max) {
        int oldMaxActiveSessions = this.maxActiveSessions;
        this.maxActiveSessions = max;
        support.firePropertyChange("maxActiveSessions", new Integer(oldMaxActiveSessions), new Integer(this.maxActiveSessions));
    }
    
    /**
     * 
     * @return Returns the sendAllSessions.
     */
    public boolean isSendAllSessions() {
        return sendAllSessions;
    }
    
    /**
     * @param sendAllSessions The sendAllSessions to set.
     */
    public void setSendAllSessions(boolean sendAllSessions) {
        this.sendAllSessions = sendAllSessions;
    }
    
    /**
     * @return Returns the sendAllSessionsSize.
     */
    public int getSendAllSessionsSize() {
        return sendAllSessionsSize;
    }
    
    /**
     * @param sendAllSessionsSize The sendAllSessionsSize to set.
     */
    public void setSendAllSessionsSize(int sendAllSessionsSize) {
        this.sendAllSessionsSize = sendAllSessionsSize;
    }
    
    /**
     * @return Returns the notifySessionListenersOnReplication.
     */
    public boolean isNotifySessionListenersOnReplication() {
        return notifySessionListenersOnReplication;
    }
    
    /**
     * @param notifyListenersCreateSessionOnReplication The notifySessionListenersOnReplication to set.
     */
    public void setNotifySessionListenersOnReplication(boolean notifyListenersCreateSessionOnReplication) {
        this.notifySessionListenersOnReplication = notifyListenersCreateSessionOnReplication;
    }
    
    
    public boolean isExpireSessionsOnShutdown() {
        return expireSessionsOnShutdown;
    }

    public void setExpireSessionsOnShutdown(boolean expireSessionsOnShutdown) {
        this.expireSessionsOnShutdown = expireSessionsOnShutdown;
    }
    
    public boolean isNotifyListenersOnReplication() {
        return notifyListenersOnReplication;
    }

    public void setNotifyListenersOnReplication(boolean notifyListenersOnReplication) {
        this.notifyListenersOnReplication = notifyListenersOnReplication;
    }

    
    /**
     * @return Returns the defaultMode.
     */
    public boolean isDefaultMode() {
        return defaultMode;
    }
    /**
     * @param defaultMode The defaultMode to set.
     */
    public void setDefaultMode(boolean defaultMode) {
        this.defaultMode = defaultMode;
    }
    
    public CatalinaCluster getCluster() {
        return cluster;
    }

    public void setCluster(CatalinaCluster cluster) {
        this.cluster = cluster;
    }

    /**
     * Set the Container with which this Manager has been associated. If it is a
     * Context (the usual case), listen for changes to the session timeout
     * property.
     * 
     * @param container
     *            The associated Container
     */
    public void setContainer(Container container) {
        // De-register from the old Container (if any)
        if ((this.container != null) && (this.container instanceof Context))
            ((Context) this.container).removePropertyChangeListener(this);

        // Default processing provided by our superclass
        super.setContainer(container);

        // Register with the new Container (if any)
        if ((this.container != null) && (this.container instanceof Context)) {
            setMaxInactiveInterval(((Context) this.container).getSessionTimeout() * 60);
            ((Context) this.container).addPropertyChangeListener(this);
        }

    }
    
    // --------------------------------------------------------- Public Methods

    /**
     * Construct and return a new session object, based on the default settings
     * specified by this Manager's properties. The session id will be assigned
     * by this method, and available via the getId() method of the returned
     * session. If a new session cannot be created for any reason, return
     * <code>null.
     * 
     * @exception IllegalStateException
     *                if a new session cannot be instantiated for any reason
     * 
     * Construct and return a new session object, based on the default settings
     * specified by this Manager's properties. The session id will be assigned
     * by this method, and available via the getId() method of the returned
     * session. If a new session cannot be created for any reason, return
     * <code>null.
     * 
     * @exception IllegalStateException
     *                if a new session cannot be instantiated for any reason
     */
    public Session createSession(String sessionId) {
        return createSession(sessionId, true);
    }

    /**
     * create new session with check maxActiveSessions and send session creation
     * to other cluster nodes.
     * 
     * @param distribute
     * @return The session
     */
    public Session createSession(String sessionId, boolean distribute) {
        if ((maxActiveSessions >= 0) && (sessions.size() >= maxActiveSessions)) {
            rejectedSessions++;
            throw new IllegalStateException(sm.getString("deltaManager.createSession.ise"));
        }
        DeltaSession session = (DeltaSession) super.createSession(sessionId) ;
        if (distribute) {
            sendCreateSession(session.getId(), session);
        }
        if (log.isDebugEnabled())
            log.debug(sm.getString("deltaManager.createSession.newSession",session.getId(), new Integer(sessions.size())));
        return (session);

    }

    /**
     * Send create session evt to all backup node
     * @param sessionId
     * @param session
     */
    protected void sendCreateSession(String sessionId, DeltaSession session) {
        if(cluster.getMembers().length > 0 ) {
            SessionMessage msg = 
                new SessionMessageImpl(getName(),
                                       SessionMessage.EVT_SESSION_CREATED, 
                                       null, 
                                       sessionId,
                                       sessionId + "-" + System.currentTimeMillis());
            if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.sendMessage.newSession",name, sessionId));
            msg.setTimestamp(session.getCreationTime());
            counterSend_EVT_SESSION_CREATED++;
            send(msg);
        }
    }
    
    /**
     * Send messages to other backup member (domain or all)
     * @param msg Session message
     */
    protected void send(SessionMessage msg) {
        if(cluster != null) {
            if(doDomainReplication())
                cluster.sendClusterDomain(msg);
            else
                cluster.send(msg);
        }
    }

    /**
     * Create DeltaSession
     * @see org.apache.catalina.Manager#createEmptySession()
     */
    public Session createEmptySession() {
        return getNewDeltaSession() ;
    }
    
    /**
     * Get new session class to be used in the doLoad() method.
     */
    protected DeltaSession getNewDeltaSession() {
        return new DeltaSession(this);
    }

    /**
     * Load Deltarequest from external node
     * Load the Class at container classloader
     * @see DeltaRequest#readExternal(java.io.ObjectInput)
     * @param session
     * @param data message data
     * @return The request
     * @throws ClassNotFoundException
     * @throws IOException
     */
    protected DeltaRequest deserializeDeltaRequest(DeltaSession session, byte[] data) throws ClassNotFoundException, IOException {
        ReplicationStream ois = getReplicationStream(data);
        session.getDeltaRequest().readExternal(ois);
        ois.close();
        return session.getDeltaRequest();
    }

    /**
     * serialize DeltaRequest
     * @see DeltaRequest#writeExternal(java.io.ObjectOutput)
     * 
     * @param deltaRequest
     * @return serialized delta request
     * @throws IOException
     */
    protected byte[] serializeDeltaRequest(DeltaRequest deltaRequest) throws IOException {
        return deltaRequest.serialize();
    }

    /**
     * Load sessions from other cluster node.
     * FIXME replace currently sessions with same id without notifcation.
     * FIXME SSO handling is not really correct with the session replacement!
     * @exception ClassNotFoundException
     *                if a serialized class cannot be found during the reload
     * @exception IOException
     *                if an input/output error occurs
     */
    protected void deserializeSessions(byte[] data) throws ClassNotFoundException,IOException {

        // Initialize our internal data structures
        //sessions.clear(); //should not do this
        // Open an input stream to the specified pathname, if any
        ClassLoader originalLoader = Thread.currentThread().getContextClassLoader();
        ObjectInputStream ois = null;
        // Load the previously unloaded active sessions
        try {
            ois = getReplicationStream(data);
            Integer count = (Integer) ois.readObject();
            int n = count.intValue();
            for (int i = 0; i < n; i++) {
                DeltaSession session = (DeltaSession) createEmptySession();
                session.readObjectData(ois);
                session.setManager(this);
                session.setValid(true);
                session.setPrimarySession(false);
                //in case the nodes in the cluster are out of
                //time synch, this will make sure that we have the
                //correct timestamp, isValid returns true, cause
                // accessCount=1
                session.access();
                //make sure that the session gets ready to expire if
                // needed
                session.setAccessCount(0);
                session.resetDeltaRequest();
                // FIXME How inform other session id cache like SingleSignOn
                // increment sessionCounter to correct stats report
                if (findSession(session.getIdInternal()) == null ) {
                    sessionCounter++;
                } else {
                    sessionReplaceCounter++;
                    // FIXME better is to grap this sessions again !
                    if (log.isWarnEnabled()) log.warn(sm.getString("deltaManager.loading.existing.session",session.getIdInternal()));
                }
                add(session);
            }
        } catch (ClassNotFoundException e) {
            log.error(sm.getString("deltaManager.loading.cnfe", e), e);
            throw e;
        } catch (IOException e) {
            log.error(sm.getString("deltaManager.loading.ioe", e), e);
            throw e;
        } finally {
            // Close the input stream
            try {
                if (ois != null) ois.close();
            } catch (IOException f) {
                // ignored
            }
            ois = null;
            if (originalLoader != null) Thread.currentThread().setContextClassLoader(originalLoader);
        }

    }

    

    /**
     * Save any currently active sessions in the appropriate persistence
     * mechanism, if any. If persistence is not supported, this method returns
     * without doing anything.
     * 
     * @exception IOException
     *                if an input/output error occurs
     */
    protected byte[] serializeSessions(Session[] currentSessions) throws IOException {

        // Open an output stream to the specified pathname, if any
        ByteArrayOutputStream fos = null;
        ObjectOutputStream oos = null;

        try {
            fos = new ByteArrayOutputStream();
            oos = new ObjectOutputStream(new BufferedOutputStream(fos));
            oos.writeObject(new Integer(currentSessions.length));
            for(int i=0 ; i < currentSessions.length;i++) {
                ((DeltaSession)currentSessions[i]).writeObjectData(oos);                
            }
            // Flush and close the output stream
            oos.flush();
        } catch (IOException e) {
            log.error(sm.getString("deltaManager.unloading.ioe", e), e);
            throw e;
        } finally {
            if (oos != null) {
                try {
                    oos.close();
                } catch (IOException f) {
                    ;
                }
                oos = null;
            }
        }
        // send object data as byte[]
        return fos.toByteArray();
    }

    // ------------------------------------------------------ Lifecycle Methods

    /**
     * Add a lifecycle event listener to this component.
     * 
     * @param listener
     *            The listener to add
     */
    public void addLifecycleListener(LifecycleListener listener) {
        lifecycle.addLifecycleListener(listener);
    }

    /**
     * Get the lifecycle listeners associated with this lifecycle. If this
     * Lifecycle has no listeners registered, a zero-length array is returned.
     */
    public LifecycleListener[] findLifecycleListeners() {
        return lifecycle.findLifecycleListeners();
    }

    /**
     * Remove a lifecycle event listener from this component.
     * 
     * @param listener
     *            The listener to remove
     */
    public void removeLifecycleListener(LifecycleListener listener) {
        lifecycle.removeLifecycleListener(listener);
    }

    /**
     * Prepare for the beginning of active use of the public methods of this
     * component. This method should be called after <code>configure(),
     * and before any of the public methods of the component are utilized.
     * 
     * @exception LifecycleException
     *                if this component detects a fatal error that prevents this
     *                component from being used
     */
    public void start() throws LifecycleException {
        if (!initialized) init();

        // Validate and update our current component state
        if (started) {
            return;
        }
        started = true;
        lifecycle.fireLifecycleEvent(START_EVENT, null);

        // Force initialization of the random number generator
        generateSessionId();

        // Load unloaded sessions, if any
        try {
            //the channel is already running
            Cluster cluster = getCluster() ;
            // stop remove cluster binding
            //wow, how many nested levels of if statements can we have ;)
            if(cluster == null) {
                Container context = getContainer() ;
                if(context != null && context instanceof Context) {
                     Container host = context.getParent() ;
                     if(host != null && host instanceof Host) {
                         cluster = host.getCluster();
                         if(cluster != null && cluster instanceof CatalinaCluster) {
                             setCluster((CatalinaCluster) cluster) ;
                         } else {
                             Container engine = host.getParent() ;
                             if(engine != null && engine instanceof Engine) {
                                 cluster = engine.getCluster();
                                 if(cluster != null && cluster instanceof CatalinaCluster) {
                                     setCluster((CatalinaCluster) cluster) ;
                                 }
                             } else {
                                     cluster = null ;
                             }
                         }
                     }
                }
            }
            if (cluster == null) {
                log.error(sm.getString("deltaManager.noCluster", getName()));
                return;
            } else {
                if (log.isInfoEnabled()) {
                    String type = "unknown" ;
                    if( cluster.getContainer() instanceof Host){
                        type = "Host" ;
                    } else if( cluster.getContainer() instanceof Engine){
                        type = "Engine" ;
                    }
                    log.info(sm.getString("deltaManager.registerCluster", getName(), type, cluster.getClusterName()));
                }
            }
            if (log.isInfoEnabled()) log.info(sm.getString("deltaManager.startClustering", getName()));
            //to survice context reloads, as only a stop/start is called, not
            // createManager
            cluster.registerManager(this);

            getAllClusterSessions();

        } catch (Throwable t) {
            log.error(sm.getString("deltaManager.managerLoad"), t);
        }
    }

    /**
     * get from first session master the backup from all clustered sessions
     * @see #findSessionMasterMember()
     */
    public synchronized void getAllClusterSessions() {
        if (cluster != null && cluster.getMembers().length > 0) {
            long beforeSendTime = System.currentTimeMillis();
            Member mbr = findSessionMasterMember();
            if(mbr == null) { // No domain member found
                 return;
            }
            SessionMessage msg = new SessionMessageImpl(this.getName(),SessionMessage.EVT_GET_ALL_SESSIONS, null, "GET-ALL","GET-ALL-" + getName());
            // set reference time
            stateTransferCreateSendTime = beforeSendTime ;
            // request session state
            counterSend_EVT_GET_ALL_SESSIONS++;
            stateTransfered = false ;
            // FIXME This send call block the deploy thread, when sender waitForAck is enabled
            try {
                synchronized(receivedMessageQueue) {
                     receiverQueue = true ;
                }
                cluster.send(msg, mbr);
                if (log.isWarnEnabled()) log.warn(sm.getString("deltaManager.waitForSessionState",getName(), mbr));
                // FIXME At sender ack mode this method check only the state transfer and resend is a problem!
                waitForSendAllSessions(beforeSendTime);
            } finally {
                synchronized(receivedMessageQueue) {
                    for (Iterator iter = receivedMessageQueue.iterator(); iter.hasNext();) {
                        SessionMessage smsg = (SessionMessage) iter.next();
                        if (!stateTimestampDrop) {
                            messageReceived(smsg, smsg.getAddress() != null ? (Member) smsg.getAddress() : null);
                        } else {
                            if (smsg.getEventType() != SessionMessage.EVT_GET_ALL_SESSIONS && smsg.getTimestamp() >= stateTransferCreateSendTime) {
                                // FIXME handle EVT_GET_ALL_SESSIONS later
                                messageReceived(smsg,smsg.getAddress() != null ? (Member) smsg.getAddress() : null);
                            } else {
                                if (log.isWarnEnabled()) {
                                    log.warn(sm.getString("deltaManager.dropMessage",getName(), smsg.getEventTypeString(),new Date(stateTransferCreateSendTime), new Date(smsg.getTimestamp())));
                                }
                            }
                        }
                    }        
                    receivedMessageQueue.clear();
                    receiverQueue = false ;
                }
           }
        } else {
            if (log.isInfoEnabled()) log.info(sm.getString("deltaManager.noMembers", getName()));
        }
    }

    /**
     * Register cross context session at replication valve thread local
     * @param session cross context session
     */
    protected void registerSessionAtReplicationValve(DeltaSession session) {
        if(replicationValve == null) {
            if(container instanceof StandardContext && ((StandardContext)container).getCrossContext()) {
                Cluster cluster = getCluster() ;
                if(cluster != null && cluster instanceof CatalinaCluster) {
                    Valve[] valves = ((CatalinaCluster)cluster).getValves();
                    if(valves != null && valves.length > 0) {
                        for(int i=0; replicationValve == null && i < valves.length ; i++ ){
                            if(valves[i] instanceof ReplicationValve) replicationValve = (ReplicationValve)valves[i] ;
                        }//for

                        if(replicationValve == null && log.isDebugEnabled()) {
                            log.debug("no ReplicationValve found for CrossContext Support");
                        }//endif 
                    }//end if
                }//endif
            }//end if
        }//end if
        if(replicationValve != null) {
            replicationValve.registerReplicationSession(session);
        }
    }
    
    /**
     * Find the master of the session state
     * @return master member of sessions 
     */
    protected Member findSessionMasterMember() {
        Member mbr = null;
        Member mbrs[] = cluster.getMembers();
        if(mbrs.length != 0 ) mbr = mbrs[0];
        if(mbr == null && log.isWarnEnabled()) log.warn(sm.getString("deltaManager.noMasterMember",getName(), ""));
        if(mbr != null && log.isDebugEnabled()) log.warn(sm.getString("deltaManager.foundMasterMember",getName(), mbr));
        return mbr;
    }

    /**
     * Wait that cluster session state is transfer or timeout after 60 Sec
     * With stateTransferTimeout == -1 wait that backup is transfered (forever mode)
     */
    protected void waitForSendAllSessions(long beforeSendTime) {
        long reqStart = System.currentTimeMillis();
        long reqNow = reqStart ;
        boolean isTimeout = false;
        if(getStateTransferTimeout() > 0) {
            // wait that state is transfered with timeout check
            do {
                try {
                    Thread.sleep(100);
                } catch (Exception sleep) {
                    //
                }
                reqNow = System.currentTimeMillis();
                isTimeout = ((reqNow - reqStart) > (1000 * getStateTransferTimeout()));
            } while ((!getStateTransfered()) && (!isTimeout));
        } else {
            if(getStateTransferTimeout() == -1) {
                // wait that state is transfered
                do {
                    try {
                        Thread.sleep(100);
                    } catch (Exception sleep) {
                    }
                } while ((!getStateTransfered()));
                reqNow = System.currentTimeMillis();
            }
        }
        if (isTimeout || (!getStateTransfered())) {
            counterNoStateTransfered++ ;
            log.error(sm.getString("deltaManager.noSessionState",getName(),new Date(beforeSendTime),new Long(reqNow - beforeSendTime)));
        } else {
            if (log.isInfoEnabled())
                log.info(sm.getString("deltaManager.sessionReceived",getName(), new Date(beforeSendTime), new Long(reqNow - beforeSendTime)));
        }
    }

    /**
     * Gracefully terminate the active use of the public methods of this
     * component. This method should be the last one called on a given instance
     * of this component.
     * 
     * @exception LifecycleException
     *                if this component detects a fatal error that needs to be
     *                reported
     */
    public void stop() throws LifecycleException {

        if (log.isDebugEnabled())
            log.debug(sm.getString("deltaManager.stopped", getName()));


        // Validate and update our current component state
        if (!started)
            throw new LifecycleException(sm.getString("deltaManager.notStarted"));
        lifecycle.fireLifecycleEvent(STOP_EVENT, null);
        started = false;

        // Expire all active sessions
        if (log.isInfoEnabled()) log.info(sm.getString("deltaManager.expireSessions", getName()));
        Session sessions[] = findSessions();
        for (int i = 0; i < sessions.length; i++) {
            DeltaSession session = (DeltaSession) sessions[i];
            if (!session.isValid())
                continue;
            try {
                session.expire(true, isExpireSessionsOnShutdown());
            } catch (Throwable ignore) {
                ;
            } 
        }

        // Require a new random number generator if we are restarted
        this.random = null;
        getCluster().removeManager(this);
        replicationValve = null;
        if (initialized) {
            destroy();
        }
    }

    // ----------------------------------------- PropertyChangeListener Methods

    /**
     * Process property change events from our associated Context.
     * 
     * @param event
     *            The property change event that has occurred
     */
    public void propertyChange(PropertyChangeEvent event) {

        // Validate the source of this event
        if (!(event.getSource() instanceof Context))
            return;
        // Process a relevant property change
        if (event.getPropertyName().equals("sessionTimeout")) {
            try {
                setMaxInactiveInterval(((Integer) event.getNewValue()).intValue() * 60);
            } catch (NumberFormatException e) {
                log.error(sm.getString("deltaManager.sessionTimeout", event.getNewValue()));
            }
        }

    }

    // -------------------------------------------------------- Replication
    // Methods

    /**
     * A message was received from another node, this is the callback method to
     * implement if you are interested in receiving replication messages.
     * 
     * @param cmsg -
     *            the message received.
     */
    public void messageDataReceived(ClusterMessage cmsg) {
        if (cmsg != null && cmsg instanceof SessionMessage) {
            SessionMessage msg = (SessionMessage) cmsg;
            switch (msg.getEventType()) {
                case SessionMessage.EVT_GET_ALL_SESSIONS:
                case SessionMessage.EVT_SESSION_CREATED: 
                case SessionMessage.EVT_SESSION_EXPIRED: 
                case SessionMessage.EVT_SESSION_ACCESSED:
                case SessionMessage.EVT_SESSION_DELTA: {
                    synchronized(receivedMessageQueue) {
                        if(receiverQueue) {
                            receivedMessageQueue.add(msg);
                            return ;
                        }
                    }
                   break;
                }
                default: {
                    //we didn't queue, do nothing
                    break;
                }
            } //switch
            
            messageReceived(msg, msg.getAddress() != null ? (Member) msg.getAddress() : null);
        }
    }

    /**
     * When the request has been completed, the replication valve will notify
     * the manager, and the manager will decide whether any replication is
     * needed or not. If there is a need for replication, the manager will
     * create a session message and that will be replicated. The cluster
     * determines where it gets sent.
     * 
     * @param sessionId -
     *            the sessionId that just completed.
     * @return a SessionMessage to be sent,
     */
    public ClusterMessage requestCompleted(String sessionId) {
        try {
            DeltaSession session = (DeltaSession) findSession(sessionId);
            DeltaRequest deltaRequest = session.getDeltaRequest();
            SessionMessage msg = null;
            boolean isDeltaRequest = false ;
            synchronized(deltaRequest) {
                isDeltaRequest = deltaRequest.getSize() > 0 ;
                if (isDeltaRequest) {    
                    counterSend_EVT_SESSION_DELTA++;
                    byte[] data = serializeDeltaRequest(deltaRequest);
                    msg = new SessionMessageImpl(getName(),
                                                 SessionMessage.EVT_SESSION_DELTA, 
                                                 data, 
                                                 sessionId,
                                                 sessionId + "-" + System.currentTimeMillis());
                    session.resetDeltaRequest();
                }  
            }
            if(!isDeltaRequest) {
                if(!session.isPrimarySession()) {               
                    counterSend_EVT_SESSION_ACCESSED++;
                    msg = new SessionMessageImpl(getName(),
                                                 SessionMessage.EVT_SESSION_ACCESSED, 
                                                 null, 
                                                 sessionId,
                                                 sessionId + "-" + System.currentTimeMillis());
                    if (log.isDebugEnabled()) {
                        log.debug(sm.getString("deltaManager.createMessage.accessChangePrimary",getName(), sessionId));
                    }
                }    
            } else { // log only outside synch block!
                if (log.isDebugEnabled()) {
                    log.debug(sm.getString("deltaManager.createMessage.delta",getName(), sessionId));
                }
            }
            session.setPrimarySession(true);
            //check to see if we need to send out an access message
            if ((msg == null)) {
                long replDelta = System.currentTimeMillis() - session.getLastTimeReplicated();
                if (replDelta > (getMaxInactiveInterval() * 1000)) {
                    counterSend_EVT_SESSION_ACCESSED++;
                    msg = new SessionMessageImpl(getName(),
                                                 SessionMessage.EVT_SESSION_ACCESSED, 
                                                 null,
                                                 sessionId, 
                                                 sessionId + "-" + System.currentTimeMillis());
                    if (log.isDebugEnabled()) {
                        log.debug(sm.getString("deltaManager.createMessage.access", getName(),sessionId));
                    }
                }

            }

            //update last replicated time
            if (msg != null) session.setLastTimeReplicated(System.currentTimeMillis());
            return msg;
        } catch (IOException x) {
            log.error(sm.getString("deltaManager.createMessage.unableCreateDeltaRequest",sessionId), x);
            return null;
        }

    }
    /**
     * Reset manager statistics
     */
    public synchronized void resetStatistics() {
        processingTime = 0 ;
        expiredSessions = 0 ;
        rejectedSessions = 0 ;
        sessionReplaceCounter = 0 ;
        counterNoStateTransfered = 0 ;
        maxActive = getActiveSessions() ;
        sessionCounter = getActiveSessions() ;
        counterReceive_EVT_ALL_SESSION_DATA = 0;
        counterReceive_EVT_GET_ALL_SESSIONS = 0;
        counterReceive_EVT_SESSION_ACCESSED = 0 ;
        counterReceive_EVT_SESSION_CREATED = 0 ;
        counterReceive_EVT_SESSION_DELTA = 0 ;
        counterReceive_EVT_SESSION_EXPIRED = 0 ;
        counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE = 0;
        counterSend_EVT_ALL_SESSION_DATA = 0;
        counterSend_EVT_GET_ALL_SESSIONS = 0;
        counterSend_EVT_SESSION_ACCESSED = 0 ;
        counterSend_EVT_SESSION_CREATED = 0 ;
        counterSend_EVT_SESSION_DELTA = 0 ;
        counterSend_EVT_SESSION_EXPIRED = 0 ;
        counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE = 0;
        
    }
   
    //  -------------------------------------------------------- persistence handler

    public void load() {

    }

    public void unload() {

    }

    //  -------------------------------------------------------- expire

    /**
     * send session expired to other cluster nodes
     * 
     * @param id
     *            session id
     */
    protected void sessionExpired(String id) {
        counterSend_EVT_SESSION_EXPIRED++ ;
        SessionMessage msg = new SessionMessageImpl(getName(),SessionMessage.EVT_SESSION_EXPIRED, null, id, id+ "-EXPIRED-MSG");
        if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.createMessage.expire",getName(), id));
        send(msg);
    }

    /**
     * Exipre all find sessions.
     */
    public void expireAllLocalSessions()
    {
        long timeNow = System.currentTimeMillis();
        Session sessions[] = findSessions();
        int expireDirect  = 0 ;
        int expireIndirect = 0 ;
        
        if(log.isDebugEnabled()) log.debug("Start expire all sessions " + getName() + " at " + timeNow + " sessioncount " + sessions.length);
        for (int i = 0; i < sessions.length; i++) {
            if (sessions[i] instanceof DeltaSession) {
                DeltaSession session = (DeltaSession) sessions[i];
                if (session.isPrimarySession()) {
                    if (session.isValid()) {
                        session.expire();
                        expireDirect++;
                    } else {
                        expireIndirect++;
                    }//end if
                }//end if
            }//end if
        }//for
        long timeEnd = System.currentTimeMillis();
        if(log.isDebugEnabled()) log.debug("End expire sessions " + getName() + " exipre processingTime " + (timeEnd - timeNow) + " expired direct sessions: " + expireDirect + " expired direct sessions: " + expireIndirect);
      
    }
    
    /**
     * When the manager expires session not tied to a request. The cluster will
     * periodically ask for a list of sessions that should expire and that
     * should be sent across the wire.
     * 
     * @return The invalidated sessions array
     */
    public String[] getInvalidatedSessions() {
        return new String[0];
    }

    //  -------------------------------------------------------- message receive

    /**
     * Test that sender and local domain is the same
     */
    protected boolean checkSenderDomain(SessionMessage msg,Member sender) {
        boolean sameDomain= true;
        if (!sameDomain && log.isWarnEnabled()) {
                log.warn(sm.getString("deltaManager.receiveMessage.fromWrongDomain",
                         new Object[] {getName(), 
                         msg.getEventTypeString(), 
                         sender,
                         "",
                         "" }));
        }
        return sameDomain ;
    }

    /**
     * This method is called by the received thread when a SessionMessage has
     * been received from one of the other nodes in the cluster.
     * 
     * @param msg -
     *            the message received
     * @param sender -
     *            the sender of the message, this is used if we receive a
     *            EVT_GET_ALL_SESSION message, so that we only reply to the
     *            requesting node
     */
    protected void messageReceived(SessionMessage msg, Member sender) {
        if(doDomainReplication() && !checkSenderDomain(msg,sender)) {
            return;
        }
        ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
        try {
            
            ClassLoader[] loaders = getClassLoaders();
            if ( loaders != null && loaders.length > 0) Thread.currentThread().setContextClassLoader(loaders[0]);
            if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.eventType",getName(), msg.getEventTypeString(), sender));
 
            switch (msg.getEventType()) {
                case SessionMessage.EVT_GET_ALL_SESSIONS: {
                    handleGET_ALL_SESSIONS(msg,sender);
                    break;
                }
                case SessionMessage.EVT_ALL_SESSION_DATA: {
                    handleALL_SESSION_DATA(msg,sender);
                    break;
                }
                case SessionMessage.EVT_ALL_SESSION_TRANSFERCOMPLETE: {
                    handleALL_SESSION_TRANSFERCOMPLETE(msg,sender);
                    break;
                }
                case SessionMessage.EVT_SESSION_CREATED: {
                    handleSESSION_CREATED(msg,sender);
                    break;
                }
                case SessionMessage.EVT_SESSION_EXPIRED: {
                    handleSESSION_EXPIRED(msg,sender);
                    break;
                }
                case SessionMessage.EVT_SESSION_ACCESSED: {
                    handleSESSION_ACCESSED(msg,sender);
                    break;
                }
                case SessionMessage.EVT_SESSION_DELTA: {
                   handleSESSION_DELTA(msg,sender);
                   break;
                }
                default: {
                    //we didn't recognize the message type, do nothing
                    break;
                }
            } //switch
        } catch (Exception x) {
            log.error(sm.getString("deltaManager.receiveMessage.error",getName()), x);
        } finally {
            Thread.currentThread().setContextClassLoader(contextLoader);
        }
    }

    // -------------------------------------------------------- message receiver handler


    /**
     * handle receive session state is complete transfered
     * @param msg
     * @param sender
     */
    protected void handleALL_SESSION_TRANSFERCOMPLETE(SessionMessage msg, Member sender) {
        counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE++ ;
        if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.transfercomplete",getName(), sender.getHost(), new Integer(sender.getPort())));
        stateTransferCreateSendTime = msg.getTimestamp() ;
        stateTransfered = true ;
    }

    /**
     * handle receive session delta
     * @param msg
     * @param sender
     * @throws IOException
     * @throws ClassNotFoundException
     */
    protected void handleSESSION_DELTA(SessionMessage msg, Member sender) throws IOException, ClassNotFoundException {
        counterReceive_EVT_SESSION_DELTA++;
        byte[] delta = msg.getSession();
        DeltaSession session = (DeltaSession) findSession(msg.getSessionID());
        if (session != null) {
            if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.delta",getName(), msg.getSessionID()));
            DeltaRequest dreq = deserializeDeltaRequest(session, delta);
            dreq.execute(session, notifyListenersOnReplication);
            session.setPrimarySession(false);
        }
    }

    /**
     * handle receive session is access at other node ( primary session is now false)
     * @param msg
     * @param sender
     * @throws IOException
     */
    protected void handleSESSION_ACCESSED(SessionMessage msg,Member sender) throws IOException {
        counterReceive_EVT_SESSION_ACCESSED++;
        DeltaSession session = (DeltaSession) findSession(msg.getSessionID());
        if (session != null) {
            if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.accessed",getName(), msg.getSessionID()));
            session.access();
            session.setPrimarySession(false);
            session.endAccess();
        }
    }

    /**
     * handle receive session is expire at other node ( expire session also here)
     * @param msg
     * @param sender
     * @throws IOException
     */
    protected void handleSESSION_EXPIRED(SessionMessage msg,Member sender) throws IOException {
        counterReceive_EVT_SESSION_EXPIRED++;
        DeltaSession session = (DeltaSession) findSession(msg.getSessionID());
        if (session != null) {
            if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.expired",getName(), msg.getSessionID()));
            session.expire(notifySessionListenersOnReplication, false);
        }
    }

    /**
     * handle receive new session is created at other node (create backup - primary false)
     * @param msg
     * @param sender
     */
    protected void handleSESSION_CREATED(SessionMessage msg,Member sender) {
        counterReceive_EVT_SESSION_CREATED++;
        if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.createNewSession",getName(), msg.getSessionID()));
        DeltaSession session = (DeltaSession) createEmptySession();
        session.setManager(this);
        session.setValid(true);
        session.setPrimarySession(false);
        session.setCreationTime(msg.getTimestamp());
        // use container maxInactiveInterval so that session will expire correctly in case of primary transfer
        session.setMaxInactiveInterval(getMaxInactiveInterval());
        session.access();
        if(notifySessionListenersOnReplication)
            session.setId(msg.getSessionID());
        else
            session.setIdInternal(msg.getSessionID());
        session.resetDeltaRequest();
        session.endAccess();

    }

    /**
     * handle receive sessions from other not ( restart )
     * @param msg
     * @param sender
     * @throws ClassNotFoundException
     * @throws IOException
     */
    protected void handleALL_SESSION_DATA(SessionMessage msg,Member sender) throws ClassNotFoundException, IOException {
        counterReceive_EVT_ALL_SESSION_DATA++;
        if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.allSessionDataBegin",getName()));
        byte[] data = msg.getSession();
        deserializeSessions(data);
        if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.allSessionDataAfter",getName()));
        //stateTransferred = true;
    }

    /**
     * handle receive that other node want all sessions ( restart )
     * a) send all sessions with one message
     * b) send session at blocks
     * After sending send state is complete transfered
     * @param msg
     * @param sender
     * @throws IOException
     */
    protected void handleGET_ALL_SESSIONS(SessionMessage msg, Member sender) throws IOException {
        counterReceive_EVT_GET_ALL_SESSIONS++;
        //get a list of all the session from this manager
        if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.unloadingBegin", getName()));
        // Write the number of active sessions, followed by the details
        // get all sessions and serialize without sync
        Session[] currentSessions = findSessions();
        long findSessionTimestamp = System.currentTimeMillis() ;
        if (isSendAllSessions()) {
            sendSessions(sender, currentSessions, findSessionTimestamp);
        } else {
            // send session at blocks
            int len = currentSessions.length < getSendAllSessionsSize() ? currentSessions.length : getSendAllSessionsSize();
            Session[] sendSessions = new Session[len];
            for (int i = 0; i < currentSessions.length; i += getSendAllSessionsSize()) {
                len = i + getSendAllSessionsSize() > currentSessions.length ? currentSessions.length - i : getSendAllSessionsSize();
                System.arraycopy(currentSessions, i, sendSessions, 0, len);
                sendSessions(sender, sendSessions,findSessionTimestamp);
                if (getSendAllSessionsWaitTime() > 0) {
                    try {
                        Thread.sleep(getSendAllSessionsWaitTime());
                    } catch (Exception sleep) {
                    }
                }//end if
            }//for
        }//end if
        
        SessionMessage newmsg = new SessionMessageImpl(name,SessionMessage.EVT_ALL_SESSION_TRANSFERCOMPLETE, null,"SESSION-STATE-TRANSFERED", "SESSION-STATE-TRANSFERED"+ getName());
        newmsg.setTimestamp(findSessionTimestamp);
        if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.createMessage.allSessionTransfered",getName()));
        counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE++;
        cluster.send(newmsg, sender);
    }


    /**
     * send a block of session to sender
     * @param sender
     * @param currentSessions
     * @param sendTimestamp
     * @throws IOException
     */
    protected void sendSessions(Member sender, Session[] currentSessions,long sendTimestamp) throws IOException {
        byte[] data = serializeSessions(currentSessions);
        if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.unloadingAfter",getName()));
        SessionMessage newmsg = new SessionMessageImpl(name,SessionMessage.EVT_ALL_SESSION_DATA, data,"SESSION-STATE", "SESSION-STATE-" + getName());
        newmsg.setTimestamp(sendTimestamp);
        if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.createMessage.allSessionData",getName()));
        counterSend_EVT_ALL_SESSION_DATA++;
        cluster.send(newmsg, sender);
    }

    public ClusterManager cloneFromTemplate() {
        DeltaManager result = new DeltaManager();
        result.name = "Clone-from-"+name;
        result.cluster = cluster;
        result.replicationValve = replicationValve;
        result.maxActiveSessions = maxActiveSessions;
        result.expireSessionsOnShutdown = expireSessionsOnShutdown;
        result.notifyListenersOnReplication = notifyListenersOnReplication;
        result.notifySessionListenersOnReplication = notifySessionListenersOnReplication;
        result.stateTransferTimeout = stateTransferTimeout;
        result.sendAllSessions = sendAllSessions;
        result.sendClusterDomainOnly = sendClusterDomainOnly ;
        result.sendAllSessionsSize = sendAllSessionsSize;
        result.sendAllSessionsWaitTime = sendAllSessionsWaitTime ; 
        result.receiverQueue = receiverQueue ;
        result.stateTimestampDrop = stateTimestampDrop ;
        result.stateTransferCreateSendTime = stateTransferCreateSendTime; 
        return result;
    }
}

Other Tomcat examples (source code examples)

Here is a short list of links related to this Tomcat DeltaManager.java source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.