alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Java example source code file (WorkerActor.java)

This example Java source code file (WorkerActor.java) is included in the alvinalexander.com "Java Source Code Warehouse" project. The intent of this project is to help you "Learn Java by Example" TM.

Learn more about this Java project at its project page.

Java - Java tags/keywords

ack, actorref, atomicboolean, clearing, clearworker, configuration, directive, exception, job, override, put, runtimeexception, string, threading, threads, util, workeractor

The WorkerActor.java Java example source code

/*
 *
 *  * Copyright 2015 Skymind,Inc.
 *  *
 *  *    Licensed under the Apache License, Version 2.0 (the "License");
 *  *    you may not use this file except in compliance with the License.
 *  *    You may obtain a copy of the License at
 *  *
 *  *        http://www.apache.org/licenses/LICENSE-2.0
 *  *
 *  *    Unless required by applicable law or agreed to in writing, software
 *  *    distributed under the License is distributed on an "AS IS" BASIS,
 *  *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  *    See the License for the specific language governing permissions and
 *  *    limitations under the License.
 *
 */

package org.deeplearning4j.scaleout.actor.core.actor;

import akka.actor.*;
import akka.actor.SupervisorStrategy.Directive;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent.MemberEvent;
import akka.contrib.pattern.DistributedPubSubExtension;
import akka.contrib.pattern.DistributedPubSubMediator;
import akka.contrib.pattern.DistributedPubSubMediator.Put;
import akka.japi.Function;
import com.hazelcast.core.HazelcastInstanceNotActiveException;
import org.canova.api.conf.Configuration;
import org.deeplearning4j.scaleout.actor.core.protocol.Ack;
import org.deeplearning4j.scaleout.actor.core.protocol.ClearWorker;
import org.deeplearning4j.scaleout.actor.core.ClusterListener;
import org.deeplearning4j.nn.conf.DeepLearningConfigurable;

import org.deeplearning4j.scaleout.job.Job;
import org.deeplearning4j.scaleout.perform.WorkerPerformer;
import org.deeplearning4j.scaleout.api.statetracker.StateTracker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.Duration;

import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Iterative reduce actor for handling batch sizes
 * @author Adam Gibson
 *
 */
public class WorkerActor extends  UntypedActor implements DeepLearningConfigurable {



    protected Job currentJob;
    protected String id;
    Cluster cluster = Cluster.get(getContext().system());
    protected ActorRef clusterClient;
    protected String masterPath;
    protected StateTracker tracker;
    protected AtomicBoolean isWorking = new AtomicBoolean(false);
    protected Configuration conf;
    protected ActorRef mediator = DistributedPubSubExtension.get(getContext().system()).mediator();
    protected Cancellable heartbeat;
    protected static final Logger log = LoggerFactory.getLogger(WorkerActor.class);
    protected WorkerPerformer workerPerformer;


    public WorkerActor(Configuration conf,StateTracker tracker,WorkerPerformer workerPerformer)throws Exception   {
        this(conf,null,tracker,workerPerformer);
    }


    public WorkerActor(Configuration conf,ActorRef client,StateTracker tracker,WorkerPerformer workerPerformer) throws Exception {
        this.tracker = tracker;
        this.workerPerformer = workerPerformer;
        //subscribe to broadcasts from workers (location agnostic)
        mediator.tell(new Put(getSelf()), getSelf());

        //subscribe to broadcasts from master (location agnostic)
        mediator.tell(new DistributedPubSubMediator.Subscribe(MasterActor.BROADCAST, getSelf()), getSelf());
        //subscribe to shutdown messages
        mediator.tell(new DistributedPubSubMediator.Subscribe(MasterActor.SHUTDOWN, getSelf()), getSelf());
        id = generateId();
        //replicate the network
        mediator.tell(new DistributedPubSubMediator.Publish(MasterActor.MASTER,
                register()), getSelf());

        this.clusterClient = client;

        //ensure worker is available to tracker
        tracker.availableForWork(id);
        //master lookup
        masterPath = conf.get(MASTER_PATH,"");
        log.info("Registered with master " + id + " at master " + conf.get(MASTER_PATH));

        heartbeat();

        tracker.addWorker(id);


        setup(conf);
        //subscribe to broadcasts from workers (location agnostic)
        mediator.tell(new Put(getSelf()), getSelf());

        //subscribe to broadcasts from master (location agnostic)
        mediator.tell(new DistributedPubSubMediator.Subscribe(MasterActor.BROADCAST, getSelf()), getSelf());


        //subscribe to broadcasts from master (location agnostic)
        mediator.tell(new DistributedPubSubMediator.Subscribe(id, getSelf()), getSelf());

        heartbeat();

        tracker.addWorker(id);

    }





    public static Props propsFor(Configuration conf,StateTracker tracker,WorkerPerformer performer) {
        return Props.create(WorkerActor.class,conf,tracker,performer);
    }







    @SuppressWarnings("unchecked")
    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof DistributedPubSubMediator.SubscribeAck || message instanceof DistributedPubSubMediator.UnsubscribeAck) {
            DistributedPubSubMediator.SubscribeAck ack = (DistributedPubSubMediator.SubscribeAck) message;
            //reply
            mediator.tell(new DistributedPubSubMediator.Publish(ClusterListener.TOPICS,
                    message), getSelf());

            log.info("Subscribed to " + ack.toString());
        }


        else if(message instanceof Ack) {
            log.info("Ack from master on worker " + id);
        }


        else
            unhandled(message);
    }


    @Override
    public void aroundPostStop() {
        super.aroundPostStop();
        //replicate the network
        mediator.tell(new DistributedPubSubMediator.Publish(MasterActor.MASTER,
                new ClearWorker(id)), getSelf());
        heartbeat.cancel();
    }



    protected void heartbeat() throws Exception {
        heartbeat = context().system().scheduler().schedule(Duration.apply(1, TimeUnit.SECONDS), Duration.apply(1, TimeUnit.SECONDS), new Runnable() {

            @Override
            public void run() {
                if(!tracker.isDone())
                    tracker.addWorker(id);



                //eventually consistent storage
                try {
                    checkJobAvailable();


                    if(getCurrentJob() != null) {
                        Job job = getCurrentJob();
                        if(job.getWork() == null) {
                            tracker.clearJob(id);
                            tracker.enableWorker(id);
                            log.warn("Work for worker " + id + " was null");
                            return;
                        }



                        String id = job.workerId();
                        if(id == null || id.isEmpty())
                            job.setWorkerId(id);

                        log.info("Confirmation from " + job.workerId() + " on work");
                        long start = System.currentTimeMillis();
                        workerPerformer.perform(job);
                        long end = System.currentTimeMillis();
                        long diff = Math.abs(end - start);
                        log.info("Job took " + diff + " milliseconds");
                        tracker.addUpdate(id, job);
                        tracker.clearJob(id);
                        setCurrentJob(null);

                    }

                    else if(getCurrentJob() == null || !isWorking.get() && tracker.jobFor(id) != null) {
                        if(tracker.jobFor(id) != null) {
                            tracker.clearJob(id);
                            log.info("Clearing stale job... " + id);
                        }

                    }


                }

                catch(HazelcastInstanceNotActiveException e1) {
                    log.warn("Hazel cast shut down...exiting");
                }

                catch(Exception e) {
                    throw new RuntimeException(e);
                }



            }

        }, context().dispatcher());

    }



    public synchronized void setCurrentJob(Job j) {
        this.currentJob = j;
    }

    public synchronized  Job getCurrentJob() {
        return currentJob;
    }



    /**
     * Returns a worker state with the id generated by this worker
     * @return a worker state with the id of this worker
     */
    public WorkerState register() {
        return new WorkerState(this.id);
    }

    /**
     * Generates an id for this worker
     * @return a UUID for this worker
     */
    public String generateId() {
        String base = UUID.randomUUID().toString();
        String host = System.getProperty("akka.remote.netty.tcp.hostname","localhost");
        return host + "-" + base;
    }


    @Override
    public void postStop() throws Exception {
        super.postStop();
        try {
            tracker.removeWorker(id);

        }catch(Exception e) {
            log.info("Tracker already shut down");
        }
        log.info("Post stop on worker actor");
        cluster.unsubscribe(getSelf());
    }

    @Override
    public void preStart() throws Exception {
        super.preStart();
        cluster.subscribe(getSelf(), MemberEvent.class);
        log.info("Pre start on worker");

    }

    protected void checkJobAvailable() throws Exception {
        Job j;

        if((j = tracker.jobFor(id)) == null) {
            //inconsistent state
            if(!isWorking.get() && j != null)  {
                tracker.clearJob(id);
                log.info("Clearing stale job " + id);
            }

            return;
        }

        if(tracker.needsReplicate(id)) {
            try {
                log.info("Updating worker " + id);
                setCurrentJob((Job) tracker.getCurrent());
                tracker.doneReplicating(id);
            }catch(Exception e) {
                throw new RuntimeException(e);
            }
        }


        if(j != null && getCurrentJob() == null) {
            log.info("Assigning job for worker " + id);
            setCurrentJob(j);


        }

    }



    @Override
    public void setup(Configuration conf) {
        this.conf = conf;
        String url = conf.get(MASTER_URL);
        if(url != null) {
            this.masterPath = conf.get(MASTER_PATH);
            Address a = AddressFromURIString.apply(url);
            Cluster.get(context().system()).join(a);

            mediator = DistributedPubSubExtension.get(getContext().system()).mediator();
        }



    }



    @Override
    public SupervisorStrategy supervisorStrategy() {
        return new OneForOneStrategy(0,
                Duration.Zero(),
                new Function<Throwable, Directive>() {
                    public Directive apply(Throwable cause) {
                        log.error("Problem with processing",cause);
                        mediator.tell(new DistributedPubSubMediator.Publish(MasterActor.MASTER,
                                new ClearWorker(id)), getSelf());


                        return SupervisorStrategy.restart();
                    }
                }
        );
    }










}

Other Java examples (source code examples)

Here is a short list of links related to this Java WorkerActor.java source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.