Scala: Akka Typed tutorial: Finding Actors with the Receptionist

Scala/Akka problem: In some situations you can’t pass an ActorRef to another actor, so you want to see how to “look up” an Akka Typed actor so you can send a message to it.

Solution

There are at least two ways to find other Akka Typed actors, and both involve the Akka Receptionist. The example in this Solution shows how to find an actor asynchronously by subscribing to the Receptionist, and the example in the Discussion shows how to use the ask method to find another actor in a more synchronous-looking, request-response style.

In both examples I’ll use the idea of creating an actor system that works like Amazon Echo devices. The basic idea is that a device like this has ears to listen to you, a mouth to speak to you, and a brain to do all of its work. In these examples the Brain actor will need to discover the Mouth actor.

Imports

The following import statements are required by the Mouth and Brain actors that follow. The Receptionist and ServiceKey imports are new in this recipe:

import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
import akka.actor.typed.scaladsl.{AbstractBehavior,ActorContext,Behaviors}
import akka.actor.typed.receptionist.{Receptionist,ServiceKey}

Mouth (an actor that wants to be found)

The first step in the process is to create a Mouth actor that can be discovered. For an actor to be discoverable, it must register itself with the Akka Receptionist. The Mouth actor shows the two steps necessary to register an actor with the Receptionist:

  1. Create a unique ServiceKey

  2. Send a Receptionist.Register message to the Receptionist, using the ServiceKey

As usual, I added comments to the code to explain the key parts:

object Mouth {

    sealed trait MessageToMouth
    case class SpeakText(msg: String) extends MessageToMouth

    // (1) a ServiceKey identifies this service to the Receptionist. other
    // actors use the same key to look up the Mouth
    val MouthKey: ServiceKey[MessageToMouth] = ServiceKey("Mouth")

    // this line of code is long, so i wrapped it onto two lines
    def apply(): Behavior[MessageToMouth] = Behaviors.setup {
        context: ActorContext[MessageToMouth] =>

        // (2) every actor that wants to be discoverable must register itself
        // with the Receptionist by sending it a Receptionist.Register
        // message that includes its ServiceKey
        context.system.receptionist !
            Receptionist.Register(Mouth.MouthKey, context.self)

        Behaviors.receiveMessage { message =>
            message match {
                case SpeakText(msg) =>
                    println(s"Mouth: got a msg: $msg")
                    Behaviors.same
            }
        }
    }

}

Brain (an actor that needs to find another actor)

Next, the Brain actor needs to discover the Mouth actor. Asynchronously finding a discoverable actor and sending a message to it is a five-step process, and I demonstrate the steps in this code:

object Brain {

    sealed trait MessageToBrain
    final case object FindTheMouth extends MessageToBrain
    private case class ListingResponse(listing: Receptionist.Listing)
        extends MessageToBrain

    // this line of code is long, so i wrapped it onto two lines
    def apply(): Behavior[MessageToBrain] = Behaviors.setup {
        context: ActorContext[MessageToBrain] =>

        // (1) we can’t initially get a reference to the Mouth actor, so
        // declare this variable as a var field, using Option/None
        var mouth: Option[ActorRef[Mouth.MessageToMouth]] = None

        // (2) create an ActorRef that can be thought of as a Receptionist
        // Listing “adapter.” this will be used in the next line of code.
        // the Brain.ListingResponse(listing) part of the code tells the
        // Receptionist how to get back in touch with us after we contact
        // it in Step 4 below.
        // also, this line of code is long, so i wrapped it onto two lines.
        val listingAdapter: ActorRef[Receptionist.Listing] =
            context.messageAdapter { listing =>
                println(s"listingAdapter:listing: ${listing.toString}")
                Brain.ListingResponse(listing)
            }

        // (3) send a message to the Receptionist saying that we want
        // to subscribe to events related to Mouth.MouthKey, which
        // represents the Mouth actor.
        context.system.receptionist !
            Receptionist.Subscribe(Mouth.MouthKey, listingAdapter)

        Behaviors.receiveMessage { message =>
            message match {
                case FindTheMouth =>
                    // (4) send a Find message to the Receptionist, saying
                    // that we want to find any/all listings related to
                    // Mouth.MouthKey, i.e., the Mouth actor.
                    println(s"Brain: got a FindTheMouth message")
                    context.system.receptionist !
                        Receptionist.Find(Mouth.MouthKey, listingAdapter)
                    Behaviors.same
                case ListingResponse(Mouth.MouthKey.Listing(listings)) =>
                    // (5) after Step 4, the Receptionist sends us this
                    // ListingResponse message. the `listings` variable is
                    // a Set of ActorRef of type Mouth.MessageToMouth, which
                    // you can interpret as “a set of Mouth ActorRefs.” for
                    // this example i know that there will be at most one
                    // Mouth actor, but in other cases there may be more
                    // than one actor in this set.
                    println(s"Brain: got a ListingResponse message")
                    // i add this line just to be clear about `listings` type
                    val xs: Set[ActorRef[Mouth.MessageToMouth]] = listings
                    // loop through all of the ActorRefs
                    for (x <- xs) {
                        // there should be only one ActorRef, so i assign it
                        // to the `mouth` variable i created earlier
                        mouth = Some(x)
                        // send a SpeakText message to the Mouth actor
                        mouth.foreach{ m =>
                            m ! Mouth.SpeakText("Brain says hello to Mouth")
                        }
                    }
                    Behaviors.same
            }
        }
    }
}

A Supervisor

After that initial setup, things get easier. All we need to do now is (a) create a Guardian/Supervisor class that will create the Mouth and Brain actors, and then (b) send a message to the Brain actor to get things started. In this example I use the name Supervisor for this class because I’ve used the name Guardian in previous examples, and both names are commonly used.

I created the Supervisor in the FP style, and added comments to it to explain the key parts:

object Supervisor {

    // the messages this actor can handle
    sealed trait SupervisorMessage
    final case object Start extends SupervisorMessage

    // the usual factory method.
    // this line of code is very long, so i wrapped it onto three lines here.
    def apply(): Behavior[SupervisorMessage] =
        Behaviors.setup[SupervisorMessage] {
        actorContext: ActorContext[SupervisorMessage] =>

        // create our two child actors. in this case i could have passed the
        // Mouth ActorRef to the Brain constructor, but the purpose of this
        // recipe is to learn how to discover actors, so i didn’t do that.
        // in some situations you won’t be able to pass an actor an ActorRef
        // during the construction process.
        val mouth: ActorRef[Mouth.MessageToMouth] = actorContext.spawn(
            Mouth(), "Mouth"
        )
        val brain: ActorRef[Brain.MessageToBrain] = actorContext.spawn(
            Brain(), "Brain"
        )

        // when we receive a Start message, send the FindTheMouth message
        // to the Brain
        Behaviors.receiveMessage { consoleMessage =>
            consoleMessage match {
                case Start =>
                    println(s"Supervisor got a Start message")
                    brain ! Brain.FindTheMouth
                    Behaviors.same
            }
        }
    }
}

The Akka Typed App

Finally, here’s the App that starts the action:

import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
import akka.actor.typed.scaladsl.{AbstractBehavior,ActorContext,Behaviors}
import akka.actor.typed.receptionist.{Receptionist,ServiceKey}
import java.util.Scanner

object ReceptionistDemo1 extends App {

    // create the ActorSystem
    val supervisor: ActorSystem[Supervisor.SupervisorMessage] = ActorSystem(
        Supervisor(),
        "Supervisor"
    )

    // send the Start message to the Supervisor
    supervisor ! Supervisor.Start

    // wait a few moments, and then stop the Supervisor
    Thread.sleep(200)
    supervisor.terminate()

}

After removing some Akka logging statements, this is the output from the ReceptionistDemo1 app. I reformatted it slightly to make it easier to read:

Supervisor got a Start message
Brain: got a FindTheMouth message

listingAdapter:listing:Listing(
    ServiceKey[receptionist_simple.Mouth$MessageToMouth](Mouth),
    Set(Actor[akka://Supervisor/user/Mouth#-676062261]),
    Set(Actor[akka://Supervisor/user/Mouth#-676062261]),true
)

Brain: got a ListingResponse message
Mouth: got a msg: Brain says hello to Mouth

listingAdapter:listing:Listing(
    ServiceKey[receptionist_simple.Mouth$MessageToMouth](Mouth),
    Set(Actor[akka://Supervisor/user/Mouth#-676062261]),
    Set(Actor[akka://Supervisor/user/Mouth#-676062261]),true
)

Brain: got a ListingResponse message
Mouth: got a msg: Brain says hello to Mouth

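These messages show that the Brain actor was able to successfully find the Mouth actor and send a message to it. The Listing arrives twice, most likely because the Brain both subscribes to Mouth.MouthKey in Step 3 and sends a Find message in Step 4, and the Receptionist answers each of them through the same listingAdapter.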
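Because a subscription already delivers the current Listing and every later change, the explicit Find in Step 4 may not be strictly necessary. The following is a minimal sketch of that variation, not a replacement for the code above. It assumes a stripped-down Mouth stand-in, and the names SubscribeOnlySketch and guardian are hypothetical names used only for illustration:

```scala
import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
import akka.actor.typed.receptionist.{Receptionist, ServiceKey}
import akka.actor.typed.scaladsl.Behaviors

// hypothetical wrapper object, used only to keep this sketch self-contained
object SubscribeOnlySketch {

    // stripped-down stand-in for the Mouth actor shown earlier
    object Mouth {
        sealed trait MessageToMouth
        final case class SpeakText(msg: String) extends MessageToMouth

        val MouthKey: ServiceKey[MessageToMouth] =
            ServiceKey[MessageToMouth]("Mouth")

        def apply(): Behavior[MessageToMouth] = Behaviors.setup { context =>
            // register with the Receptionist so the Brain can find us
            context.system.receptionist !
                Receptionist.Register(MouthKey, context.self)
            Behaviors.receiveMessage { case SpeakText(msg) =>
                println(s"Mouth: got a msg: $msg")
                Behaviors.same
            }
        }
    }

    object Brain {
        sealed trait MessageToBrain
        private final case class ListingResponse(listing: Receptionist.Listing)
            extends MessageToBrain

        def apply(): Behavior[MessageToBrain] = Behaviors.setup { context =>
            // wrap each Receptionist.Listing in our own message type
            val listingAdapter: ActorRef[Receptionist.Listing] =
                context.messageAdapter[Receptionist.Listing](ListingResponse.apply)

            // subscribe only. no Find message is sent in this sketch
            context.system.receptionist !
                Receptionist.Subscribe(Mouth.MouthKey, listingAdapter)

            Behaviors.receiveMessage {
                case ListingResponse(Mouth.MouthKey.Listing(mouths)) =>
                    // the set may be empty if the Mouth has not registered yet
                    mouths.foreach(_ ! Mouth.SpeakText("Brain says hello to Mouth"))
                    Behaviors.same
                case ListingResponse(_) =>
                    // a listing for some other key. ignore it
                    Behaviors.same
            }
        }
    }

    def main(args: Array[String]): Unit = {
        // hypothetical guardian that only spawns the two actors
        val guardian: Behavior[Nothing] = Behaviors.setup[Nothing] { context =>
            context.spawn(Mouth(), "Mouth")
            context.spawn(Brain(), "Brain")
            Behaviors.empty[Nothing]
        }
        val system = ActorSystem[Nothing](guardian, "SubscribeOnlySketch")
        Thread.sleep(500)
        system.terminate()
    }
}
```

With this approach the Brain doesn't need a FindTheMouth message at all, but it will greet the Mouth every time the Receptionist reports a change for that key, so whether this fits depends on how often you expect the set of registered actors to change.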

Discussion

The Solution shows an asynchronous, subscription-based solution to the problem of finding actors using the Receptionist. This next example shows a request-response solution using ask, which reads more like a synchronous call.

Mouth, Supervisor, and App

The Mouth, Supervisor, and App all stay the same, so I won’t repeat that code here.

Brain

The Brain now queries the Receptionist with the ask method, as shown in the following code. As usual, I’ve added comments to the code to explain the relevant steps:

import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
import akka.actor.typed.scaladsl.{AbstractBehavior,ActorContext,Behaviors}
import akka.actor.typed.receptionist.{Receptionist,ServiceKey}
import akka.actor.typed.receptionist.Receptionist.{Find, Listing}
import akka.util.Timeout
import scala.concurrent.duration._
import scala.util.{Failure, Random, Success}

object Brain {

    // our messages
    sealed trait MessageToBrain
    final case object FindTheMouth extends MessageToBrain
    final case class BrainFailure(s: String) extends MessageToBrain
    final case class FoundTheMouth(listing: Listing) extends MessageToBrain

    // the usual factory method
    def apply(): Behavior[MessageToBrain] =
        Behaviors.setup { context: ActorContext[MessageToBrain] =>

        // (1) we need to find the mouth, so this needs to be a var
        // field, and it uses Option and None
        var mouth: Option[ActorRef[Mouth.MessageToMouth]] = None

        Behaviors.receiveMessage { message =>
            message match {
                case FindTheMouth =>
                    // (2) when we receive this message, we use `ask` and a
                    // timeout value to try to find the Mouth actor using
                    // its MouthKey. in response, the Receptionist will
                    // either give us a Success or Failure value.
                    // when we get a Success value, we send ourself the
                    // `FoundTheMouth(listing)` message. this isn’t
                    // completely necessary, but i did it to simplify the
                    // code in this area.
                    println("Brain: Got a FindTheMouth message")
                    implicit val timeout: Timeout = 1.second
                    context.ask(
                        context.system.receptionist,
                        Find(Mouth.MouthKey)
                    ) {
                        case Success(listing: Listing) =>
                            Brain.FoundTheMouth(listing)
                        case Failure(_) =>
                            Brain.BrainFailure("Error: Could not find Mouth")
                    }
                    Behaviors.same

                case FoundTheMouth(listing) =>
                    // (3) this is the message we sent to ourself in Step 2.
                    // as with the example in the Solution, the Receptionist
                    // sent us a Set of Mouth actor references.
                    println("Brain: Got a FoundTheMouth message")
                    val instances: Set[ActorRef[Mouth.MessageToMouth]] =
                        listing.serviceInstances(Mouth.MouthKey)
                    // (4) for this example i know there should only be one
                    // Mouth ActorRef in the Set, so i assign it to the
                    // `mouth` variable i created earlier. if this succeeds,
                    // it sends the SpeakText message to the Mouth.
                    mouth = instances.headOption
                    mouth.foreach { m =>
                        m ! Mouth.SpeakText("Brain says hello to Mouth.")
                    }
                    Behaviors.same
                case BrainFailure(msg) =>
                    // (5) if the process in Step 2 fails, we send ourself
                    // the BrainFailure message, which is printed here.
                    println(msg)
                    Behaviors.same
            }
        }
    }
}

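The big difference here is that I use context.ask and the Receptionist.Find message to query the Receptionist in a request-response style, compared to subscribing to it in the previous example. Note that context.ask doesn’t block the Brain: the reply, or a timeout failure, is delivered to the actor later as an ordinary message. With this technique it’s very important to use a Timeout value, because it controls how long the Brain waits for the Receptionist’s reply before it receives the Failure case. If the Mouth hasn’t registered yet, the Receptionist still replies, but with an empty Listing.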

After that, the body of the code block that contains the Success and Failure cases can be implemented in a variety of ways, and I use the approach shown to keep that code block small.
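As one example of those other ways, here is a sketch that uses the type-safe Mouth.MouthKey.Listing extractor directly inside the ask block, so the Brain receives an Option instead of a raw Listing. The Mouth object is a stripped-down stand-in, and AskVariantSketch, MouthLookup, and the guardian are hypothetical names introduced only for this illustration:

```scala
import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
import akka.actor.typed.receptionist.{Receptionist, ServiceKey}
import akka.actor.typed.scaladsl.Behaviors
import akka.util.Timeout
import scala.concurrent.duration._
import scala.util.Success

// hypothetical wrapper object, used only to keep this sketch self-contained
object AskVariantSketch {

    // stripped-down stand-in for the Mouth actor shown earlier
    object Mouth {
        sealed trait MessageToMouth
        final case class SpeakText(msg: String) extends MessageToMouth

        val MouthKey: ServiceKey[MessageToMouth] =
            ServiceKey[MessageToMouth]("Mouth")

        def apply(): Behavior[MessageToMouth] = Behaviors.setup { context =>
            context.system.receptionist !
                Receptionist.Register(MouthKey, context.self)
            Behaviors.receiveMessage { case SpeakText(msg) =>
                println(s"Mouth: got a msg: $msg")
                Behaviors.same
            }
        }
    }

    object Brain {
        sealed trait MessageToBrain
        case object FindTheMouth extends MessageToBrain
        // hypothetical message: the lookup result, already reduced to an Option
        private final case class MouthLookup(
            mouth: Option[ActorRef[Mouth.MessageToMouth]]
        ) extends MessageToBrain

        def apply(): Behavior[MessageToBrain] = Behaviors.setup { context =>
            implicit val timeout: Timeout = 1.second

            Behaviors.receiveMessage {
                case FindTheMouth =>
                    context.ask(
                        context.system.receptionist,
                        Receptionist.Find(Mouth.MouthKey)
                    ) {
                        // a reply for our key: keep the first ActorRef, if any
                        case Success(Mouth.MouthKey.Listing(mouths)) =>
                            MouthLookup(mouths.headOption)
                        // a timeout or an unexpected listing: treat as not found
                        case _ =>
                            MouthLookup(None)
                    }
                    Behaviors.same
                case MouthLookup(Some(mouth)) =>
                    mouth ! Mouth.SpeakText("Brain says hello to Mouth.")
                    Behaviors.same
                case MouthLookup(None) =>
                    println("Brain: could not find the Mouth")
                    Behaviors.same
            }
        }
    }

    def main(args: Array[String]): Unit = {
        // hypothetical guardian: spawn both actors, then trigger the lookup
        val guardian: Behavior[Nothing] = Behaviors.setup[Nothing] { context =>
            context.spawn(Mouth(), "Mouth")
            val brain = context.spawn(Brain(), "Brain")
            brain ! Brain.FindTheMouth
            Behaviors.empty[Nothing]
        }
        val system = ActorSystem[Nothing](guardian, "AskVariantSketch")
        Thread.sleep(500)
        system.terminate()
    }
}
```

The trade-off is that this version folds the failure and the empty-set cases into a single MouthLookup(None) message, which keeps the Brain’s message protocol smaller but loses the distinction between a timeout and a Mouth that simply hasn’t registered yet.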

The output from the App now looks like this:

Supervisor got a Start message
Brain: Got a FindTheMouth message
Brain: Got a FoundTheMouth message
Mouth: got a msg: Brain says hello to Mouth.

Once again the Brain is able to find the Mouth using the Receptionist.