alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Java example source code file (BatchActor.java)

This example Java source code file (BatchActor.java) is included in the alvinalexander.com "Java Source Code Warehouse" project. The intent of this project is to help you "Learn Java by Example" TM.

Learn more about this Java project at its project page.

Java - Java tags/keywords

batchactor, configuration, deeplearningconfigurable, exception, jobiterator, logger, number, override, queue, resetmessage, saving, string, susbcribed, threading, threads, util, workrouter

The BatchActor.java Java example source code

/*
 *
 *  * Copyright 2015 Skymind,Inc.
 *  *
 *  *    Licensed under the Apache License, Version 2.0 (the "License");
 *  *    you may not use this file except in compliance with the License.
 *  *    You may obtain a copy of the License at
 *  *
 *  *        http://www.apache.org/licenses/LICENSE-2.0
 *  *
 *  *    Unless required by applicable law or agreed to in writing, software
 *  *    distributed under the License is distributed on an "AS IS" BASIS,
 *  *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  *    See the License for the specific language governing permissions and
 *  *    limitations under the License.
 *
 */

package org.deeplearning4j.scaleout.actor.core.actor;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque;


import org.canova.api.conf.Configuration;
import org.deeplearning4j.scaleout.actor.core.ClusterListener;
import org.deeplearning4j.scaleout.actor.core.protocol.ResetMessage;
import org.deeplearning4j.scaleout.api.workrouter.WorkRouter;
import org.deeplearning4j.nn.conf.DeepLearningConfigurable;
import org.deeplearning4j.scaleout.job.Job;
import org.deeplearning4j.scaleout.job.JobIterator;
import org.deeplearning4j.scaleout.messages.DoneMessage;
import org.deeplearning4j.scaleout.messages.MoreWorkMessage;
import org.deeplearning4j.scaleout.api.statetracker.StateTracker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import akka.actor.ActorRef;
import akka.actor.UntypedActor;
import akka.contrib.pattern.DistributedPubSubExtension;
import akka.contrib.pattern.DistributedPubSubMediator;

/**
 * Handles the data  iterator.
 * This includes disseminating new data sets to the cluster.
 * @author Adam Gibson
 *
 */
public class BatchActor extends UntypedActor implements DeepLearningConfigurable {

    protected JobIterator iter;
    private final ActorRef mediator = DistributedPubSubExtension.get(getContext().system()).mediator();
    private static final Logger log = LoggerFactory.getLogger(BatchActor.class);
    public final static String BATCH = "batch";
    private transient StateTracker stateTracker;
    private transient Configuration conf;
    private Queue<String> workers = new ConcurrentLinkedDeque<>();
    private int numDataSets = 0;
    private WorkRouter workRouter;


    public BatchActor(JobIterator iter,StateTracker stateTracker,Configuration conf,WorkRouter workRouter) {
        this.iter = iter;
        this.stateTracker = stateTracker;
        this.conf = conf;
        //subscribe to shutdown messages
        mediator.tell(new DistributedPubSubMediator.Subscribe(MasterActor.SHUTDOWN, getSelf()), getSelf());
        mediator.tell(new DistributedPubSubMediator.Subscribe(BATCH, getSelf()), getSelf());
        this.workRouter = workRouter;
    }


    @Override
    public void onReceive(Object message) throws Exception {
        if(message instanceof DistributedPubSubMediator.SubscribeAck || message instanceof DistributedPubSubMediator.UnsubscribeAck) {
            log.info("Susbcribed batch actor");
            mediator.tell(new DistributedPubSubMediator.Publish(ClusterListener.TOPICS,
                    message), getSelf());
        }
        else if(message instanceof ResetMessage) {
            iter.reset();
            self().tell(MoreWorkMessage.getInstance(),self());
        }


        else if(message instanceof MoreWorkMessage) {
            log.info("Saving model");
            mediator.tell(new DistributedPubSubMediator.Publish(ModelSavingActor.SAVE,
                    MoreWorkMessage.getInstance()), mediator);

            if(iter.hasNext()) {
                log.info("Propagating new work to master");
                numDataSets++;
                log.info("Iterating over next dataset " + numDataSets);
                List<String> workers2 = stateTracker.workers();
                for(String s : workers2)
                    log.info("Worker " + s);

				/*
				 * Ideal number is target mini batch size per worker.
				 * 
				 * 
				 */

                for(String s : stateTracker.workerData()) {
                    stateTracker.removeWorkerData(s);
                }


                int numWorkers = workers2.size();
                int miniBatchSize = stateTracker.inputSplit();

                if(numWorkers == 0)
                    numWorkers = Runtime.getRuntime().availableProcessors();

                log.info("Number of workers " + numWorkers + " and batch size is " + miniBatchSize);

                //enable workers for sending out data
                for(String worker : stateTracker.workers())
                    stateTracker.enableWorker(worker);

                //fetch specified batch
                int batch = numWorkers * miniBatchSize;
                log.info("Batch size for worker is " + batch);


                //partition the data and save it for access later.
                //Avoid loading it in to memory all at once.
                for(int i = 0; i < numWorkers; i++) {
                    if(!iter.hasNext())
                        break;
                    String worker = nextWorker();
                    log.info("Saving data for worker " + worker);
                    if(worker == null) {
                        i--;
                        continue;
                    }
                    Job next = iter.next(worker);
                    if(next == null)
                        break;

                    workRouter.routeJob(next);

                }

                stateTracker.incrementBatchesRan(workers2.size());

                mediator.tell(new DistributedPubSubMediator.Publish(MasterActor.MASTER,
                        stateTracker.workerData()), mediator);
            }

            else if(!iter.hasNext())
                mediator.tell(new DistributedPubSubMediator.Publish(MasterActor.MASTER,
                        DoneMessage.getInstance()), mediator);



            else
                unhandled(message);
        }
    }


    private String nextWorker() {
        while(workers.isEmpty()) {
            //always update
            for(String s : stateTracker.workers()) {
                if(stateTracker.jobFor(s) == null) {
                    if(!workers.contains(s))
                        workers.add(s);

                }
            }

            log.info("Refilling queue with size of " + workers.size() + " out of " + stateTracker.numWorkers());
        }

        return workers.poll();

    }



    public JobIterator getIter() {
        return iter;
    }

    @Override
    public void setup(Configuration conf) {

    }
}

Other Java examples (source code examples)

Here is a short list of links related to this Java BatchActor.java source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.