alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Tomcat example source code file (SimpleTcpReplicationManager.java)

This example Tomcat source code file (SimpleTcpReplicationManager.java) is included in the DevDaily.com "Java Source Code Warehouse" project. The intent of this project is to help you "Learn Java by Example"™.

Java - Tomcat tags/keywords

exception, io, manager, received, replicatedsession, replicatedsession, replicationstream, session, session, sessionmessage, sessionmessage, simpletcpreplicationmanager, string, string, unable

The Tomcat SimpleTcpReplicationManager.java source code

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 * 
 *      http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.catalina.ha.session;

import java.io.IOException;

import org.apache.catalina.LifecycleException;
import org.apache.catalina.Session;
import org.apache.catalina.ha.CatalinaCluster;
import org.apache.catalina.ha.ClusterManager;
import org.apache.catalina.ha.ClusterMessage;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.realm.GenericPrincipal;
import org.apache.catalina.session.StandardManager;
import org.apache.catalina.tribes.io.ReplicationStream;
import java.io.ByteArrayInputStream;
import org.apache.catalina.Loader;

/**
 * Title:        Tomcat Session Replication for Tomcat 4.0 <BR>
 * Description:  A very simple straight forward implementation of
 *               session replication of servers in a cluster.<BR>
 *               This session replication is implemented "live". By live
 *               I mean, when a session attribute is added into a session on Node A
 *               a message is broadcasted to other messages and setAttribute is called on the
 *               replicated sessions.<BR>
 *               A full description of this implementation can be found under
 *               <href="http://www.filip.net/tomcat/">Filip's Tomcat Page
* * Copyright: See apache license * Company: www.filip.net * @author <a href="mailto:mail@filip.net">Filip Hanik * @author Bela Ban (modifications for synchronous replication) * @version 1.0 for TC 4.0 * Description: The InMemoryReplicationManager is a session manager that replicated * session information in memory. * <BR>
* The InMemoryReplicationManager extends the StandardManager hence it allows for us * to inherit all the basic session management features like expiration, session listeners etc * <BR>
* To communicate with other nodes in the cluster, the InMemoryReplicationManager sends out 7 different type of multicast messages * all defined in the SessionMessage class.<BR> * When a session is replicated (not an attribute added/removed) the session is serialized into * a byte array using the StandardSession.readObjectData, StandardSession.writeObjectData methods. */ public class SimpleTcpReplicationManager extends StandardManager implements ClusterManager { public static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog( SimpleTcpReplicationManager.class ); //the channel configuration protected String mChannelConfig = null; //the group name protected String mGroupName = "TomcatReplication"; //somehow start() gets called more than once protected boolean mChannelStarted = false; //log to screen protected boolean mPrintToScreen = true; protected boolean defaultMode = false; protected boolean mManagerRunning = false; /** Use synchronous rather than asynchronous replication. Every session modification (creation, change, removal etc) * will be sent to all members. The call will then wait for max milliseconds, or forever (if timeout is 0) for * all responses. */ protected boolean synchronousReplication=true; /** Set to true if we don't want the sessions to expire on shutdown */ protected boolean mExpireSessionsOnShutdown = true; protected boolean useDirtyFlag = false; protected String name; protected boolean distributable = true; protected CatalinaCluster cluster; protected java.util.HashMap invalidatedSessions = new java.util.HashMap(); /** * Flag to keep track if the state has been transferred or not * Assumes false. 
*/ protected boolean stateTransferred = false; private boolean notifyListenersOnReplication; private boolean sendClusterDomainOnly = true ; /** * Constructor, just calls super() * */ public SimpleTcpReplicationManager() { super(); } public boolean doDomainReplication() { return sendClusterDomainOnly; } /** * @param sendClusterDomainOnly The sendClusterDomainOnly to set. */ public void setDomainReplication(boolean sendClusterDomainOnly) { this.sendClusterDomainOnly = sendClusterDomainOnly; } /** * @return Returns the defaultMode. */ public boolean isDefaultMode() { return defaultMode; } /** * @param defaultMode The defaultMode to set. */ public void setDefaultMode(boolean defaultMode) { this.defaultMode = defaultMode; } public boolean isManagerRunning() { return mManagerRunning; } public void setUseDirtyFlag(boolean usedirtyflag) { this.useDirtyFlag = usedirtyflag; } public void setExpireSessionsOnShutdown(boolean expireSessionsOnShutdown) { mExpireSessionsOnShutdown = expireSessionsOnShutdown; } public void setCluster(CatalinaCluster cluster) { if(log.isDebugEnabled()) log.debug("Cluster associated with SimpleTcpReplicationManager"); this.cluster = cluster; } public boolean getExpireSessionsOnShutdown() { return mExpireSessionsOnShutdown; } public void setPrintToScreen(boolean printtoscreen) { if(log.isDebugEnabled()) log.debug("Setting screen debug to:"+printtoscreen); mPrintToScreen = printtoscreen; } public void setSynchronousReplication(boolean flag) { synchronousReplication=flag; } /** * Override persistence since they don't go hand in hand with replication for now. */ public void unload() throws IOException { if ( !getDistributable() ) { super.unload(); } } /** * Creates a HTTP session. * Most of the code in here is copied from the StandardManager. 
* This is not pretty, yeah I know, but it was necessary since the * StandardManager had hard coded the session instantiation to the a * StandardSession, when we actually want to instantiate a ReplicatedSession<BR> * If the call comes from the Tomcat servlet engine, a SessionMessage goes out to the other * nodes in the cluster that this session has been created. * @param notify - if set to true the other nodes in the cluster will be notified. * This flag is needed so that we can create a session before we deserialize * a replicated one * * @see ReplicatedSession */ protected Session createSession(String sessionId, boolean notify, boolean setId) { //inherited from the basic manager if ((getMaxActiveSessions() >= 0) && (sessions.size() >= getMaxActiveSessions())) throw new IllegalStateException(sm.getString("standardManager.createSession.ise")); Session session = new ReplicatedSession(this); // Initialize the properties of the new session and return it session.setNew(true); session.setValid(true); session.setCreationTime(System.currentTimeMillis()); session.setMaxInactiveInterval(this.maxInactiveInterval); if(sessionId == null) sessionId = generateSessionId(); if ( setId ) session.setId(sessionId); if ( notify && (cluster!=null) ) { ((ReplicatedSession)session).setIsDirty(true); } return (session); }//createSession //========================================================================= // OVERRIDE THESE METHODS TO IMPLEMENT THE REPLICATION //========================================================================= /** * Construct and return a new session object, based on the default * settings specified by this Manager's properties. The session * id will be assigned by this method, and available via the getId() * method of the returned session. If a new session cannot be created * for any reason, return <code>null. 
* * @exception IllegalStateException if a new session cannot be * instantiated for any reason */ public Session createSession(String sessionId) { //create a session and notify the other nodes in the cluster Session session = createSession(sessionId,getDistributable(),true); add(session); return session; } public void sessionInvalidated(String sessionId) { synchronized ( invalidatedSessions ) { invalidatedSessions.put(sessionId, sessionId); } } public String[] getInvalidatedSessions() { synchronized ( invalidatedSessions ) { String[] result = new String[invalidatedSessions.size()]; invalidatedSessions.values().toArray(result); return result; } } public ClusterMessage requestCompleted(String sessionId) { if ( !getDistributable() ) { log.warn("Received requestCompleted message, although this context["+ getName()+"] is not distributable. Ignoring message"); return null; } try { if ( invalidatedSessions.get(sessionId) != null ) { synchronized ( invalidatedSessions ) { invalidatedSessions.remove(sessionId); SessionMessage msg = new SessionMessageImpl(name, SessionMessage.EVT_SESSION_EXPIRED, null, sessionId, sessionId); return msg; } } else { ReplicatedSession session = (ReplicatedSession) findSession( sessionId); if (session != null) { //return immediately if the session is not dirty if (useDirtyFlag && (!session.isDirty())) { //but before we return doing nothing, //see if we should send //an updated last access message so that //sessions across cluster dont expire long interval = session.getMaxInactiveInterval(); long lastaccdist = System.currentTimeMillis() - session.getLastAccessWasDistributed(); if ( ((interval*1000) / lastaccdist)< 3 ) { SessionMessage accmsg = new SessionMessageImpl(name, SessionMessage.EVT_SESSION_ACCESSED, null, sessionId, sessionId); session.setLastAccessWasDistributed(System.currentTimeMillis()); return accmsg; } return null; } session.setIsDirty(false); if (log.isDebugEnabled()) { try { log.debug("Sending session to cluster=" + session); } 
catch (Exception ignore) {} } SessionMessage msg = new SessionMessageImpl(name, SessionMessage.EVT_SESSION_CREATED, writeSession(session), session.getIdInternal(), session.getIdInternal()); return msg; } //end if }//end if } catch (Exception x ) { log.error("Unable to replicate session",x); } return null; } /** * Serialize a session into a byte array<BR> * This method simple calls the writeObjectData method on the session * and returns the byte data from that call * @param session - the session to be serialized * @return a byte array containing the session data, null if the serialization failed */ protected byte[] writeSession( Session session ) { try { java.io.ByteArrayOutputStream session_data = new java.io.ByteArrayOutputStream(); java.io.ObjectOutputStream session_out = new java.io.ObjectOutputStream(session_data); session_out.flush(); boolean hasPrincipal = session.getPrincipal() != null; session_out.writeBoolean(hasPrincipal); if ( hasPrincipal ) { session_out.writeObject(SerializablePrincipal.createPrincipal((GenericPrincipal)session.getPrincipal())); }//end if ((ReplicatedSession)session).writeObjectData(session_out); return session_data.toByteArray(); } catch ( Exception x ) { log.error("Failed to serialize the session!",x); } return null; } /** * Open Stream and use correct ClassLoader (Container) Switch * ThreadClassLoader * * @param data * @return The object input stream * @throws IOException */ public ReplicationStream getReplicationStream(byte[] data) throws IOException { return getReplicationStream(data,0,data.length); } public ReplicationStream getReplicationStream(byte[] data, int offset, int length) throws IOException { ByteArrayInputStream fis =null; ReplicationStream ois = null; Loader loader = null; ClassLoader classLoader = null; //fix to be able to run the DeltaManager //stand alone without a container. 
//use the Threads context class loader if (container != null) loader = container.getLoader(); if (loader != null) classLoader = loader.getClassLoader(); else classLoader = Thread.currentThread().getContextClassLoader(); //end fix fis = new ByteArrayInputStream(data, offset, length); if ( classLoader == Thread.currentThread().getContextClassLoader() ) { ois = new ReplicationStream(fis, new ClassLoader[] {classLoader}); } else { ois = new ReplicationStream(fis, new ClassLoader[] {classLoader,Thread.currentThread().getContextClassLoader()}); } return ois; } /** * Reinstantiates a serialized session from the data passed in. * This will first call createSession() so that we get a fresh instance with all * the managers set and all the transient fields validated. * Then it calls Session.readObjectData(byte[]) to deserialize the object * @param data - a byte array containing session data * @return a valid Session object, null if an error occurs * */ protected Session readSession( byte[] data, String sessionId ) { try { ReplicationStream session_in = getReplicationStream(data); Session session = sessionId!=null?this.findSession(sessionId):null; boolean isNew = (session==null); //clear the old values from the existing session if ( session!=null ) { ReplicatedSession rs = (ReplicatedSession)session; rs.expire(false); //cleans up the previous values, since we are not doing removes session = null; }//end if if (session==null) { session = createSession(null,false, false); sessions.remove(session.getIdInternal()); } boolean hasPrincipal = session_in.readBoolean(); SerializablePrincipal p = null; if ( hasPrincipal ) p = (SerializablePrincipal)session_in.readObject(); ((ReplicatedSession)session).readObjectData(session_in); if ( hasPrincipal ) session.setPrincipal(p.getPrincipal(getContainer().getRealm())); ((ReplicatedSession)session).setId(sessionId,isNew); ReplicatedSession rsession = (ReplicatedSession)session; rsession.setAccessCount(1); session.setManager(this); 
session.setValid(true); rsession.setLastAccessedTime(System.currentTimeMillis()); rsession.setThisAccessedTime(System.currentTimeMillis()); ((ReplicatedSession)session).setAccessCount(0); session.setNew(false); if(log.isTraceEnabled()) log.trace("Session loaded id="+sessionId + " actualId="+session.getId()+ " exists="+this.sessions.containsKey(sessionId)+ " valid="+rsession.isValid()); return session; } catch ( Exception x ) { log.error("Failed to deserialize the session!",x); } return null; } public String getName() { return this.name; } /** * Prepare for the beginning of active use of the public methods of this * component. This method should be called after <code>configure(), * and before any of the public methods of the component are utilized.<BR> * Starts the cluster communication channel, this will connect with the other nodes * in the cluster, and request the current session state to be transferred to this node. * @exception IllegalStateException if this component has already been * started * @exception LifecycleException if this component detects a fatal error * that prevents this component from being used */ public void start() throws LifecycleException { mManagerRunning = true; super.start(); try { //the channel is already running if ( mChannelStarted ) return; if(log.isInfoEnabled()) log.info("Starting clustering manager...:"+getName()); if ( cluster == null ) { log.error("Starting... no cluster associated with this context:"+getName()); return; } cluster.registerManager(this); if (cluster.getMembers().length > 0) { Member mbr = cluster.getMembers()[0]; SessionMessage msg = new SessionMessageImpl(this.getName(), SessionMessage.EVT_GET_ALL_SESSIONS, null, "GET-ALL", "GET-ALL-"+this.getName()); cluster.send(msg, mbr); if(log.isWarnEnabled()) log.warn("Manager["+getName()+"], requesting session state from "+mbr+ ". 
This operation will timeout if no session state has been received within "+ "60 seconds"); long reqStart = System.currentTimeMillis(); long reqNow = 0; boolean isTimeout=false; do { try { Thread.sleep(100); }catch ( Exception sleep) {} reqNow = System.currentTimeMillis(); isTimeout=((reqNow-reqStart)>(1000*60)); } while ( (!isStateTransferred()) && (!isTimeout)); if ( isTimeout || (!isStateTransferred()) ) { log.error("Manager["+getName()+"], No session state received, timing out."); }else { if(log.isInfoEnabled()) log.info("Manager["+getName()+"], session state received in "+(reqNow-reqStart)+" ms."); } } else { if(log.isInfoEnabled()) log.info("Manager["+getName()+"], skipping state transfer. No members active in cluster group."); }//end if mChannelStarted = true; } catch ( Exception x ) { log.error("Unable to start SimpleTcpReplicationManager",x); } } /** * Gracefully terminate the active use of the public methods of this * component. This method should be the last one called on a given * instance of this component.<BR> * This will disconnect the cluster communication channel and stop the listener thread. * @exception IllegalStateException if this component has not been started * @exception LifecycleException if this component detects a fatal error * that needs to be reported */ public void stop() throws LifecycleException { mManagerRunning = false; mChannelStarted = false; super.stop(); try { this.sessions.clear(); cluster.removeManager(this); } catch ( Exception x ) { log.error("Unable to stop SimpleTcpReplicationManager",x); } } public void setDistributable(boolean dist) { this.distributable = dist; } public boolean getDistributable() { return distributable; } /** * This method is called by the received thread when a SessionMessage has * been received from one of the other nodes in the cluster. 
* @param msg - the message received * @param sender - the sender of the message, this is used if we receive a * EVT_GET_ALL_SESSION message, so that we only reply to * the requesting node */ protected void messageReceived( SessionMessage msg, Member sender ) { try { if(log.isInfoEnabled()) { log.debug("Received SessionMessage of type="+msg.getEventTypeString()); log.debug("Received SessionMessage sender="+sender); } switch ( msg.getEventType() ) { case SessionMessage.EVT_GET_ALL_SESSIONS: { //get a list of all the session from this manager Object[] sessions = findSessions(); java.io.ByteArrayOutputStream bout = new java.io.ByteArrayOutputStream(); java.io.ObjectOutputStream oout = new java.io.ObjectOutputStream(bout); oout.writeInt(sessions.length); for (int i=0; i<sessions.length; i++){ ReplicatedSession ses = (ReplicatedSession)sessions[i]; oout.writeUTF(ses.getIdInternal()); byte[] data = writeSession(ses); oout.writeObject(data); }//for //don't send a message if we don't have to oout.flush(); oout.close(); byte[] data = bout.toByteArray(); SessionMessage newmsg = new SessionMessageImpl(name, SessionMessage.EVT_ALL_SESSION_DATA, data, "SESSION-STATE","SESSION-STATE-"+getName()); cluster.send(newmsg, sender); break; } case SessionMessage.EVT_ALL_SESSION_DATA: { java.io.ByteArrayInputStream bin = new java.io.ByteArrayInputStream(msg.getSession()); java.io.ObjectInputStream oin = new java.io.ObjectInputStream(bin); int size = oin.readInt(); for ( int i=0; i<size; i++) { String id = oin.readUTF(); byte[] data = (byte[])oin.readObject(); Session session = readSession(data,id); }//for stateTransferred=true; break; } case SessionMessage.EVT_SESSION_CREATED: { Session session = this.readSession(msg.getSession(),msg.getSessionID()); if ( log.isDebugEnabled() ) { log.debug("Received replicated session=" + session + " isValid=" + session.isValid()); } break; } case SessionMessage.EVT_SESSION_EXPIRED: { Session session = findSession(msg.getSessionID()); if ( session != null 
) { session.expire(); this.remove(session); }//end if break; } case SessionMessage.EVT_SESSION_ACCESSED :{ Session session = findSession(msg.getSessionID()); if ( session != null ) { session.access(); session.endAccess(); } break; } default: { //we didn't recognize the message type, do nothing break; } }//switch } catch ( Exception x ) { log.error("Unable to receive message through TCP channel",x); } } public void messageDataReceived(ClusterMessage cmsg) { try { if ( cmsg instanceof SessionMessage ) { SessionMessage msg = (SessionMessage)cmsg; messageReceived(msg, msg.getAddress() != null ? (Member) msg.getAddress() : null); } } catch(Throwable ex){ log.error("InMemoryReplicationManager.messageDataReceived()", ex); }//catch } public boolean isStateTransferred() { return stateTransferred; } public void setName(String name) { this.name = name; } public boolean isNotifyListenersOnReplication() { return notifyListenersOnReplication; } public void setNotifyListenersOnReplication(boolean notifyListenersOnReplication) { this.notifyListenersOnReplication = notifyListenersOnReplication; } /* * @see org.apache.catalina.ha.ClusterManager#getCluster() */ public CatalinaCluster getCluster() { return cluster; } public ClusterManager cloneFromTemplate() { throw new UnsupportedOperationException(); } }

Other Tomcat examples (source code examples)

Here is a short list of links related to this Tomcat SimpleTcpReplicationManager.java source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.