| career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (

This example Akka source code file ( is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

abstractprocessor, actor, actorref, akka, boxedunit, concurrent, duration, msgsent, myprocessor, object, override, partialfunction, persistence, receivebuilder, string, time

The Akka example source code

 * Copyright (C) 2009-2014 Typesafe Inc. <>

package doc;

import akka.persistence.*;
import scala.Option;
import scala.PartialFunction;
import scala.concurrent.duration.Duration;
import scala.runtime.BoxedUnit;

import java.util.concurrent.TimeUnit;

import static java.util.Arrays.asList;

public class LambdaPersistenceDocTest {

  public interface SomeOtherMessage {}

  public interface ProcessorMethods {
    public String persistenceId();
    public boolean recoveryRunning();
    public boolean recoveryFinished();
    public Persistent getCurrentPersistentMessage();

  static Object o1 = new Object() {
    class MyProcessor extends AbstractProcessor {
      public MyProcessor() {
          match(Persistent.class, p -> {
            // message successfully written to journal
            Object payload = p.payload();
            Long sequenceNr = p.sequenceNr();
            // ...
          match(PersistenceFailure.class, failure -> {
            // message failed to be written to journal
            Object payload = failure.payload();
            Long sequenceNr = failure.sequenceNr();
            Throwable cause = failure.cause();
            // ...
          match(SomeOtherMessage.class, message -> {
            // message not written to journal

    class MyActor extends AbstractActor {
      ActorRef processor;

      public MyActor() {
        processor = context().actorOf(Props.create(MyProcessor.class), "myProcessor");

        processor.tell(Persistent.create("foo"), null);
        processor.tell("bar", null);

          match(Persistent.class, received -> {/* ... */}).build()

      private void recover() {
        processor.tell(Recover.create(), null);

  static Object o2 = new Object() {
    abstract class MyProcessor1 extends AbstractProcessor {
      public void preStart() {}

      public void preRestart(Throwable reason, Option<Object> message) {}

    abstract class MyProcessor2 extends AbstractProcessor {
      public void preStart() {
        self().tell(Recover.create(457L), null);

    abstract class MyProcessor3 extends AbstractProcessor {
      public void preRestart(Throwable reason, Option<Object> message) {
        if (message.isDefined() && message.get() instanceof Persistent) {
          deleteMessage(((Persistent) message.get()).sequenceNr());
        super.preRestart(reason, message);

    class MyProcessor4 extends AbstractProcessor implements ProcessorMethods {
      public String persistenceId() {
        return "my-stable-persistence-id";

      public MyProcessor4() {
          match(Persistent.class, received -> {/* ... */}).build()

    class MyPersistentActor5 extends AbstractPersistentActor {

      @Override public String persistenceId() { 
        return "my-stable-persistence-id";

      @Override public PartialFunction<Object, BoxedUnit> receiveRecover() {
        return ReceiveBuilder.
          match(RecoveryCompleted.class, r -> {
          match(String.class, this::handleEvent).build();

      @Override public PartialFunction<Object, BoxedUnit> receiveCommand() {
        return ReceiveBuilder.
          match(String.class, s -> s.equals("cmd"),
            s -> persist("evt", this::handleEvent)).build();
      private void recoveryCompleted() {
          // perform init after recovery, before any other messages
          // ...

      private void handleEvent(String event) {
        // update state
        // ...

  static Object atLeastOnceExample = new Object() {
      class Msg implements Serializable {
        public final long deliveryId;
        public final String s;
        public Msg(long deliveryId, String s) {
          this.deliveryId = deliveryId;
          this.s = s;
      class Confirm implements Serializable {
        public final long deliveryId;
        public Confirm(long deliveryId) {
          this.deliveryId = deliveryId;
      class MsgSent implements Serializable {
        public final String s;
        public MsgSent(String s) {
          this.s = s;
      class MsgConfirmed implements Serializable {
        public final long deliveryId;
        public MsgConfirmed(long deliveryId) {
          this.deliveryId = deliveryId;
      class MyPersistentActor extends AbstractPersistentActorWithAtLeastOnceDelivery {
        private final ActorPath destination;

        public MyPersistentActor(ActorPath destination) {
            this.destination = destination;

        public PartialFunction<Object, BoxedUnit> receiveCommand() {
          return ReceiveBuilder.
            match(String.class, s -> {
              persist(new MsgSent(s), evt -> updateState(evt));
            match(Confirm.class, confirm -> {
              persist(new MsgConfirmed(confirm.deliveryId), evt -> updateState(evt));

        public PartialFunction<Object, BoxedUnit> receiveRecover() {
          return ReceiveBuilder.
              match(Object.class, evt -> updateState(evt)).build();
        void updateState(Object event) {
          if (event instanceof MsgSent) {
            final MsgSent evt = (MsgSent) event;
            deliver(destination, deliveryId -> new Msg(deliveryId, evt.s));
          } else if (event instanceof MsgConfirmed) {
            final MsgConfirmed evt = (MsgConfirmed) event;

      class MyDestination extends AbstractActor {
        public MyDestination() {
            match(Msg.class, msg -> {
              // ...
              sender().tell(new Confirm(msg.deliveryId), self());

  static Object o3 = new Object() {
    class MyProcessor extends AbstractProcessor {
      private final ActorRef destination;
      private final ActorRef channel;

      public MyProcessor() {
        this.destination = context().actorOf(Props.create(MyDestination.class)); = context().actorOf(Channel.props(), "myChannel");

          match(Persistent.class, p -> {
            Persistent out = p.withPayload("done " + p.payload());
            channel.tell(Deliver.create(out, destination.path()), self());

    class MyDestination extends AbstractActor {
      public MyDestination() {
          match(ConfirmablePersistent.class, p -> {
            Object payload = p.payload();
            Long sequenceNr = p.sequenceNr();
            int redeliveries = p.redeliveries();
            // ...

    class MyProcessor2 extends AbstractProcessor {
      private final ActorRef destination;
      private final ActorRef channel;

      public MyProcessor2(ActorRef destination) {
        this.destination = context().actorOf(Props.create(MyDestination.class));
        //#channel-id-override = context().actorOf(Channel.props("my-stable-channel-id"));

            .withRedeliverInterval(Duration.create(30, TimeUnit.SECONDS))

        class MyListener extends AbstractActor {
          public MyListener() {
              match(RedeliverFailure.class, r -> {
                Iterable<ConfirmablePersistent> messages = r.getMessages();
                // ...

        final ActorRef myListener = context().actorOf(Props.create(MyListener.class));

          match(Persistent.class, p -> {
            Persistent out = p.withPayload("done " + p.payload());
            channel.tell(Deliver.create(out, destination.path()), self());

            channel.tell(Deliver.create(out, sender().path()), self());

  static Object o4 = new Object() {
    class MyProcessor extends AbstractProcessor {
      private Object state;

      public MyProcessor() {
          match(String.class, s -> s.equals("snap"),
            s -> saveSnapshot(state)).
          match(SaveSnapshotSuccess.class, ss -> {
            SnapshotMetadata metadata = ss.metadata();
            // ...
          match(SaveSnapshotFailure.class, sf -> {
            SnapshotMetadata metadata = sf.metadata();
            // ...

  static Object o5 = new Object() {
    class MyProcessor extends AbstractProcessor {
      private Object state;

      public MyProcessor() {
          match(SnapshotOffer.class, s -> {
            state = s.snapshot();
            // ...
          match(Persistent.class, p -> {/* ...*/}).build()

    class MyActor extends AbstractActor {
      ActorRef processor;

      public MyActor() {
        processor = context().actorOf(Props.create(MyProcessor.class));
          match(Object.class, o -> {/* ... */}).build()

      private void recover() {
            .create(457L, System.currentTimeMillis())), null);

  static Object o6 = new Object() {
    class MyProcessor extends AbstractProcessor {
      public MyProcessor() {
          match(Persistent.class, p -> p.payload().equals("a"),
            p -> {/* ... */}).
          match(Persistent.class, p -> p.payload().equals("b"),
            p -> {/* ... */}).build()

    class Example {
      final ActorSystem system    = ActorSystem.create("example");
      final ActorRef    processor = system.actorOf(Props.create(MyProcessor.class));

      public void batchWrite() {
            Persistent.create("b"))), null);

      // ...

  static Object o7 = new Object() {
    abstract class MyProcessor extends AbstractProcessor {
      ActorRef destination;

      public void foo() {
        final ActorRef channel = context().actorOf(
              .withRedeliverInterval(Duration.create(30, TimeUnit.SECONDS))

        channel.tell(Deliver.create(Persistent.create("example"), destination.path()), self());

  static Object o8 = new Object() {
    class MyEventsourcedProcessor extends AbstractPersistentActor {
      private ActorRef destination;
      private ActorRef channel;

      public MyEventsourcedProcessor(ActorRef destination) {
        this.destination = destination; = context().actorOf(Channel.props(), "channel");

      @Override public String persistenceId() { 
        return "my-stable-persistence-id";

      private void handleEvent(String event) {
        // update state
        // ...
        // reliably deliver events
          Persistent.create(event, getCurrentPersistentMessage()),
          destination.path()), self());

      @Override public PartialFunction<Object, BoxedUnit> receiveRecover() {
        return ReceiveBuilder.
          match(String.class, this::handleEvent).build();

      @Override public PartialFunction<Object, BoxedUnit> receiveCommand() {
        return ReceiveBuilder.
          match(String.class, s -> s.equals("cmd"),
            s -> persist("evt", this::handleEvent)).build();

  static Object o9 = new Object() {
    class MyPersistentActor extends AbstractPersistentActor {

      @Override public String persistenceId() { 
        return "my-stable-persistence-id";

      private void handleCommand(String c) {
        sender().tell(c, self());

        persistAsync(String.format("evt-%s-1", c), e -> {
          sender().tell(e, self());
        persistAsync(String.format("evt-%s-2", c), e -> {
          sender().tell(e, self());

      @Override public PartialFunction<Object, BoxedUnit> receiveRecover() {
        return ReceiveBuilder.
          match(String.class, this::handleCommand).build();

      @Override public PartialFunction<Object, BoxedUnit> receiveCommand() {
        return ReceiveBuilder.
          match(String.class, this::handleCommand).build();

    public void usage() {
      final ActorSystem system = ActorSystem.create("example");
      final ActorRef processor = system.actorOf(Props.create(MyPersistentActor.class));
      processor.tell("a", null);
      processor.tell("b", null);

      // possible order of received messages:
      // a
      // b
      // evt-a-1
      // evt-a-2
      // evt-b-1
      // evt-b-2

  static Object o10 = new Object() {
    class MyPersistentActor extends AbstractPersistentActor {

      @Override public String persistenceId() { 
        return "my-stable-persistence-id";

      private void handleCommand(String c) {
        persistAsync(String.format("evt-%s-1", c), e -> {
          sender().tell(e, self());
        persistAsync(String.format("evt-%s-2", c), e -> {
          sender().tell(e, self());

        defer(String.format("evt-%s-3", c), e -> {
          sender().tell(e, self());

      @Override public PartialFunction<Object, BoxedUnit> receiveRecover() {
        return ReceiveBuilder.
          match(String.class, this::handleCommand).build();

      @Override public PartialFunction<Object, BoxedUnit> receiveCommand() {
        return ReceiveBuilder.
          match(String.class, this::handleCommand).build();

    public void usage() {
      final ActorSystem system = ActorSystem.create("example");
      final ActorRef sender = null; // your imaginary sender here
      final ActorRef processor = system.actorOf(Props.create(MyPersistentActor.class));
      processor.tell("a", sender);
      processor.tell("b", sender);

      // order of received messages:
      // a
      // b
      // evt-a-1
      // evt-a-2
      // evt-a-3
      // evt-b-1
      // evt-b-2
      // evt-b-3

  static Object o11 = new Object() {
    class MyView extends AbstractPersistentView {
      @Override public String persistenceId() { return "some-persistence-id"; }
      @Override public String viewId() { return "some-persistence-id-view"; }

      public MyView() {
          match(Object.class, p -> isPersistent(),  persistent -> {
            // ...

    public void usage() {
      final ActorSystem system = ActorSystem.create("example");
      final ActorRef view = system.actorOf(Props.create(MyView.class));
      view.tell(Update.create(true), null);

Other Akka source code examples

Here is a short list of links related to this Akka source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller


new blog posts


Copyright 1998-2021 Alvin Alexander,
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.