|
Akka/Scala example source code file (DistributedPubSubMediatorSpec.scala)
The DistributedPubSubMediatorSpec.scala Akka example source code/** * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com> */ package akka.contrib.pattern import language.postfixOps import scala.concurrent.duration._ import com.typesafe.config.ConfigFactory import akka.actor.Actor import akka.actor.ActorRef import akka.actor.PoisonPill import akka.actor.Props import akka.cluster.Cluster import akka.cluster.ClusterEvent._ import akka.remote.testconductor.RoleName import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.remote.testkit.STMultiNodeSpec import akka.testkit._ import akka.actor.ActorLogging import akka.contrib.pattern.DistributedPubSubMediator.Internal.Status import akka.contrib.pattern.DistributedPubSubMediator.Internal.Delta object DistributedPubSubMediatorSpec extends MultiNodeConfig { val first = role("first") val second = role("second") val third = role("third") commonConfig(ConfigFactory.parseString(""" akka.loglevel = INFO akka.actor.provider = "akka.cluster.ClusterActorRefProvider" akka.remote.log-remote-lifecycle-events = off akka.cluster.auto-down-unreachable-after = 0s akka.contrib.cluster.pub-sub.max-delta-elements = 500 """)) object TestChatUser { final case class Whisper(path: String, msg: Any) final case class Talk(path: String, msg: Any) final case class TalkToOthers(path: String, msg: Any) final case class Shout(topic: String, msg: Any) final case class ShoutToGroups(topic: String, msg: Any) final case class JoinGroup(topic: String, group: String) final case class ExitGroup(topic: String, group: String) } class TestChatUser(mediator: ActorRef, testActor: ActorRef) extends Actor { import TestChatUser._ import DistributedPubSubMediator._ def receive = { case Whisper(path, msg) ⇒ mediator ! Send(path, msg, localAffinity = true) case Talk(path, msg) ⇒ mediator ! SendToAll(path, msg) case TalkToOthers(path, msg) ⇒ mediator ! SendToAll(path, msg, allButSelf = true) case Shout(topic, msg) ⇒ mediator ! Publish(topic, msg) case ShoutToGroups(topic, msg) ⇒ mediator ! Publish(topic, msg, true) case JoinGroup(topic, group) ⇒ mediator ! Subscribe(topic, Some(group), self) case ExitGroup(topic, group) ⇒ mediator ! Unsubscribe(topic, Some(group), self) case msg ⇒ testActor ! msg } } //#publisher class Publisher extends Actor { import DistributedPubSubMediator.Publish // activate the extension val mediator = DistributedPubSubExtension(context.system).mediator def receive = { case in: String ⇒ val out = in.toUpperCase mediator ! Publish("content", out) } } //#publisher //#subscriber class Subscriber extends Actor with ActorLogging { import DistributedPubSubMediator.{ Subscribe, SubscribeAck } val mediator = DistributedPubSubExtension(context.system).mediator // subscribe to the topic named "content" mediator ! Subscribe("content", self) def receive = { case SubscribeAck(Subscribe("content", None, `self`)) ⇒ context become ready } def ready: Actor.Receive = { case s: String ⇒ log.info("Got {}", s) } } //#subscriber } class DistributedPubSubMediatorMultiJvmNode1 extends DistributedPubSubMediatorSpec class DistributedPubSubMediatorMultiJvmNode2 extends DistributedPubSubMediatorSpec class DistributedPubSubMediatorMultiJvmNode3 extends DistributedPubSubMediatorSpec class DistributedPubSubMediatorSpec extends MultiNodeSpec(DistributedPubSubMediatorSpec) with STMultiNodeSpec with ImplicitSender { import DistributedPubSubMediatorSpec._ import DistributedPubSubMediatorSpec.TestChatUser._ import DistributedPubSubMediator._ override def initialParticipants = roles.size def join(from: RoleName, to: RoleName): Unit = { runOn(from) { Cluster(system) join node(to).address createMediator() } enterBarrier(from.name + "-joined") } def createMediator(): ActorRef = DistributedPubSubExtension(system).mediator def mediator: ActorRef = DistributedPubSubExtension(system).mediator var chatUsers: Map[String, ActorRef] = Map.empty def createChatUser(name: String): ActorRef = { var a = system.actorOf(Props(classOf[TestChatUser], mediator, testActor), name) chatUsers += (name -> a) a } def chatUser(name: String): ActorRef = chatUsers(name) def awaitCount(expected: Int): Unit = { awaitAssert { mediator ! Count expectMsgType[Int] should be(expected) } } "A DistributedPubSubMediator" must { "startup 2 node cluster" in within(15 seconds) { join(first, first) join(second, first) enterBarrier("after-1") } "keep track of added users" in within(15 seconds) { runOn(first) { val u1 = createChatUser("u1") mediator ! Put(u1) val u2 = createChatUser("u2") mediator ! Put(u2) awaitCount(2) // send to actor at same node u1 ! Whisper("/user/u2", "hello") expectMsg("hello") lastSender should be(u2) } runOn(second) { val u3 = createChatUser("u3") mediator ! Put(u3) } runOn(first, second) { awaitCount(3) } enterBarrier("3-registered") runOn(second) { val u4 = createChatUser("u4") mediator ! Put(u4) } runOn(first, second) { awaitCount(4) } enterBarrier("4-registered") runOn(first) { // send to actor on another node chatUser("u1") ! Whisper("/user/u4", "hi there") } runOn(second) { expectMsg("hi there") lastSender.path.name should be("u4") } enterBarrier("after-2") } "replicate users to new node" in within(20 seconds) { join(third, first) runOn(third) { val u5 = createChatUser("u5") mediator ! Put(u5) } awaitCount(5) enterBarrier("5-registered") runOn(third) { chatUser("u5") ! Whisper("/user/u4", "go") } runOn(second) { expectMsg("go") lastSender.path.name should be("u4") } enterBarrier("after-3") } "keep track of removed users" in within(15 seconds) { runOn(first) { val u6 = createChatUser("u6") mediator ! Put(u6) } awaitCount(6) enterBarrier("6-registered") runOn(first) { mediator ! Remove("/user/u6") } awaitCount(5) enterBarrier("after-4") } "remove terminated users" in within(5 seconds) { runOn(second) { chatUser("u3") ! PoisonPill } awaitCount(4) enterBarrier("after-5") } "publish" in within(15 seconds) { runOn(first, second) { val u7 = createChatUser("u7") mediator ! Put(u7) } awaitCount(6) enterBarrier("7-registered") runOn(third) { chatUser("u5") ! Talk("/user/u7", "hi") } runOn(first, second) { expectMsg("hi") lastSender.path.name should be("u7") } runOn(third) { expectNoMsg(2.seconds) } enterBarrier("after-6") } "publish to topic" in within(15 seconds) { runOn(first) { val s8 = Subscribe("topic1", createChatUser("u8")) mediator ! s8 expectMsg(SubscribeAck(s8)) val s9 = Subscribe("topic1", createChatUser("u9")) mediator ! s9 expectMsg(SubscribeAck(s9)) } runOn(second) { val s10 = Subscribe("topic1", createChatUser("u10")) mediator ! s10 expectMsg(SubscribeAck(s10)) } // one topic on two nodes awaitCount(8) enterBarrier("topic1-registered") runOn(third) { chatUser("u5") ! Shout("topic1", "hello all") } runOn(first) { val names = receiveWhile(messages = 2) { case "hello all" ⇒ lastSender.path.name } names.toSet should be(Set("u8", "u9")) } runOn(second) { expectMsg("hello all") lastSender.path.name should be("u10") } runOn(third) { expectNoMsg(2.seconds) } enterBarrier("after-7") } "demonstrate usage" in within(15 seconds) { def later(): Unit = { awaitCount(10) } //#start-subscribers runOn(first) { system.actorOf(Props[Subscriber], "subscriber1") } runOn(second) { system.actorOf(Props[Subscriber], "subscriber2") system.actorOf(Props[Subscriber], "subscriber3") } //#start-subscribers //#publish-message runOn(third) { val publisher = system.actorOf(Props[Publisher], "publisher") later() // after a while the subscriptions are replicated publisher ! "hello" } //#publish-message enterBarrier("after-8") } "send-all to all other nodes" in within(15 seconds) { runOn(first, second, third) { // create the user on all nodes val u11 = createChatUser("u11") mediator ! Put(u11) } awaitCount(13) enterBarrier("11-registered") runOn(third) { chatUser("u5") ! TalkToOthers("/user/u11", "hi") // sendToAll to all other nodes } runOn(first, second) { expectMsg("hi") lastSender.path.name should be("u11") } runOn(third) { expectNoMsg(2.seconds) // sender() node should not receive a message } enterBarrier("after-11") } "send one message to each group" in within(20 seconds) { runOn(first) { val u12 = createChatUser("u12") u12 ! JoinGroup("topic2", "group1") expectMsg(SubscribeAck(Subscribe("topic2", Some("group1"), u12))) } runOn(second) { val u12 = createChatUser("u12") u12 ! JoinGroup("topic2", "group2") expectMsg(SubscribeAck(Subscribe("topic2", Some("group2"), u12))) val u13 = createChatUser("u13") u13 ! JoinGroup("topic2", "group2") expectMsg(SubscribeAck(Subscribe("topic2", Some("group2"), u13))) } awaitCount(17) enterBarrier("12-registered") runOn(first) { chatUser("u12") ! ShoutToGroups("topic2", "hi") } runOn(first, second) { expectMsg("hi") expectNoMsg(2.seconds) // each group receive only one message } enterBarrier("12-published") runOn(first) { val u12 = chatUser("u12") u12 ! ExitGroup("topic2", "group1") expectMsg(UnsubscribeAck(Unsubscribe("topic2", Some("group1"), u12))) } runOn(second) { val u12 = chatUser("u12") u12 ! ExitGroup("topic2", "group2") expectMsg(UnsubscribeAck(Unsubscribe("topic2", Some("group2"), u12))) val u13 = chatUser("u13") u13 ! ExitGroup("topic2", "group2") expectMsg(UnsubscribeAck(Unsubscribe("topic2", Some("group2"), u13))) } enterBarrier("after-12") } "transfer delta correctly" in { val firstAddress = node(first).address val secondAddress = node(second).address val thirdAddress = node(third).address runOn(first) { mediator ! Status(versions = Map.empty) val deltaBuckets = expectMsgType[Delta].buckets deltaBuckets.size should be(3) deltaBuckets.find(_.owner == firstAddress).get.content.size should be(9) deltaBuckets.find(_.owner == secondAddress).get.content.size should be(8) deltaBuckets.find(_.owner == thirdAddress).get.content.size should be(2) } enterBarrier("verified-initial-delta") // this test is configured with max-delta-elements = 500 val many = 1010 runOn(first) { for (i ← 0 until many) mediator ! Put(createChatUser("u" + (1000 + i))) mediator ! Status(versions = Map.empty) val deltaBuckets1 = expectMsgType[Delta].buckets deltaBuckets1.map(_.content.size).sum should be(500) mediator ! Status(versions = deltaBuckets1.map(b ⇒ b.owner -> b.version).toMap) val deltaBuckets2 = expectMsgType[Delta].buckets deltaBuckets1.map(_.content.size).sum should be(500) mediator ! Status(versions = deltaBuckets2.map(b ⇒ b.owner -> b.version).toMap) val deltaBuckets3 = expectMsgType[Delta].buckets deltaBuckets3.map(_.content.size).sum should be(9 + 8 + 2 + many - 500 - 500) } enterBarrier("verified-delta-with-many") within(10.seconds) { awaitCount(17 + many) } enterBarrier("after-13") } "remove entries when node is removed" in within(30 seconds) { mediator ! Count val countBefore = expectMsgType[Int] runOn(first) { testConductor.exit(third, 0).await } enterBarrier("third-shutdown") // third had 2 entries u5 and u11, and those should be removed everywhere runOn(first, second) { awaitCount(countBefore - 2) } enterBarrier("after-14") } "receive proper unsubscribeAck message" in within(15 seconds) { runOn(first) { val user = createChatUser("u111") val topic = "sample-topic1" val s1 = Subscribe(topic, user) mediator ! s1 expectMsg(SubscribeAck(s1)) val uns = Unsubscribe(topic, user) mediator ! uns expectMsg(UnsubscribeAck(uns)) } enterBarrier("after-14") } } } Other Akka source code examplesHere is a short list of links related to this Akka DistributedPubSubMediatorSpec.scala source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.