Akka Problem: In some situations you can’t pass an ActorRef
to another actor, so you want to see how to “look up” an Akka Typed actor so you can send a message to it.
Solution
There are at least two ways to find other Akka Typed actors, and both involve using the Akka Receptionist
. The example shown here in the solution shows how to find an actor asynchronously, and the example shown in the Discussion shows how to use the ask
method to find another actor synchronously.
In both examples I’ll use the idea of creating an actor system that works like Amazon Echo devices. The basic idea is that a device like this has ears to listen to you, a mouth to speak to you, and a brain to do all of its work. In these examples the Brain
actor will need to discover the Mouth
actor.
Import statements
The following import statements are required by the Mouth
and Brain
actors that follow. The Receptionist
and ServiceKey
have not been used until this recipe:
import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
import akka.actor.typed.scaladsl.{AbstractBehavior,ActorContext,Behaviors}
import akka.actor.typed.receptionist.{Receptionist,ServiceKey}
Mouth (an actor that wants to be found)
The first step in the process is to create a Mouth
actor that can be discovered. For an actor to be discoverable, it must register itself with the Akka Receptionist
. The Mouth
actor shows the two steps necessary to register an actor with the Receptionist
:
-
Create a unique
ServiceKey
-
Send a
Receptionist.Register
message to theReceptionist
, using theServiceKey
As usual, I comments to the code to explain the key parts:
object Mouth {
sealed trait MessageToMouth
case class SpeakText(msg: String) extends MessageToMouth
private case class ListingResponse(listing: Receptionist.Listing)
extends MessageToMouth
// (1) a ServiceKey is a unique identifier for this actor
val MouthKey: ServiceKey[MessageToMouth] = ServiceKey("Mouth")
// this line of code is long, so i wrapped it onto two lines
def apply(): Behavior[MessageToMouth] = Behaviors.setup {
context: ActorContext[MessageToMouth] =>
// (2) every actor that wants to be discoverable must register itself
// with the Receptionist by sending the Receptionist as Receptionist
// message, including your ServiceKey
context.system.receptionist !
Receptionist.Register(Mouth.MouthKey, context.self)
Behaviors.receiveMessage { message =>
message match {
case SpeakText(msg) =>
println(s"Mouth: got a msg: $msg")
Behaviors.same
}
}
}
}
Brain (an actor that needs to find another actor)
Next, the Brain
actor needs to discover the Mouth
actor. Asynchronously finding a discoverable actor and sending a message to it is a five-step process, and I demonstrate the steps in this code:
object Brain {
sealed trait MessageToBrain
final case object FindTheMouth extends MessageToBrain
private case class ListingResponse(listing: Receptionist.Listing)
extends MessageToBrain
// this line of code is long, so i wrapped it onto two lines
def apply(): Behavior[MessageToBrain] = Behaviors.setup {
context: ActorContext[MessageToBrain] =>
// (1) we can’t initially get a reference to the Mouth actor, so
// declare this variable as a var field, and using Option/None
var mouth: Option[ActorRef[Mouth.MessageToMouth]] = None
// (2) create an ActorRef that can be thought of as a Receptionist
// Listing “adapter.” this will be used in the next line of code.
// the Brain.ListingResponse(listing) part of the code tells the
// Receptionist how to get back in touch with us after we contact
// it in Step 4 below.
// also, this line of code is long, so i wrapped it onto two lines.
val listingAdapter: ActorRef[Receptionist.Listing] =
context.messageAdapter { listing =>
println(s"listingAdapter:listing: ${listing.toString}")
Brain.ListingResponse(listing)
}
// (3) send a message to the Receptionist saying that we want
// to subscribe to events related to Mouth.MouthKey, which
// represents the Mouth actor.
context.system.receptionist !
Receptionist.Subscribe(Mouth.MouthKey, listingAdapter)
Behaviors.receiveMessage { message =>
message match {
case FindTheMouth =>
// (4) send a Find message to the Receptionist, saying
// that we want to find any/all listings related to
// Mouth.MouthKey, i.e., the Mouth actor.
println(s"Brain: got a FindTheMouth message")
context.system.receptionist !
Receptionist.Find(Mouth.MouthKey, listingAdapter)
Behaviors.same
case ListingResponse(Mouth.MouthKey.Listing(listings)) =>
// (5) after Step 4, the Receptionist sends us this
// ListingResponse message. the `listings` variable is
// a Set of ActorRef of type Mouth.MessageToMouth, which
// you can interpret as “a set of Mouth ActorRefs.” for
// this example i know that there will be at most one
// Mouth actor, but in other cases there may be more
// than one actor in this set.
println(s"Brain: got a ListingResponse message")
// i add this line just to be clear about `listings` type
val xs: Set[ActorRef[Mouth.MessageToMouth]] = listings
// loop through all of the ActorRefs
for (x <- xs) {
// there should be only one ActorRef, so i assign it
// to the `mouth` variable i created earlier
mouth = Some(x)
// send a SpeakText message to the Mouth actor
mouth.foreach{ m =>
m ! Mouth.SpeakText("Brain says hello to Mouth")
}
}
Behaviors.same
}
}
}
}
An Akka Supervisor
After that initial setup, things get easier. All we need to do now is (a) create a Guardian/Supervisor class that will create the Mouth
and Brain
actors, and then (b) send a message to the Brain
actor to get things started. In this example I use the name Supervisor
for this class because I’ve used the name Guardian
in previous examples, and both names are commonly used.
I created the Supervisor
class using the FP-style, and added comments to it explain the key parts:
object Supervisor {
// the messages this actor can handle
sealed trait SupervisorMessage
final case object Start extends SupervisorMessage
// the usual factory method.
// this line of code is very long, so i wrapped it onto three lines here.
def apply(): Behavior[SupervisorMessage] =
Behaviors.setup[SupervisorMessage] {
actorContext: ActorContext[SupervisorMessage] =>
// create our two child actors. in this case i could have passed the
// Mouth ActorRef to the Brain constructor, but the purpose of this
// recipe is to learn how to discover actors, so i didn’t do that.
// in some situations you won’t be able to pass an actor an ActorRef
// during the construction process.
val mouth: ActorRef[Mouth.MessageToMouth] = actorContext.spawn(
Mouth(), "Mouth"
)
val brain: ActorRef[Brain.MessageToBrain] = actorContext.spawn(
Brain(), "Brain"
)
// when we receive a Start message, send the FindTheMouth message
// to the Brain
Behaviors.receiveMessage { consoleMessage =>
consoleMessage match {
case Start =>
println(s"Supervisor got a Start message")
brain ! Brain.FindTheMouth
Behaviors.same
}
}
}
}
The Akka App
Finally, here’s the App
that starts the action:
import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
import akka.actor.typed.scaladsl.{AbstractBehavior,ActorContext,Behaviors}
import akka.actor.typed.receptionist.{Receptionist,ServiceKey}
import java.util.Scanner
object ReceptionistDemo1 extends App {
// create the ActorSystem
val supervisor: ActorSystem[Supervisor.SupervisorMessage] = ActorSystem(
Supervisor(),
"Supervisor"
)
// send the Start message to the Supervisor
supervisor ! Supervisor.Start
// wait a few moments, and then stop the Supervisor
Thread.sleep(200)
supervisor.terminate()
}
After removing some Akka logging statements, this is the output from the ReceptionistDemo1
app. I reformatted it slightly to make it easier to read:
Supervisor got a Start message Brain: got a FindTheMouth message listingAdapter:listing:Listing( ServiceKey[receptionist_simple.Mouth$MessageToMouth](Mouth), Set(Actor[akka://Supervisor/user/Mouth#-676062261]), Set(Actor[akka://Supervisor/user/Mouth#-676062261]),true ) Brain: got a ListingResponse message Mouth: got a msg: Brain says hello to Mouth listingAdapter:listing:Listing( ServiceKey[receptionist_simple.Mouth$MessageToMouth](Mouth), Set(Actor[akka://Supervisor/user/Mouth#-676062261]), Set(Actor[akka://Supervisor/user/Mouth#-676062261]),true ) Brain: got a ListingResponse message Mouth: got a msg: Brain says hello to Mouth
These messages show that the Brain
actor was able to successfully find the Mouth
actor and send a message to it.
Discussion
The Solution shows an asynchronous solution to the problem of finding actors using the Receptionist
. This next example shows a synchronous solution to this problem.
Mouth, Supervisor, and App
The Mouth
, Supervisor
, and App
all stay the same, so I won’t repeat that code here.
Brain
The Brain
uses a synchronous approach with the ask
method, as shown in the following code. As usual, I’ve added comments to the code to explain the relevant steps:
import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
import akka.actor.typed.scaladsl.{AbstractBehavior,ActorContext,Behaviors}
import akka.actor.typed.receptionist.{Receptionist,ServiceKey}
import akka.actor.typed.receptionist.Receptionist.{Find, Listing}
import akka.util.Timeout
import scala.concurrent.duration._
import scala.util.{Failure, Random, Success}
object Brain {
// our messages
sealed trait MessageToBrain
final case object FindTheMouth extends MessageToBrain
final case class BrainFailure(s: String) extends MessageToBrain
final case class FoundTheMouth(listing: Listing) extends MessageToBrain
// the usual factory method
def apply(): Behavior[MessageToBrain] =
Behaviors.setup { context: ActorContext[MessageToBrain] =>
// (1) we need to find the mouth, so this needs to be a var
// field, and it uses Option and None
var mouth: Option[ActorRef[Mouth.MessageToMouth]] = None
Behaviors.receiveMessage { message =>
message match {
case FindTheMouth =>
// (2) when we receive this message, we use `ask` and a
// timeout value to try to find the Mouth actor using
// its MouthKey. in response, the Receptionist will
// either give us a Success or Failure value.
// when we get a Success value, we send ourself the
// `FoundTheMouth(listing)` message. this isn’t
// completely necessary, but i did it to simplify the
// code in this area.
println("Brain: Got a FindTheMouth message")
implicit val timeout: Timeout = 1.second
context.ask(
context.system.receptionist,
Find(Mouth.MouthKey)
) {
case Success(listing: Listing) =>
Brain.FoundTheMouth(listing)
case Failure(_) =>
Brain.BrainFailure("Error: Could not find Mouth")
}
Behaviors.same
case FoundTheMouth(listing) =>
// (3) this is the message we sent to ourself in Step 2.
// as with the example in the Solution, the Receptionist
// sent us a Set of Mouth actor references.
println("Brain: Got a FoundTheMouth message")
val instances: Set[ActorRef[Mouth.MessageToMouth]] =
listing.serviceInstances(Mouth.MouthKey)
// (4) for this example i know there should only be one
// Mouth ActorRef in the Set, so i assign it to the
// `mouth` variable i created earlier. if this succeeds,
// it sends the SpeakText message to the Mouth.
mouth = instances.headOption
mouth.foreach { m =>
m ! Mouth.SpeakText("Brain says hello to Mouth.")
}
Behaviors.same
case BrainFailure(msg) =>
// (5) if the process in Step 2 fails, we send ourself
// the BrainFailure message, which is printed here.
println(msg)
Behaviors.same
}
}
}
}
The big difference here is that I use context.ask
and the Receptionist.Find
message to query the Receptionist
synchronously, compared to contacting it asynchronously in the previous example. With this technique it’s very important to use a Timeout
value when using ask
, so you can have control over how long your code will block in the event the Mouth
actor can’t be found.
After that, the body of the code block that contains the Success
and Failure
cases can be implemented in a variety of ways, and I use the approach shown to keep that code block small.
The output from the App
now looks like this:
AlekaSupervisor: Got a Start message Brain: Got a FindTheMouth message Brain: Got a FoundTheMouth message Mouth: got a msg: Brain says hello to Mouth.
Once again the Brain
is able to find the Mouth
using the Receptionist
.
this post is sponsored by my books: | |||
#1 New Release |
FP Best Seller |
Learn Scala 3 |
Learn FP Fast |