alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (RouterDocTest.java)

This Akka example source code file (RouterDocTest.java) is included in my "Source Code Warehouse" project. The goal of the project is to make Akka and Scala source code examples easier to find by tagging them.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

actor, actorref, address, akka, defaultresizer, finiteduration, javatestkit, roundrobinpool, route, router, routerdoctest, routing, test, testing, untypedactor, work

The RouterDocTest.java Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */
package docs.jrouting;

import akka.testkit.AkkaJUnitActorSystemResource;

import org.junit.ClassRule;
import org.junit.Test;

import static org.junit.Assert.*;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

import com.typesafe.config.ConfigFactory;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

import akka.testkit.JavaTestKit;
import akka.actor.ActorSystem;


//#imports1
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.actor.UntypedActor;
import akka.routing.ActorRefRoutee;
import akka.routing.Routee;
import akka.routing.Router;

//#imports1


//#imports2
import akka.actor.Address;
import akka.actor.AddressFromURIString;
import akka.actor.Kill;
import akka.actor.PoisonPill;
import akka.actor.SupervisorStrategy;
import akka.actor.OneForOneStrategy;
import akka.remote.routing.RemoteRouterConfig;
import akka.routing.Broadcast;
import akka.routing.BroadcastGroup;
import akka.routing.BroadcastPool;
import akka.routing.ConsistentHashingGroup;
import akka.routing.ConsistentHashingPool;
import akka.routing.DefaultResizer;
import akka.routing.FromConfig;
import akka.routing.RandomGroup;
import akka.routing.RandomPool;
import akka.routing.RoundRobinGroup;
import akka.routing.RoundRobinPool;
import akka.routing.RoundRobinRoutingLogic;
import akka.routing.ScatterGatherFirstCompletedGroup;
import akka.routing.ScatterGatherFirstCompletedPool;
import akka.routing.BalancingPool;
import akka.routing.SmallestMailboxPool;

//#imports2

public class RouterDocTest {

  // JUnit class rule wrapping the actor system for this test class. It is
  // constructed with the name "RouterDocTest" and a config parsed from the
  // string returned by docs.routing.RouterDocSpec.config().
  @ClassRule
  public static AkkaJUnitActorSystemResource actorSystemResource =
    new AkkaJUnitActorSystemResource("RouterDocTest", 
        ConfigFactory.parseString(docs.routing.RouterDocSpec.config()));

  // Actor system taken from the class rule; every test method below uses it.
  private final ActorSystem system = actorSystemResource.getSystem();


  static
  //#router-in-actor
  // Immutable, serializable message that carries a single payload string.
  public final class Work implements Serializable {
    private static final long serialVersionUID = 1L;

    // Content of this unit of work; assigned once by the constructor.
    public final String payload;

    public Work(final String payload) {
      this.payload = payload;
    }
  }

  //#router-in-actor
  static
  //#router-in-actor
  // Forwards Work messages through a round-robin Router over five watched
  // Worker children, and replaces a routee whenever a Terminated arrives.
  public class Master extends UntypedActor {

    Router router;
    {
      // Build the initial set of five watched Worker routees.
      final List<Routee> initialRoutees = new ArrayList<Routee>();
      for (int n = 0; n < 5; n++) {
        initialRoutees.add(spawnWatchedWorker());
      }
      router = new Router(new RoundRobinRoutingLogic(), initialRoutees);
    }

    public void onReceive(Object msg) {
      if (msg instanceof Work) {
        // Route the work on behalf of the original sender.
        router.route(msg, getSender());
        return;
      }
      if (!(msg instanceof Terminated)) {
        // Any other message is ignored.
        return;
      }
      // Drop the stopped routee first, then add a freshly started one.
      final ActorRef stopped = ((Terminated) msg).actor();
      router = router.removeRoutee(stopped);
      router = router.addRoutee(spawnWatchedWorker());
    }

    // Creates a Worker child, starts watching it, and wraps it as a routee.
    private Routee spawnWatchedWorker() {
      final ActorRef worker = getContext().actorOf(Props.create(Worker.class));
      getContext().watch(worker);
      return new ActorRefRoutee(worker);
    }
  }

  //#router-in-actor
  
  /** Routee actor used by the examples; it does nothing with the messages it receives. */
  public static class Worker extends UntypedActor {
    public void onReceive(Object msg) {
      // Intentionally empty: every message is ignored.
    }
  }
  
  /** Sends every received message back to its sender, naming itself as the sender of the reply. */
  public static class Echo extends UntypedActor {
    public void onReceive(Object msg) {
      final ActorRef origin = getSender();
      origin.tell(msg, getSelf());
    }
  }
  
  // Answers every message with two "reply" strings sent to the sender: the
  // first passes this actor as the sender of the reply, the second passes this
  // actor's parent instead. Each statement sits between its own //# marker
  // pair, so keep the statements and markers together when editing.
  static public class Replier extends UntypedActor {
    public void onReceive(Object msg) {
      //#reply-with-self
      getSender().tell("reply", getSelf());
      //#reply-with-self
      
      //#reply-with-parent
      getSender().tell("reply", getContext().parent());
      //#reply-with-parent
    }
  }
  

  
  static
  //#create-worker-actors
  // On start, creates three Worker children named "w1", "w2" and "w3";
  // the actor itself ignores every message.
  public class Workers extends UntypedActor {
    @Override
    public void preStart() {
      for (String childName : new String[] {"w1", "w2", "w3"}) {
        getContext().actorOf(Props.create(Worker.class), childName);
      }
    }
    // ...
    //#create-worker-actors
    

    public void onReceive(Object msg) {
      // Intentionally empty.
    }
  }
  
  /**
   * Example actor whose field initializers start one child router per routing
   * example ("router1" .. "router26"), in declaration order.
   *
   * Routers built with {@code FromConfig.getInstance()} carry no settings in
   * this file; NOTE(review): they presumably read them from the configuration
   * the test's actor system was created with - confirm. The paired
   * {@code //#name} comments appear to delimit named snippets, so keep each
   * pair wrapped around its statements when editing.
   */
  static public class Parent extends UntypedActor {

    // Actor paths handed to every *Group router below. They match the
    // "workers" actor started in createActors() and its children w1..w3.
    //#paths
    List<String> paths = Arrays.asList("/user/workers/w1", "/user/workers/w2", 
      "/user/workers/w3");
    //#paths

    //#round-robin-pool-1
    ActorRef router1 =
      getContext().actorOf(FromConfig.getInstance().props(Props.create(Worker.class)), 
        "router1");
    //#round-robin-pool-1

    //#round-robin-pool-2
    ActorRef router2 =
      getContext().actorOf(new RoundRobinPool(5).props(Props.create(Worker.class)), 
        "router2");
    //#round-robin-pool-2

    //#round-robin-group-1
    ActorRef router3 =
      getContext().actorOf(FromConfig.getInstance().props(), "router3");
    //#round-robin-group-1

    //#round-robin-group-2
    ActorRef router4 =
      getContext().actorOf(new RoundRobinGroup(paths).props(), "router4");
    //#round-robin-group-2  

    //#random-pool-1
    ActorRef router5 =
      getContext().actorOf(FromConfig.getInstance().props(
        Props.create(Worker.class)), "router5");
    //#random-pool-1

    //#random-pool-2
    ActorRef router6 =
      getContext().actorOf(new RandomPool(5).props(Props.create(Worker.class)), 
        "router6");
    //#random-pool-2

    //#random-group-1
    ActorRef router7 =
      getContext().actorOf(FromConfig.getInstance().props(), "router7");
    //#random-group-1

    //#random-group-2
    ActorRef router8 =
      getContext().actorOf(new RandomGroup(paths).props(), "router8");
    //#random-group-2
    
    //#balancing-pool-1
    ActorRef router9 =
      getContext().actorOf(FromConfig.getInstance().props(
        Props.create(Worker.class)), "router9");
    //#balancing-pool-1

    //#balancing-pool-2
    ActorRef router10 =
      getContext().actorOf(new BalancingPool(5).props(
        Props.create(Worker.class)), "router10");
    //#balancing-pool-2

    //#smallest-mailbox-pool-1
    ActorRef router11 =
      getContext().actorOf(FromConfig.getInstance().props(
        Props.create(Worker.class)), "router11");
    //#smallest-mailbox-pool-1

    //#smallest-mailbox-pool-2
    ActorRef router12 =
      getContext().actorOf(new SmallestMailboxPool(5).props(
        Props.create(Worker.class)), "router12");
    //#smallest-mailbox-pool-2

    //#broadcast-pool-1
    ActorRef router13 =
      getContext().actorOf(FromConfig.getInstance().props(
        Props.create(Worker.class)), "router13");
    //#broadcast-pool-1

    //#broadcast-pool-2
    ActorRef router14 =
      getContext().actorOf(new BroadcastPool(5).props(Props.create(Worker.class)),
        "router14");
    //#broadcast-pool-2

    //#broadcast-group-1
    ActorRef router15 =
      getContext().actorOf(FromConfig.getInstance().props(), "router15");
    //#broadcast-group-1

    //#broadcast-group-2
    ActorRef router16 =
      getContext().actorOf(new BroadcastGroup(paths).props(), "router16");
    //#broadcast-group-2

    //#scatter-gather-pool-1
    ActorRef router17 =
      getContext().actorOf(FromConfig.getInstance().props(
        Props.create(Worker.class)), "router17");
    //#scatter-gather-pool-1

    // NOTE(review): the 10-second durations passed to the scatter-gather
    // routers below are presumably the time allowed for a first reply - confirm
    // against the ScatterGatherFirstCompleted* API.
    //#scatter-gather-pool-2
    FiniteDuration within = FiniteDuration.create(10, TimeUnit.SECONDS); 
    ActorRef router18 =
      getContext().actorOf(new ScatterGatherFirstCompletedPool(5, within).props(
        Props.create(Worker.class)), "router18");
    //#scatter-gather-pool-2

    //#scatter-gather-group-1
    ActorRef router19 =
      getContext().actorOf(FromConfig.getInstance().props(), "router19");
    //#scatter-gather-group-1

    //#scatter-gather-group-2
    FiniteDuration within2 = FiniteDuration.create(10, TimeUnit.SECONDS);
    ActorRef router20 =
      getContext().actorOf(new ScatterGatherFirstCompletedGroup(paths, within2).props(), 
        "router20");
    //#scatter-gather-group-2  

    //#consistent-hashing-pool-1
    ActorRef router21 =
      getContext().actorOf(FromConfig.getInstance().props(Props.create(Worker.class)),
        "router21");
    //#consistent-hashing-pool-1

    //#consistent-hashing-pool-2
    ActorRef router22 =
      getContext().actorOf(new ConsistentHashingPool(5).props(
        Props.create(Worker.class)), "router22");
    //#consistent-hashing-pool-2

    //#consistent-hashing-group-1
    ActorRef router23 =
      getContext().actorOf(FromConfig.getInstance().props(), "router23");
    //#consistent-hashing-group-1

    //#consistent-hashing-group-2
    ActorRef router24 =
      getContext().actorOf(new ConsistentHashingGroup(paths).props(), "router24");
    //#consistent-hashing-group-2  

    //#resize-pool-1
    ActorRef router25 =
      getContext().actorOf(FromConfig.getInstance().props(
        Props.create(Worker.class)), "router25");
    //#resize-pool-1

    // NOTE(review): DefaultResizer(2, 15) presumably sets the lower and upper
    // bound on the number of routees - confirm against the DefaultResizer API.
    //#resize-pool-2
    DefaultResizer resizer = new DefaultResizer(2, 15);
    ActorRef router26 =
      getContext().actorOf(new RoundRobinPool(5).withResizer(resizer).props(
        Props.create(Worker.class)), "router26");
    //#resize-pool-2  
      
    // This example actor ignores every message it receives.
    public void onReceive(Object msg) {}
  }



  /**
   * Starts a Workers actor named "workers" and a Parent actor named "parent"
   * on the test actor system. The test makes no assertions.
   */
  @Test
  public void createActors() {
    //#create-workers
    system.actorOf(Props.create(Workers.class), "workers");
    //#create-workers
    
    //#create-parent
    system.actorOf(Props.create(Parent.class), "parent");
    //#create-parent
  }
  
  /**
   * Starts a RandomPool of five Worker routees, with the pool given the
   * "router-dispatcher" dispatcher id, under the name "poolWithDispatcher".
   */
  @Test
  public void demonstrateDispatcher() {
    //#dispatchers
    // “head” router actor will run on "router-dispatcher" dispatcher
    // Worker routees will run on "pool-dispatcher" dispatcher
    RandomPool pool = new RandomPool(5).withDispatcher("router-dispatcher");
    Props props = pool.props(Props.create(Worker.class));
    ActorRef router = system.actorOf(props, "poolWithDispatcher");
    //#dispatchers
  }
  
  /**
   * Sends one Broadcast message to a round-robin pool of five Echo routees
   * and checks that five messages arrive at the test actor.
   */
  @Test
  public void demonstrateBroadcast() {
    new JavaTestKit(system) {{
      final Props echoPool = new RoundRobinPool(5).props(Props.create(Echo.class));
      final ActorRef router = system.actorOf(echoPool);
      //#broadcastDavyJonesWarning
      router.tell(new Broadcast("Watch out for Davy Jones' locker"), getTestActor());
      //#broadcastDavyJonesWarning
      final Object[] echoes = receiveN(5);
      assertEquals(5, echoes.length);
    }};
  }
  
  /**
   * Sends a PoisonPill to a watched round-robin pool router of five Echo
   * routees and expects the router's termination to be reported.
   */
  @Test
  public void demonstratePoisonPill() {
    new JavaTestKit(system) {{
      final Props echoPool = new RoundRobinPool(5).props(Props.create(Echo.class));
      final ActorRef router = watch(system.actorOf(echoPool));
      //#poisonPill
      router.tell(PoisonPill.getInstance(), getTestActor());
      //#poisonPill
      expectTerminated(router);
    }};
  }
  
  /**
   * Sends a PoisonPill wrapped in a Broadcast to a watched round-robin pool
   * router of five Echo routees and expects the router's termination to be
   * reported.
   */
  @Test
  public void demonstrateBroadcastPoisonPill() {
    new JavaTestKit(system) {{
      final Props echoPool = new RoundRobinPool(5).props(Props.create(Echo.class));
      final ActorRef router = watch(system.actorOf(echoPool));
      //#broadcastPoisonPill
      router.tell(new Broadcast(PoisonPill.getInstance()), getTestActor());
      //#broadcastPoisonPill
      expectTerminated(router);
    }};
  }
  
  /**
   * Sends a Kill to a watched round-robin pool router of five Echo routees
   * and expects the router's termination to be reported.
   */
  @Test
  public void demonstrateKill() {
    new JavaTestKit(system) {{
      final Props echoPool = new RoundRobinPool(5).props(Props.create(Echo.class));
      final ActorRef router = watch(system.actorOf(echoPool));
      //#kill
      router.tell(Kill.getInstance(), getTestActor());
      //#kill
      expectTerminated(router);
    }};
  }
  
  /**
   * Sends a Kill wrapped in a Broadcast to a watched round-robin pool router
   * of five Echo routees and expects the router's termination to be reported.
   */
  @Test
  public void demonstrateBroadcastKill() {
    new JavaTestKit(system) {{
      final Props echoPool = new RoundRobinPool(5).props(Props.create(Echo.class));
      final ActorRef router = watch(system.actorOf(echoPool));
      //#broadcastKill
      router.tell(new Broadcast(Kill.getInstance()), getTestActor());
      //#broadcastKill
      expectTerminated(router);
    }};
  }

  /**
   * Wraps a RoundRobinPool(5) in a RemoteRouterConfig with two addresses and
   * starts an Echo router from it. The test makes no assertions.
   */
  @Test
  public void demonstrateRemoteDeploy() {
    //#remoteRoutees
    Address viaConstructor = new Address("akka.tcp", "remotesys", "otherhost", 1234);
    Address viaUri = AddressFromURIString.parse("akka.tcp://othersys@anotherhost:1234");
    Address[] addresses = {viaConstructor, viaUri};
    RemoteRouterConfig remotePool =
      new RemoteRouterConfig(new RoundRobinPool(5), addresses);
    ActorRef routerRemote = system.actorOf(remotePool.props(Props.create(Echo.class)));
    //#remoteRoutees
  }
  
  /**
   * Starts a round-robin pool of five Echo routees with a custom
   * OneForOneStrategy built from 5, a one-minute duration and Exception.class.
   * NOTE(review): presumably "at most 5 restarts per minute for Exception" -
   * confirm against the OneForOneStrategy API. The test makes no assertions.
   */
  @Test
  public void demonstrateSupervisor() {
    //#supervision
    final FiniteDuration window = Duration.create(1, TimeUnit.MINUTES);
    final List<Class<? extends Throwable>> handled =
      Collections.<Class<? extends Throwable>>singletonList(Exception.class);
    final SupervisorStrategy strategy = new OneForOneStrategy(5, window, handled);
    final RoundRobinPool supervisedPool =
      new RoundRobinPool(5).withSupervisorStrategy(strategy);
    final ActorRef router = system.actorOf(supervisedPool.props(Props.create(Echo.class)));
    //#supervision
  }

}

Other Akka source code examples

Here is a short list of links related to this Akka RouterDocTest.java source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.