alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (ActorDocTest.java)

This example Akka source code file (ActorDocTest.java) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

abstractactor, actor, actorref, actorwithargs, akka, boxedunit, concurrent, event, hi, i, javatestkit, partialfunction, props, test, testing, testkit

The ActorDocTest.java Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */

package docs.actor;

import akka.actor.*;
import akka.event.LoggingAdapter;
import akka.event.Logging;
import akka.japi.pf.ReceiveBuilder;
import akka.testkit.ErrorFilter;
import akka.testkit.EventFilter;
import akka.testkit.TestEvent;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import scala.PartialFunction;
import scala.runtime.BoxedUnit;
import static docs.actor.Messages.Swap.Swap;
import static docs.actor.Messages.*;
import static akka.japi.Util.immutableSeq;

import java.util.concurrent.TimeUnit;

import akka.testkit.JavaTestKit;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.*;

//#import-props
import akka.actor.Props;
//#import-props
//#import-actorRef
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
//#import-actorRef
//#import-identify
import akka.actor.ActorIdentity;
import akka.actor.ActorSelection;
import akka.actor.Identify;
//#import-identify
//#import-graceFulStop
import akka.pattern.AskTimeoutException;
import scala.concurrent.Await;
import scala.concurrent.duration.Duration;
import scala.concurrent.Future;
import static akka.pattern.Patterns.gracefulStop;
//#import-graceFulStop

public class ActorDocTest {

  public static Config config = ConfigFactory.parseString(
    "akka {\n" +
      "  loggers = [\"akka.testkit.TestEventListener\"]\n" +
      "  loglevel = \"WARNING\"\n" +
      "  stdout-loglevel = \"WARNING\"\n" +
      "}\n"
  );

  static ActorSystem system = null;

  @BeforeClass
  public static void beforeClass() {
    system = ActorSystem.create("ActorDocTest", config);
  }

  @AfterClass
  public static void afterClass() {
    system.shutdown();
    system.awaitTermination(Duration.create("5 seconds"));
  }

  static
  //#context-actorOf
  public class FirstActor extends AbstractActor {
    final ActorRef child = context().actorOf(Props.create(MyActor.class), "myChild");
    //#plus-some-behavior
    public FirstActor() {
      receive(ReceiveBuilder.
        matchAny(x -> {
          sender().tell(x, self());
        }).build()
      );
    }
    //#plus-some-behavior
  }
  //#context-actorOf

  static public abstract class SomeActor extends AbstractActor {
    //#receive-constructor
    public SomeActor() {
      receive(ReceiveBuilder.
        //#and-some-behavior
        match(String.class, s -> System.out.println(s.toLowerCase())).
        //#and-some-behavior
      build());
    }
    //#receive-constructor
    @Override
    //#receive
    public abstract PartialFunction<Object, BoxedUnit> receive();
    //#receive
  }

  static public class ActorWithArgs extends AbstractActor {
    private final String args;

    ActorWithArgs(String args) {
      this.args = args;
      receive(ReceiveBuilder.
        matchAny(x -> { }).build()
      );
    }
  }

  static
  //#props-factory
  public class DemoActor extends AbstractActor {
    /**
     * Create Props for an actor of this type.
     * @param magicNumber The magic number to be passed to this actor’s constructor.
     * @return a Props for creating this actor, which can then be further configured
     *         (e.g. calling `.withDispatcher()` on it)
     */
    static Props props(Integer magicNumber) {
      // You need to specify the actual type of the returned actor
      // since Java 8 lambdas have some runtime type information erased
      return Props.create(DemoActor.class, () -> new DemoActor(magicNumber));
    }

    private final Integer magicNumber;

    DemoActor(Integer magicNumber) {
      this.magicNumber = magicNumber;
      receive(ReceiveBuilder.
        match(Integer.class, i -> {
          sender().tell(i + magicNumber, self());
        }).build()
      );
    }
  }

  //#props-factory
  static
  //#props-factory
  public class SomeOtherActor extends AbstractActor {
    // Props(new DemoActor(42)) would not be safe
    ActorRef demoActor = context().actorOf(DemoActor.props(42), "demo");
    // ...
    //#props-factory
    public SomeOtherActor() {
      receive(emptyBehavior());
    }
    //#props-factory
  }
  //#props-factory

  public static class Hook extends AbstractActor {
    ActorRef target = null;
    public Hook() {
      receive(emptyBehavior());
    }
    //#preStart
    @Override
    public void preStart() {
      target = context().actorOf(Props.create(MyActor.class, "target"));
    }
    //#preStart
    //#postStop
    @Override
    public void postStop() {
      //#clean-up-some-resources
      final String message = "stopped";
      //#tell
      // don’t forget to think about who is the sender (2nd argument)
      target.tell(message, self());
      //#tell
      final Object result = "";
      //#forward
      target.forward(result, context());
      //#forward
      target = null;
      //#clean-up-some-resources
    }
    //#postStop

    // compilation test only
    public void compileSelections() {
      //#selection-local
      // will look up this absolute path
      context().actorSelection("/user/serviceA/actor");
      // will look up sibling beneath same supervisor
      context().actorSelection("../joe");
      //#selection-local

      //#selection-wildcard
      // will look all children to serviceB with names starting with worker
      context().actorSelection("/user/serviceB/worker*");
      // will look up all siblings beneath same supervisor
      context().actorSelection("../*");
      //#selection-wildcard

      //#selection-remote
      context().actorSelection("akka.tcp://app@otherhost:1234/user/serviceB");
      //#selection-remote
    }
  }

  public static class ReplyException extends AbstractActor {
    public ReplyException() {
      receive(ReceiveBuilder.
        matchAny(x -> {
          //#reply-exception
          try {
            String result = operation();
            sender().tell(result, self());
          } catch (Exception e) {
            sender().tell(new akka.actor.Status.Failure(e), self());
            throw e;
          }
          //#reply-exception
        }).build()
      );
    }

    private String operation() {
      return "Hi";
    }
  }

  static
  //#gracefulStop-actor
  public class Manager extends AbstractActor {
    private static enum Shutdown {
      Shutdown
    }
    public static final Shutdown SHUTDOWN = Shutdown.Shutdown;

    private ActorRef worker =
    context().watch(context().actorOf(Props.create(Cruncher.class), "worker"));

    public Manager() {
      receive(ReceiveBuilder.
        matchEquals("job", s -> {
          worker.tell("crunch", self());
        }).
        matchEquals(SHUTDOWN, x -> {
          worker.tell(PoisonPill.getInstance(), self());
          context().become(shuttingDown);
        }).build()
      );
    }

    public PartialFunction<Object, BoxedUnit> shuttingDown =
      ReceiveBuilder.
        matchEquals("job", s -> {
          sender().tell("service unavailable, shutting down", self());
        }).
        match(Terminated.class, t -> t.actor().equals(worker), t -> {
          context().stop(self());
        }).build();
  }
  //#gracefulStop-actor

  @Test
  public void usePatternsGracefulStop() throws Exception {
    ActorRef actorRef = system.actorOf(Props.create(Manager.class));
    //#gracefulStop
    try {
      Future<Boolean> stopped =
        gracefulStop(actorRef, Duration.create(5, TimeUnit.SECONDS), Manager.SHUTDOWN);
      Await.result(stopped, Duration.create(6, TimeUnit.SECONDS));
      // the actor has been stopped
    } catch (AskTimeoutException e) {
      // the actor wasn't stopped within 5 seconds
    }
    //#gracefulStop
  }


  public static class Cruncher extends AbstractActor {
    public Cruncher() {
      receive(ReceiveBuilder.
        matchEquals("crunch", s -> { }).build()
      );
    }
  }

  static
  //#swapper
  public class Swapper extends AbstractLoggingActor {
    public Swapper() {
      receive(ReceiveBuilder.
        matchEquals(Swap, s -> {
            log().info("Hi");
            context().become(ReceiveBuilder.
                    matchEquals(Swap, x -> {
                      log().info("Ho");
                      context().unbecome(); // resets the latest 'become' (just for fun)
                    }).build(), false); // push on top instead of replace
        }).build()
      );
    }
  }

  //#swapper
  static
  //#swapper
  public class SwapperApp {
    public static void main(String[] args) {
      ActorSystem system = ActorSystem.create("SwapperSystem");
      ActorRef swapper = system.actorOf(Props.create(Swapper.class), "swapper");
      swapper.tell(Swap, ActorRef.noSender()); // logs Hi
      swapper.tell(Swap, ActorRef.noSender()); // logs Ho
      swapper.tell(Swap, ActorRef.noSender()); // logs Hi
      swapper.tell(Swap, ActorRef.noSender()); // logs Ho
      swapper.tell(Swap, ActorRef.noSender()); // logs Hi
      swapper.tell(Swap, ActorRef.noSender()); // logs Ho
      system.shutdown();
    }
  }
  //#swapper


  @Test
  public void creatingActorWithSystemActorOf() {
    //#system-actorOf
    // ActorSystem is a heavy object: create only one per application
    final ActorSystem system = ActorSystem.create("MySystem", config);
    final ActorRef myActor = system.actorOf(Props.create(MyActor.class), "myactor");
    //#system-actorOf
    try {
      new JavaTestKit(system) {
        {
          myActor.tell("hello", getRef());
          expectMsgEquals("hello");
        }
      };
    } finally {
      JavaTestKit.shutdownActorSystem(system);
    }
  }

  @Test
  public void creatingPropsConfig() {
    //#creating-props
    Props props1 = Props.create(MyActor.class);
    Props props2 = Props.create(ActorWithArgs.class,
      () -> new ActorWithArgs("arg")); // careful, see below
    Props props3 = Props.create(ActorWithArgs.class, "arg");
    //#creating-props

    //#creating-props-deprecated
    // NOT RECOMMENDED within another actor:
    // encourages to close over enclosing class
    Props props7 = Props.create(ActorWithArgs.class,
      () -> new ActorWithArgs("arg"));
    //#creating-props-deprecated
  }

  @Test(expected=IllegalArgumentException.class)
  public void creatingPropsIllegal() {
    //#creating-props-illegal
    // This will throw an IllegalArgumentException since some runtime
    // type information of the lambda is erased.
    // Use Props.create(actorClass, Creator) instead.
    Props props = Props.create(() -> new ActorWithArgs("arg"));
    //#creating-props-illegal
  }

  static
  //#receive-timeout
  public class ReceiveTimeoutActor extends AbstractActor {
    //#receive-timeout
    ActorRef target = context().system().deadLetters();
    //#receive-timeout
    public ReceiveTimeoutActor() {
      // To set an initial delay
      context().setReceiveTimeout(Duration.create("10 seconds"));

      receive(ReceiveBuilder.
        matchEquals("Hello", s -> {
          // To set in a response to a message
          context().setReceiveTimeout(Duration.create("1 second"));
          //#receive-timeout
          target = sender();
          target.tell("Hello world", self());
          //#receive-timeout
        }).
        match(ReceiveTimeout.class, r -> {
          // To turn it off
          context().setReceiveTimeout(Duration.Undefined());
          //#receive-timeout
          target.tell("timeout", self());
          //#receive-timeout
        }).build()
      );
    }
  }
  //#receive-timeout

  @Test
  public void using_receiveTimeout() {
    final ActorRef myActor = system.actorOf(Props.create(ReceiveTimeoutActor.class));
    new JavaTestKit(system) {
      {
        myActor.tell("Hello", getRef());
        expectMsgEquals("Hello world");
        expectMsgEquals("timeout");
      }
    };
  }

  static
  //#hot-swap-actor
  public class HotSwapActor extends AbstractActor {
    private PartialFunction<Object, BoxedUnit> angry;
    private PartialFunction<Object, BoxedUnit> happy;

    public HotSwapActor() {
      angry =
        ReceiveBuilder.
          matchEquals("foo", s -> {
            sender().tell("I am already angry?", self());
          }).
          matchEquals("bar", s -> {
            context().become(happy);
          }).build();

      happy = ReceiveBuilder.
        matchEquals("bar", s -> {
          sender().tell("I am already happy :-)", self());
        }).
        matchEquals("foo", s -> {
          context().become(angry);
        }).build();

      receive(ReceiveBuilder.
        matchEquals("foo", s -> {
          context().become(angry);
        }).
        matchEquals("bar", s -> {
          context().become(happy);
        }).build()
      );
    }
  }
  //#hot-swap-actor

  @Test
  public void using_hot_swap() {
    final ActorRef actor = system.actorOf(Props.create(HotSwapActor.class), "hot");
    new JavaTestKit(system) {
      {
        actor.tell("foo", getRef());
        actor.tell("foo", getRef());
        expectMsgEquals("I am already angry?");
        actor.tell("bar", getRef());
        actor.tell("bar", getRef());
        expectMsgEquals("I am already happy :-)");
        actor.tell("foo", getRef());
        actor.tell("foo", getRef());
        expectMsgEquals("I am already angry?");
        expectNoMsg(Duration.create(1, TimeUnit.SECONDS));
      }
    };
  }


  static
  //#stash
  public class ActorWithProtocol extends AbstractActorWithStash {
    public ActorWithProtocol() {
      receive(ReceiveBuilder.
        matchEquals("open", s -> {
          context().become(ReceiveBuilder.
            matchEquals("write", ws -> { /* do writing */ }).
            matchEquals("close", cs -> {
              unstashAll();
              context().unbecome();
            }).
            matchAny(msg -> stash()).build(), false);
        }).
        matchAny(msg -> stash()).build()
      );
    }
  }
  //#stash

  @Test
  public void using_Stash() {
    final ActorRef actor = system.actorOf(Props.create(ActorWithProtocol.class), "stash");
  }

  static
  //#watch
  public class WatchActor extends AbstractActor {
    private final ActorRef child = context().actorOf(Props.empty(), "target");
    private ActorRef lastSender = system.deadLetters();

    public WatchActor() {
      context().watch(child); // <-- this is the only call needed for registration

      receive(ReceiveBuilder.
        matchEquals("kill", s -> {
          context().stop(child);
          lastSender = sender();
        }).
        match(Terminated.class, t -> t.actor().equals(child), t -> {
          lastSender.tell("finished", self());
        }).build()
      );
    }
  }
  //#watch

  @Test
  public void using_watch() {
    ActorRef actor = system.actorOf(Props.create(WatchActor.class));

    new JavaTestKit(system) {
      {
        actor.tell("kill", getRef());
        expectMsgEquals("finished");
      }
    };
  }

  static
  //#identify
  public class Follower extends AbstractActor {
    final Integer identifyId = 1;

    public Follower(){
      ActorSelection selection = context().actorSelection("/user/another");
      selection.tell(new Identify(identifyId), self());

      receive(ReceiveBuilder.
        match(ActorIdentity.class, id -> id.getRef() != null, id -> {
          ActorRef ref = id.getRef();
          context().watch(ref);
          context().become(active(ref));
        }).
        match(ActorIdentity.class, id -> id.getRef() == null, id -> {
          context().stop(self());
        }).build()
      );
    }

    final PartialFunction<Object, BoxedUnit> active(final ActorRef another) {
      return ReceiveBuilder.
        match(Terminated.class, t -> t.actor().equals(another), t -> {
          context().stop(self());
        }).build();
    }
  }
  //#identify

  @Test
  public void using_Identify() {
    ActorRef a = system.actorOf(Props.empty());
    ActorRef b = system.actorOf(Props.create(Follower.class));

    new JavaTestKit(system) {
      {
        watch(b);
        system.stop(a);
        assertEquals(expectMsgClass(Duration.create(2, TimeUnit.SECONDS), Terminated.class).actor(), b);
      }
    };
  }

  public static class NoReceiveActor extends AbstractActor {
  }

  @Test
  public void noReceiveActor() {
    EventFilter ex1 = new ErrorFilter(ActorInitializationException.class);
    EventFilter[] ignoreExceptions = { ex1 };
    try {
      system.eventStream().publish(new TestEvent.Mute(immutableSeq(ignoreExceptions)));
      new JavaTestKit(system) {{
        final ActorRef victim = new EventFilter<ActorRef>(ActorInitializationException.class) {
          protected ActorRef run() {
            return system.actorOf(Props.create(NoReceiveActor.class), "victim");
          }
        }.message("Actor behavior has not been set with receive(...)").occurrences(1).exec();

        assertEquals(true, victim.isTerminated());
      }};
    } finally {
      system.eventStream().publish(new TestEvent.UnMute(immutableSeq(ignoreExceptions)));
    }
  }

  public static class MultipleReceiveActor extends AbstractActor {
    public MultipleReceiveActor() {
      receive(ReceiveBuilder.
          match(String.class, s1 -> s1.toLowerCase().equals("become"), s1 -> {
            sender().tell(s1.toUpperCase(), self());
            receive(ReceiveBuilder.
              match(String.class, s2 -> {
                sender().tell(s2.toLowerCase(), self());
              }).build()
            );
          }).
          match(String.class, s1 -> {
            sender().tell(s1.toUpperCase(), self());
          }).build()
      );
    }
  }

  @Test
  public void multipleReceiveActor() {
    EventFilter ex1 = new ErrorFilter(IllegalActorStateException.class);
    EventFilter[] ignoreExceptions = { ex1 };
    try {
      system.eventStream().publish(new TestEvent.Mute(immutableSeq(ignoreExceptions)));
      new JavaTestKit(system) {{
        new EventFilter<Boolean>(IllegalActorStateException.class) {
          protected Boolean run() {
            ActorRef victim = system.actorOf(Props.create(MultipleReceiveActor.class), "victim2");
            victim.tell("Foo", getRef());
            expectMsgEquals("FOO");
            victim.tell("bEcoMe", getRef());
            expectMsgEquals("BECOME");
            victim.tell("Foo", getRef());
            // if it's upper case, then the actor was restarted
            expectMsgEquals("FOO");
            return true;
          }
        }.message("Actor behavior has already been set with receive(...), " +
                "use context().become(...) to change it later").occurrences(1).exec();
      }};
    } finally {
      system.eventStream().publish(new TestEvent.UnMute(immutableSeq(ignoreExceptions)));
    }
  }

}

Other Akka source code examples

Here is a short list of links related to this Akka ActorDocTest.java source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.