|
Akka/Scala example source code file (ClusterDomainEventPublisherSpec.scala)
The ClusterDomainEventPublisherSpec.scala Akka example source code
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import language.postfixOps
import scala.collection.immutable.SortedSet
import scala.concurrent.duration._
import org.scalatest.BeforeAndAfterEach
import akka.actor.Address
import akka.actor.PoisonPill
import akka.actor.Props
import akka.cluster.MemberStatus._
import akka.cluster.InternalClusterAction._
import akka.cluster.ClusterEvent._
import akka.testkit.AkkaSpec
import akka.testkit.ImplicitSender
import akka.actor.ActorRef
import akka.testkit.TestProbe
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ClusterDomainEventPublisherSpec extends AkkaSpec
with BeforeAndAfterEach with ImplicitSender {
var publisher: ActorRef = _
val aUp = TestMember(Address("akka.tcp", "sys", "a", 2552), Up)
val aLeaving = aUp.copy(status = Leaving)
val aExiting = aLeaving.copy(status = Exiting)
val aRemoved = aExiting.copy(status = Removed)
val bExiting = TestMember(Address("akka.tcp", "sys", "b", 2552), Exiting)
val bRemoved = bExiting.copy(status = Removed)
val cJoining = TestMember(Address("akka.tcp", "sys", "c", 2552), Joining, Set("GRP"))
val cUp = cJoining.copy(status = Up)
val cRemoved = cUp.copy(status = Removed)
val a51Up = TestMember(Address("akka.tcp", "sys", "a", 2551), Up)
val dUp = TestMember(Address("akka.tcp", "sys", "d", 2552), Up, Set("GRP"))
val g0 = Gossip(members = SortedSet(aUp)).seen(aUp.uniqueAddress)
val g1 = Gossip(members = SortedSet(aUp, bExiting, cJoining)).seen(aUp.uniqueAddress).seen(bExiting.uniqueAddress).seen(cJoining.uniqueAddress)
val g2 = Gossip(members = SortedSet(aUp, bExiting, cUp)).seen(aUp.uniqueAddress)
val g3 = g2.seen(bExiting.uniqueAddress).seen(cUp.uniqueAddress)
val g4 = Gossip(members = SortedSet(a51Up, aUp, bExiting, cUp)).seen(aUp.uniqueAddress)
val g5 = Gossip(members = SortedSet(a51Up, aUp, bExiting, cUp)).seen(aUp.uniqueAddress).seen(bExiting.uniqueAddress).seen(cUp.uniqueAddress).seen(a51Up.uniqueAddress)
val g6 = Gossip(members = SortedSet(aLeaving, bExiting, cUp)).seen(aUp.uniqueAddress)
val g7 = Gossip(members = SortedSet(aExiting, bExiting, cUp)).seen(aUp.uniqueAddress)
val g8 = Gossip(members = SortedSet(aUp, bExiting, cUp, dUp), overview = GossipOverview(reachability =
Reachability.empty.unreachable(aUp.uniqueAddress, dUp.uniqueAddress))).seen(aUp.uniqueAddress)
// created in beforeEach
var memberSubscriber: TestProbe = _
override def beforeEach(): Unit = {
memberSubscriber = TestProbe()
system.eventStream.subscribe(memberSubscriber.ref, classOf[MemberEvent])
system.eventStream.subscribe(memberSubscriber.ref, classOf[LeaderChanged])
publisher = system.actorOf(Props[ClusterDomainEventPublisher])
publisher ! PublishChanges(g0)
memberSubscriber.expectMsg(MemberUp(aUp))
memberSubscriber.expectMsg(LeaderChanged(Some(aUp.address)))
}
"ClusterDomainEventPublisher" must {
"publish MemberUp" in {
publisher ! PublishChanges(g2)
publisher ! PublishChanges(g3)
memberSubscriber.expectMsg(MemberExited(bExiting))
memberSubscriber.expectMsg(MemberUp(cUp))
}
"publish leader changed" in {
publisher ! PublishChanges(g4)
memberSubscriber.expectMsg(MemberUp(a51Up))
memberSubscriber.expectMsg(MemberExited(bExiting))
memberSubscriber.expectMsg(MemberUp(cUp))
memberSubscriber.expectMsg(LeaderChanged(Some(a51Up.address)))
memberSubscriber.expectNoMsg(500 millis)
}
"publish leader changed when old leader leaves and is removed" in {
publisher ! PublishChanges(g3)
memberSubscriber.expectMsg(MemberExited(bExiting))
memberSubscriber.expectMsg(MemberUp(cUp))
publisher ! PublishChanges(g6)
memberSubscriber.expectNoMsg(500 millis)
publisher ! PublishChanges(g7)
memberSubscriber.expectMsg(MemberExited(aExiting))
memberSubscriber.expectMsg(LeaderChanged(Some(cUp.address)))
memberSubscriber.expectNoMsg(500 millis)
// at the removed member a an empty gossip is the last thing
publisher ! PublishChanges(Gossip.empty)
memberSubscriber.expectMsg(MemberRemoved(aRemoved, Exiting))
memberSubscriber.expectMsg(MemberRemoved(bRemoved, Exiting))
memberSubscriber.expectMsg(MemberRemoved(cRemoved, Up))
memberSubscriber.expectMsg(LeaderChanged(None))
}
"not publish leader changed when same leader" in {
publisher ! PublishChanges(g4)
memberSubscriber.expectMsg(MemberUp(a51Up))
memberSubscriber.expectMsg(MemberExited(bExiting))
memberSubscriber.expectMsg(MemberUp(cUp))
memberSubscriber.expectMsg(LeaderChanged(Some(a51Up.address)))
publisher ! PublishChanges(g5)
memberSubscriber.expectNoMsg(500 millis)
}
"publish role leader changed" in {
val subscriber = TestProbe()
publisher ! Subscribe(subscriber.ref, InitialStateAsSnapshot, Set(classOf[RoleLeaderChanged]))
subscriber.expectMsgType[CurrentClusterState]
publisher ! PublishChanges(Gossip(members = SortedSet(cJoining, dUp)))
subscriber.expectMsg(RoleLeaderChanged("GRP", Some(dUp.address)))
publisher ! PublishChanges(Gossip(members = SortedSet(cUp, dUp)))
subscriber.expectMsg(RoleLeaderChanged("GRP", Some(cUp.address)))
}
"send CurrentClusterState when subscribe" in {
val subscriber = TestProbe()
publisher ! Subscribe(subscriber.ref, InitialStateAsSnapshot, Set(classOf[ClusterDomainEvent]))
subscriber.expectMsgType[CurrentClusterState]
// but only to the new subscriber
memberSubscriber.expectNoMsg(500 millis)
}
"send events corresponding to current state when subscribe" in {
val subscriber = TestProbe()
publisher ! PublishChanges(g8)
publisher ! Subscribe(subscriber.ref, InitialStateAsEvents, Set(classOf[MemberEvent], classOf[ReachabilityEvent]))
subscriber.receiveN(4).toSet should be(Set(MemberUp(aUp), MemberUp(cUp), MemberUp(dUp), MemberExited(bExiting)))
subscriber.expectMsg(UnreachableMember(dUp))
subscriber.expectNoMsg(500 millis)
}
"support unsubscribe" in {
val subscriber = TestProbe()
publisher ! Subscribe(subscriber.ref, InitialStateAsSnapshot, Set(classOf[MemberEvent]))
subscriber.expectMsgType[CurrentClusterState]
publisher ! Unsubscribe(subscriber.ref, Some(classOf[MemberEvent]))
publisher ! PublishChanges(g3)
subscriber.expectNoMsg(500 millis)
// but memberSubscriber is still subscriber
memberSubscriber.expectMsg(MemberExited(bExiting))
memberSubscriber.expectMsg(MemberUp(cUp))
}
"publish SeenChanged" in {
val subscriber = TestProbe()
publisher ! Subscribe(subscriber.ref, InitialStateAsSnapshot, Set(classOf[SeenChanged]))
subscriber.expectMsgType[CurrentClusterState]
publisher ! PublishChanges(g2)
subscriber.expectMsgType[SeenChanged]
subscriber.expectNoMsg(500 millis)
publisher ! PublishChanges(g3)
subscriber.expectMsgType[SeenChanged]
subscriber.expectNoMsg(500 millis)
}
"publish Removed when stopped" in {
publisher ! PoisonPill
memberSubscriber.expectMsg(MemberRemoved(aRemoved, Up))
}
}
}
Other Akka source code examplesHere is a short list of links related to this Akka ClusterDomainEventPublisherSpec.scala source code file: |
| ... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.