alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Tomcat example source code file (ChannelCoordinator.java)

This example Tomcat source code file (ChannelCoordinator.java) is included in the DevDaily.com "Java Source Code Warehouse" project. The intent of this project is to help you "Learn Java by Example" TM.

Java - Tomcat tags/keywords

channelcoordinator, channelcoordinator, channelexception, channelexception, channelreceiver, channelsender, exception, illegalargumentexception, invalid, member, member, membershipservice, membershipservice, replicationtransmitter

The Tomcat ChannelCoordinator.java source code

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 * 
 *      http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.catalina.tribes.group;

import org.apache.catalina.tribes.ChannelException;
import org.apache.catalina.tribes.ChannelMessage;
import org.apache.catalina.tribes.ChannelReceiver;
import org.apache.catalina.tribes.ChannelSender;

import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.MembershipService;
import org.apache.catalina.tribes.MessageListener;
import org.apache.catalina.tribes.transport.SenderState;
import org.apache.catalina.tribes.transport.ReplicationTransmitter;
import org.apache.catalina.tribes.membership.McastService;
import org.apache.catalina.tribes.transport.nio.NioReceiver;
import org.apache.catalina.tribes.Channel;
import org.apache.catalina.tribes.util.Logs;
import org.apache.catalina.tribes.UniqueId;
import org.apache.catalina.tribes.util.Arrays;


/**
 * The channel coordinator object coordinates the membership service,
 * the sender and the receiver.
 * This is the last interceptor in the chain.
 * @author Filip Hanik
 * @version $Revision: 532943 $, $Date: 2007-04-27 05:14:58 +0200 (ven., 27 avr. 2007) $
 */
public class ChannelCoordinator extends ChannelInterceptorBase implements MessageListener {
    private ChannelReceiver clusterReceiver = new NioReceiver();
    private ChannelSender clusterSender = new ReplicationTransmitter();
    private MembershipService membershipService = new McastService();
    
    //override optionflag
    protected int optionFlag = Channel.SEND_OPTIONS_BYTE_MESSAGE|Channel.SEND_OPTIONS_USE_ACK|Channel.SEND_OPTIONS_SYNCHRONIZED_ACK;
    public int getOptionFlag() {return optionFlag;}
    public void setOptionFlag(int flag) {optionFlag=flag;}
    
    private int startLevel = 0;

    public ChannelCoordinator() {
        
    }
    
    public ChannelCoordinator(ChannelReceiver receiver,
                              ChannelSender sender,
                              MembershipService service) {
        this();
        this.setClusterReceiver(receiver);
        this.setClusterSender(sender);
        this.setMembershipService(service);
    }
    
    /**
     * Send a message to one or more members in the cluster
     * @param destination Member[] - the destinations, null or zero length means all
     * @param msg ClusterMessage - the message to send
     * @param options int - sender options, see class documentation
     * @return ClusterMessage[] - the replies from the members, if any.
     */
    public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException {
        if ( destination == null ) destination = membershipService.getMembers();
        clusterSender.sendMessage(msg,destination);
        if ( Logs.MESSAGES.isTraceEnabled() ) {
            Logs.MESSAGES.trace("ChannelCoordinator - Sent msg:" + new UniqueId(msg.getUniqueId()) + " at " +new java.sql.Timestamp(System.currentTimeMillis())+ " to "+Arrays.toNameString(destination));
        }
    }
    

    /**
     * Starts up the channel. This can be called multiple times for individual services to start
     * The svc parameter can be the logical or value of any constants
     * @param svc int value of <BR>
     * DEFAULT - will start all services <BR>
     * MBR_RX_SEQ - starts the membership receiver <BR>
     * MBR_TX_SEQ - starts the membership broadcaster <BR>
     * SND_TX_SEQ - starts the replication transmitter<BR>
     * SND_RX_SEQ - starts the replication receiver<BR>
     * @throws ChannelException if a startup error occurs or the service is already started.
     */
    public void start(int svc) throws ChannelException {
        this.internalStart(svc);
    }

    /**
     * Shuts down the channel. This can be called multiple times for individual services to shutdown
     * The svc parameter can be the logical or value of any constants
     * @param svc int value of <BR>
     * DEFAULT - will shutdown all services <BR>
     * MBR_RX_SEQ - stops the membership receiver <BR>
     * MBR_TX_SEQ - stops the membership broadcaster <BR>
     * SND_TX_SEQ - stops the replication transmitter<BR>
     * SND_RX_SEQ - stops the replication receiver<BR>
     * @throws ChannelException if a startup error occurs or the service is already started.
     */
    public void stop(int svc) throws ChannelException {
        this.internalStop(svc);
    }    


    /**
     * Starts up the channel. This can be called multiple times for individual services to start
     * The svc parameter can be the logical or value of any constants
     * @param svc int value of <BR>
     * DEFAULT - will start all services <BR>
     * MBR_RX_SEQ - starts the membership receiver <BR>
     * MBR_TX_SEQ - starts the membership broadcaster <BR>
     * SND_TX_SEQ - starts the replication transmitter<BR>
     * SND_RX_SEQ - starts the replication receiver<BR>
     * @throws ChannelException if a startup error occurs or the service is already started.
     */
    protected synchronized void internalStart(int svc) throws ChannelException {
        try {
            boolean valid = false;
            //make sure we don't pass down any flags that are unrelated to the bottom layer
            svc = svc & Channel.DEFAULT;

            if (startLevel == Channel.DEFAULT) return; //we have already started up all components
            if (svc == 0 ) return;//nothing to start
            
            if (svc == (svc & startLevel)) throw new ChannelException("Channel already started for level:"+svc);

            //must start the receiver first so that we can coordinate the port it
            //listens to with the local membership settings
            if ( Channel.SND_RX_SEQ==(svc & Channel.SND_RX_SEQ) ) {
                clusterReceiver.setMessageListener(this);
                clusterReceiver.start();
                //synchronize, big time FIXME
                membershipService.setLocalMemberProperties(getClusterReceiver().getHost(), getClusterReceiver().getPort());
                valid = true;
            }
            if ( Channel.SND_TX_SEQ==(svc & Channel.SND_TX_SEQ) ) {
                clusterSender.start();
                valid = true;
            }
            
            if ( Channel.MBR_RX_SEQ==(svc & Channel.MBR_RX_SEQ) ) {
                membershipService.setMembershipListener(this);
                membershipService.start(MembershipService.MBR_RX);
                valid = true;
            }
            if ( Channel.MBR_TX_SEQ==(svc & Channel.MBR_TX_SEQ) ) {
                membershipService.start(MembershipService.MBR_TX);
                valid = true;
            }
            
            if ( !valid) {
                throw new IllegalArgumentException("Invalid start level, valid levels are:SND_RX_SEQ,SND_TX_SEQ,MBR_TX_SEQ,MBR_RX_SEQ");
            }
            startLevel = (startLevel | svc);
        }catch ( ChannelException cx ) {
            throw cx;
        }catch ( Exception x ) {
            throw new ChannelException(x);
        }
    }

    /**
     * Shuts down the channel. This can be called multiple times for individual services to shutdown
     * The svc parameter can be the logical or value of any constants
     * @param svc int value of <BR>
     * DEFAULT - will shutdown all services <BR>
     * MBR_RX_SEQ - starts the membership receiver <BR>
     * MBR_TX_SEQ - starts the membership broadcaster <BR>
     * SND_TX_SEQ - starts the replication transmitter<BR>
     * SND_RX_SEQ - starts the replication receiver<BR>
     * @throws ChannelException if a startup error occurs or the service is already started.
     */
    protected synchronized void internalStop(int svc) throws ChannelException {
        try {
            //make sure we don't pass down any flags that are unrelated to the bottom layer
            svc = svc & Channel.DEFAULT;

            if (startLevel == 0) return; //we have already stopped up all components
            if (svc == 0 ) return;//nothing to stop

            boolean valid = false;
            if ( Channel.SND_RX_SEQ==(svc & Channel.SND_RX_SEQ) ) {
                clusterReceiver.stop();
                clusterReceiver.setMessageListener(null);
                valid = true;
            }
            if ( Channel.SND_TX_SEQ==(svc & Channel.SND_TX_SEQ) ) {
                clusterSender.stop();
                valid = true;
            }

            if ( Channel.MBR_RX_SEQ==(svc & Channel.MBR_RX_SEQ) ) {
                membershipService.stop(MembershipService.MBR_RX);
                membershipService.setMembershipListener(null);
                valid = true;
                
            }
            if ( Channel.MBR_TX_SEQ==(svc & Channel.MBR_TX_SEQ) ) {
                valid = true;
                membershipService.stop(MembershipService.MBR_TX);
            }            
            if ( !valid) {
                throw new IllegalArgumentException("Invalid start level, valid levels are:SND_RX_SEQ,SND_TX_SEQ,MBR_TX_SEQ,MBR_RX_SEQ");
            }

            startLevel = (startLevel & (~svc));
            
        }catch ( Exception x ) {
            throw new ChannelException(x);
        } finally {
            
        }

    }
    
    public void memberAdded(Member member){
        SenderState.getSenderState(member);
        super.memberAdded(member);
    }
    
    public void memberDisappeared(Member member){
        SenderState.removeSenderState(member);
        super.memberDisappeared(member);
    }
    
    public void messageReceived(ChannelMessage msg) {
        if ( Logs.MESSAGES.isTraceEnabled() ) {
            Logs.MESSAGES.trace("ChannelCoordinator - Received msg:" + new UniqueId(msg.getUniqueId()) + " at " +new java.sql.Timestamp(System.currentTimeMillis())+ " from "+msg.getAddress().getName());
        }
        super.messageReceived(msg);
    }


    public ChannelReceiver getClusterReceiver() {
        return clusterReceiver;
    }

    public ChannelSender getClusterSender() {
        return clusterSender;
    }

    public MembershipService getMembershipService() {
        return membershipService;
    }

    public void setClusterReceiver(ChannelReceiver clusterReceiver) {
        if ( clusterReceiver != null ) {
            this.clusterReceiver = clusterReceiver;
            this.clusterReceiver.setMessageListener(this);
        } else {
            if  (this.clusterReceiver!=null ) this.clusterReceiver.setMessageListener(null);
            this.clusterReceiver = null;
        }
    }

    public void setClusterSender(ChannelSender clusterSender) {
        this.clusterSender = clusterSender;
    }

    public void setMembershipService(MembershipService membershipService) {
        this.membershipService = membershipService;
        this.membershipService.setMembershipListener(this);
    }
    
    public void heartbeat() {
        if ( clusterSender!=null ) clusterSender.heartbeat();
        super.heartbeat();
    }
    
    /**
     * has members
     */
    public boolean hasMembers() {
        return this.getMembershipService().hasMembers();
    }

    /**
     * Get all current cluster members
     * @return all members or empty array
     */
    public Member[] getMembers() {
        return this.getMembershipService().getMembers();
    }

    /**
     * 
     * @param mbr Member
     * @return Member
     */
    public Member getMember(Member mbr){
        return this.getMembershipService().getMember(mbr);
    }


    /**
     * Return the member that represents this node.
     *
     * @return Member
     */
    public Member getLocalMember(boolean incAlive) {
        return this.getMembershipService().getLocalMember(incAlive);
    }

   
}

Other Tomcat examples (source code examples)

Here is a short list of links related to this Tomcat ChannelCoordinator.java source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.