|
Akka/Scala example source code file (BarrierSpec.scala)
The BarrierSpec.scala Akka example source code
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.testconductor
import language.postfixOps
import akka.actor._
import akka.testkit.{ AkkaSpec, ImplicitSender, EventFilter, TestProbe, TimingTest }
import scala.concurrent.duration._
import akka.event.Logging
import akka.util.Timeout
import org.scalatest.BeforeAndAfterEach
import java.net.{ InetSocketAddress, InetAddress }
object BarrierSpec {
final case class Failed(ref: ActorRef, thr: Throwable)
val config = """
akka.testconductor.barrier-timeout = 5s
akka.actor.provider = akka.remote.RemoteActorRefProvider
akka.actor.debug.fsm = on
akka.actor.debug.lifecycle = on
"""
}
class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender {
import BarrierSpec._
import Controller._
import BarrierCoordinator._
val A = RoleName("a")
val B = RoleName("b")
val C = RoleName("c")
"A BarrierCoordinator" must {
"register clients and remove them" taggedAs TimingTest in {
val b = getBarrier()
b ! NodeInfo(A, AddressFromURIString("akka://sys"), system.deadLetters)
b ! RemoveClient(B)
b ! RemoveClient(A)
EventFilter[BarrierEmpty](occurrences = 1) intercept {
b ! RemoveClient(A)
}
expectMsg(Failed(b, BarrierEmpty(Data(Set(), "", Nil, null), "cannot remove RoleName(a): no client to remove")))
}
"register clients and disconnect them" taggedAs TimingTest in {
val b = getBarrier()
b ! NodeInfo(A, AddressFromURIString("akka://sys"), system.deadLetters)
b ! ClientDisconnected(B)
expectNoMsg(1 second)
b ! ClientDisconnected(A)
expectNoMsg(1 second)
}
"fail entering barrier when nobody registered" taggedAs TimingTest in {
val b = getBarrier()
b ! EnterBarrier("bar1", None)
expectMsg(ToClient(BarrierResult("bar1", false)))
}
"enter barrier" taggedAs TimingTest in {
val barrier = getBarrier()
val a, b = TestProbe()
barrier ! NodeInfo(A, AddressFromURIString("akka://sys"), a.ref)
barrier ! NodeInfo(B, AddressFromURIString("akka://sys"), b.ref)
a.send(barrier, EnterBarrier("bar2", None))
noMsg(a, b)
within(2 seconds) {
b.send(barrier, EnterBarrier("bar2", None))
a.expectMsg(ToClient(BarrierResult("bar2", true)))
b.expectMsg(ToClient(BarrierResult("bar2", true)))
}
}
"enter barrier with joining node" taggedAs TimingTest in {
val barrier = getBarrier()
val a, b, c = TestProbe()
barrier ! NodeInfo(A, AddressFromURIString("akka://sys"), a.ref)
barrier ! NodeInfo(B, AddressFromURIString("akka://sys"), b.ref)
a.send(barrier, EnterBarrier("bar3", None))
barrier ! NodeInfo(C, AddressFromURIString("akka://sys"), c.ref)
b.send(barrier, EnterBarrier("bar3", None))
noMsg(a, b, c)
within(2 seconds) {
c.send(barrier, EnterBarrier("bar3", None))
a.expectMsg(ToClient(BarrierResult("bar3", true)))
b.expectMsg(ToClient(BarrierResult("bar3", true)))
c.expectMsg(ToClient(BarrierResult("bar3", true)))
}
}
"enter barrier with leaving node" taggedAs TimingTest in {
val barrier = getBarrier()
val a, b, c = TestProbe()
barrier ! NodeInfo(A, AddressFromURIString("akka://sys"), a.ref)
barrier ! NodeInfo(B, AddressFromURIString("akka://sys"), b.ref)
barrier ! NodeInfo(C, AddressFromURIString("akka://sys"), c.ref)
a.send(barrier, EnterBarrier("bar4", None))
b.send(barrier, EnterBarrier("bar4", None))
barrier ! RemoveClient(A)
barrier ! ClientDisconnected(A)
noMsg(a, b, c)
b.within(2 seconds) {
barrier ! RemoveClient(C)
b.expectMsg(ToClient(BarrierResult("bar4", true)))
}
barrier ! ClientDisconnected(C)
expectNoMsg(1 second)
}
"leave barrier when last “arrived” is removed" taggedAs TimingTest in {
val barrier = getBarrier()
val a, b = TestProbe()
barrier ! NodeInfo(A, AddressFromURIString("akka://sys"), a.ref)
barrier ! NodeInfo(B, AddressFromURIString("akka://sys"), b.ref)
a.send(barrier, EnterBarrier("bar5", None))
barrier ! RemoveClient(A)
b.send(barrier, EnterBarrier("foo", None))
b.expectMsg(ToClient(BarrierResult("foo", true)))
}
"fail barrier with disconnecing node" taggedAs TimingTest in {
val barrier = getBarrier()
val a, b = TestProbe()
val nodeA = NodeInfo(A, AddressFromURIString("akka://sys"), a.ref)
barrier ! nodeA
barrier ! NodeInfo(B, AddressFromURIString("akka://sys"), b.ref)
a.send(barrier, EnterBarrier("bar6", None))
EventFilter[ClientLost](occurrences = 1) intercept {
barrier ! ClientDisconnected(B)
}
val msg = expectMsgType[Failed]
msg match {
case Failed(barrier, thr: ClientLost) if (thr == ClientLost(Data(Set(nodeA), "bar6", a.ref :: Nil, thr.data.deadline), B)) ⇒
case x ⇒ fail("Expected " + Failed(barrier, ClientLost(Data(Set(nodeA), "bar6", a.ref :: Nil, null), B)) + " but got " + x)
}
}
"fail barrier with disconnecing node who already arrived" taggedAs TimingTest in {
val barrier = getBarrier()
val a, b, c = TestProbe()
val nodeA = NodeInfo(A, AddressFromURIString("akka://sys"), a.ref)
val nodeC = NodeInfo(C, AddressFromURIString("akka://sys"), c.ref)
barrier ! nodeA
barrier ! NodeInfo(B, AddressFromURIString("akka://sys"), b.ref)
barrier ! nodeC
a.send(barrier, EnterBarrier("bar7", None))
b.send(barrier, EnterBarrier("bar7", None))
EventFilter[ClientLost](occurrences = 1) intercept {
barrier ! ClientDisconnected(B)
}
val msg = expectMsgType[Failed]
msg match {
case Failed(barrier, thr: ClientLost) if (thr == ClientLost(Data(Set(nodeA, nodeC), "bar7", a.ref :: Nil, thr.data.deadline), B)) ⇒
case x ⇒ fail("Expected " + Failed(barrier, ClientLost(Data(Set(nodeA, nodeC), "bar7", a.ref :: Nil, null), B)) + " but got " + x)
}
}
"fail when entering wrong barrier" taggedAs TimingTest in {
val barrier = getBarrier()
val a, b = TestProbe()
val nodeA = NodeInfo(A, AddressFromURIString("akka://sys"), a.ref)
barrier ! nodeA
val nodeB = NodeInfo(B, AddressFromURIString("akka://sys"), b.ref)
barrier ! nodeB
a.send(barrier, EnterBarrier("bar8", None))
EventFilter[WrongBarrier](occurrences = 1) intercept {
b.send(barrier, EnterBarrier("foo", None))
}
val msg = expectMsgType[Failed]
msg match {
case Failed(barrier, thr: WrongBarrier) if (thr == WrongBarrier("foo", b.ref, Data(Set(nodeA, nodeB), "bar8", a.ref :: Nil, thr.data.deadline))) ⇒
case x ⇒ fail("Expected " + Failed(barrier, WrongBarrier("foo", b.ref, Data(Set(nodeA, nodeB), "bar8", a.ref :: Nil, null))) + " but got " + x)
}
}
"fail barrier after first failure" taggedAs TimingTest in {
val barrier = getBarrier()
val a = TestProbe()
EventFilter[BarrierEmpty](occurrences = 1) intercept {
barrier ! RemoveClient(A)
}
val msg = expectMsgType[Failed]
msg match {
case Failed(barrier, thr: BarrierEmpty) if (thr == BarrierEmpty(Data(Set(), "", Nil, thr.data.deadline), "cannot remove RoleName(a): no client to remove")) ⇒
case x ⇒ fail("Expected " + Failed(barrier, BarrierEmpty(Data(Set(), "", Nil, null), "cannot remove RoleName(a): no client to remove")) + " but got " + x)
}
barrier ! NodeInfo(A, AddressFromURIString("akka://sys"), a.ref)
a.send(barrier, EnterBarrier("bar9", None))
a.expectMsg(ToClient(BarrierResult("bar9", false)))
}
"fail after barrier timeout" taggedAs TimingTest in {
val barrier = getBarrier()
val a, b = TestProbe()
val nodeA = NodeInfo(A, AddressFromURIString("akka://sys"), a.ref)
val nodeB = NodeInfo(B, AddressFromURIString("akka://sys"), b.ref)
barrier ! nodeA
barrier ! nodeB
a.send(barrier, EnterBarrier("bar10", None))
EventFilter[BarrierTimeout](occurrences = 1) intercept {
val msg = expectMsgType[Failed](7 seconds)
msg match {
case Failed(barrier, thr: BarrierTimeout) if (thr == BarrierTimeout(Data(Set(nodeA, nodeB), "bar10", a.ref :: Nil, thr.data.deadline))) ⇒
case x ⇒ fail("Expected " + Failed(barrier, BarrierTimeout(Data(Set(nodeA, nodeB), "bar10", a.ref :: Nil, null))) + " but got " + x)
}
}
}
"fail if a node registers twice" taggedAs TimingTest in {
val barrier = getBarrier()
val a, b = TestProbe()
val nodeA = NodeInfo(A, AddressFromURIString("akka://sys"), a.ref)
val nodeB = NodeInfo(A, AddressFromURIString("akka://sys"), b.ref)
barrier ! nodeA
EventFilter[DuplicateNode](occurrences = 1) intercept {
barrier ! nodeB
}
val msg = expectMsgType[Failed]
msg match {
case Failed(barrier, thr: DuplicateNode) if (thr == DuplicateNode(Data(Set(nodeA), "", Nil, thr.data.deadline), nodeB)) ⇒
case x ⇒ fail("Expected " + Failed(barrier, DuplicateNode(Data(Set(nodeA), "", Nil, null), nodeB)) + " but got " + x)
}
}
"finally have no failure messages left" taggedAs TimingTest in {
expectNoMsg(1 second)
}
}
"A Controller with BarrierCoordinator" must {
"register clients and remove them" taggedAs TimingTest in {
withController(1) { b ⇒
b ! NodeInfo(A, AddressFromURIString("akka://sys"), testActor)
expectMsg(ToClient(Done))
b ! Remove(B)
b ! Remove(A)
EventFilter.warning(start = "cannot remove", occurrences = 1) intercept {
b ! Remove(A)
}
Thread.sleep(5000)
}
}
"register clients and disconnect them" taggedAs TimingTest in {
withController(1) { b ⇒
b ! NodeInfo(A, AddressFromURIString("akka://sys"), testActor)
expectMsg(ToClient(Done))
b ! ClientDisconnected(B)
expectNoMsg(1 second)
b ! ClientDisconnected(A)
expectNoMsg(1 second)
}
}
"fail entering barrier when nobody registered" taggedAs TimingTest in {
withController(0) { b ⇒
b ! EnterBarrier("b", None)
expectMsg(ToClient(BarrierResult("b", false)))
}
}
"enter barrier" taggedAs TimingTest in {
withController(2) { barrier ⇒
val a, b = TestProbe()
barrier ! NodeInfo(A, AddressFromURIString("akka://sys"), a.ref)
barrier ! NodeInfo(B, AddressFromURIString("akka://sys"), b.ref)
a.expectMsg(ToClient(Done))
b.expectMsg(ToClient(Done))
a.send(barrier, EnterBarrier("bar11", None))
noMsg(a, b)
within(2 seconds) {
b.send(barrier, EnterBarrier("bar11", None))
a.expectMsg(ToClient(BarrierResult("bar11", true)))
b.expectMsg(ToClient(BarrierResult("bar11", true)))
}
}
}
"enter barrier with joining node" taggedAs TimingTest in {
withController(2) { barrier ⇒
val a, b, c = TestProbe()
barrier ! NodeInfo(A, AddressFromURIString("akka://sys"), a.ref)
barrier ! NodeInfo(B, AddressFromURIString("akka://sys"), b.ref)
a.expectMsg(ToClient(Done))
b.expectMsg(ToClient(Done))
a.send(barrier, EnterBarrier("bar12", None))
barrier ! NodeInfo(C, AddressFromURIString("akka://sys"), c.ref)
c.expectMsg(ToClient(Done))
b.send(barrier, EnterBarrier("bar12", None))
noMsg(a, b, c)
within(2 seconds) {
c.send(barrier, EnterBarrier("bar12", None))
a.expectMsg(ToClient(BarrierResult("bar12", true)))
b.expectMsg(ToClient(BarrierResult("bar12", true)))
c.expectMsg(ToClient(BarrierResult("bar12", true)))
}
}
}
"enter barrier with leaving node" taggedAs TimingTest in {
withController(3) { barrier ⇒
val a, b, c = TestProbe()
barrier ! NodeInfo(A, AddressFromURIString("akka://sys"), a.ref)
barrier ! NodeInfo(B, AddressFromURIString("akka://sys"), b.ref)
barrier ! NodeInfo(C, AddressFromURIString("akka://sys"), c.ref)
a.expectMsg(ToClient(Done))
b.expectMsg(ToClient(Done))
c.expectMsg(ToClient(Done))
a.send(barrier, EnterBarrier("bar13", None))
b.send(barrier, EnterBarrier("bar13", None))
barrier ! Remove(A)
barrier ! ClientDisconnected(A)
noMsg(a, b, c)
b.within(2 seconds) {
barrier ! Remove(C)
b.expectMsg(ToClient(BarrierResult("bar13", true)))
}
barrier ! ClientDisconnected(C)
expectNoMsg(1 second)
}
}
"leave barrier when last “arrived” is removed" taggedAs TimingTest in {
withController(2) { barrier ⇒
val a, b = TestProbe()
barrier ! NodeInfo(A, AddressFromURIString("akka://sys"), a.ref)
barrier ! NodeInfo(B, AddressFromURIString("akka://sys"), b.ref)
a.expectMsg(ToClient(Done))
b.expectMsg(ToClient(Done))
a.send(barrier, EnterBarrier("bar14", None))
barrier ! Remove(A)
b.send(barrier, EnterBarrier("foo", None))
b.expectMsg(ToClient(BarrierResult("foo", true)))
}
}
"fail barrier with disconnecing node" taggedAs TimingTest in {
withController(2) { barrier ⇒
val a, b = TestProbe()
val nodeA = NodeInfo(A, AddressFromURIString("akka://sys"), a.ref)
barrier ! nodeA
barrier ! NodeInfo(B, AddressFromURIString("akka://sys"), b.ref)
a.expectMsg(ToClient(Done))
b.expectMsg(ToClient(Done))
a.send(barrier, EnterBarrier("bar15", None))
barrier ! ClientDisconnected(RoleName("unknown"))
noMsg(a)
EventFilter[ClientLost](occurrences = 1) intercept {
barrier ! ClientDisconnected(B)
}
a.expectMsg(ToClient(BarrierResult("bar15", false)))
}
}
"fail barrier with disconnecing node who already arrived" taggedAs TimingTest in {
withController(3) { barrier ⇒
val a, b, c = TestProbe()
val nodeA = NodeInfo(A, AddressFromURIString("akka://sys"), a.ref)
val nodeC = NodeInfo(C, AddressFromURIString("akka://sys"), c.ref)
barrier ! nodeA
barrier ! NodeInfo(B, AddressFromURIString("akka://sys"), b.ref)
barrier ! nodeC
a.expectMsg(ToClient(Done))
b.expectMsg(ToClient(Done))
c.expectMsg(ToClient(Done))
a.send(barrier, EnterBarrier("bar16", None))
b.send(barrier, EnterBarrier("bar16", None))
EventFilter[ClientLost](occurrences = 1) intercept {
barrier ! ClientDisconnected(B)
}
a.expectMsg(ToClient(BarrierResult("bar16", false)))
}
}
"fail when entering wrong barrier" taggedAs TimingTest in {
withController(2) { barrier ⇒
val a, b = TestProbe()
val nodeA = NodeInfo(A, AddressFromURIString("akka://sys"), a.ref)
barrier ! nodeA
val nodeB = NodeInfo(B, AddressFromURIString("akka://sys"), b.ref)
barrier ! nodeB
a.expectMsg(ToClient(Done))
b.expectMsg(ToClient(Done))
a.send(barrier, EnterBarrier("bar17", None))
EventFilter[WrongBarrier](occurrences = 1) intercept {
b.send(barrier, EnterBarrier("foo", None))
}
a.expectMsg(ToClient(BarrierResult("bar17", false)))
b.expectMsg(ToClient(BarrierResult("foo", false)))
}
}
"fail after barrier timeout" taggedAs TimingTest in {
withController(2) { barrier ⇒
val a, b = TestProbe()
val nodeA = NodeInfo(A, AddressFromURIString("akka://sys"), a.ref)
val nodeB = NodeInfo(B, AddressFromURIString("akka://sys"), b.ref)
barrier ! nodeA
barrier ! nodeB
a.expectMsg(ToClient(Done))
b.expectMsg(ToClient(Done))
a.send(barrier, EnterBarrier("bar18", Option(2 seconds)))
EventFilter[BarrierTimeout](occurrences = 1) intercept {
Thread.sleep(4000)
}
b.send(barrier, EnterBarrier("bar18", None))
a.expectMsg(ToClient(BarrierResult("bar18", false)))
b.expectMsg(ToClient(BarrierResult("bar18", false)))
}
}
"fail if a node registers twice" taggedAs TimingTest in {
withController(2) { controller ⇒
val a, b = TestProbe()
val nodeA = NodeInfo(A, AddressFromURIString("akka://sys"), a.ref)
val nodeB = NodeInfo(A, AddressFromURIString("akka://sys"), b.ref)
controller ! nodeA
EventFilter[DuplicateNode](occurrences = 1) intercept {
controller ! nodeB
}
a.expectMsg(ToClient(BarrierResult("initial startup", false)))
b.expectMsg(ToClient(BarrierResult("initial startup", false)))
}
}
"fail subsequent barriers if a node registers twice" taggedAs TimingTest in {
withController(1) { controller ⇒
val a, b = TestProbe()
val nodeA = NodeInfo(A, AddressFromURIString("akka://sys"), a.ref)
val nodeB = NodeInfo(A, AddressFromURIString("akka://sys"), b.ref)
controller ! nodeA
a.expectMsg(ToClient(Done))
EventFilter[DuplicateNode](occurrences = 1) intercept {
controller ! nodeB
b.expectMsg(ToClient(BarrierResult("initial startup", false)))
}
a.send(controller, EnterBarrier("bar19", None))
a.expectMsg(ToClient(BarrierResult("bar19", false)))
}
}
"fail subsequent barriers after foreced failure" taggedAs TimingTest in {
withController(2) { barrier ⇒
val a, b = TestProbe()
val nodeA = NodeInfo(A, AddressFromURIString("akka://sys"), a.ref)
val nodeB = NodeInfo(B, AddressFromURIString("akka://sys"), b.ref)
barrier ! nodeA
barrier ! nodeB
a.expectMsg(ToClient(Done))
b.expectMsg(ToClient(Done))
a.send(barrier, EnterBarrier("bar20", Option(2 seconds)))
EventFilter[FailedBarrier](occurrences = 1) intercept {
b.send(barrier, FailBarrier("bar20"))
a.expectMsg(ToClient(BarrierResult("bar20", false)))
b.expectNoMsg(1 second)
}
a.send(barrier, EnterBarrier("bar21", None))
b.send(barrier, EnterBarrier("bar21", None))
a.expectMsg(ToClient(BarrierResult("bar21", false)))
b.expectMsg(ToClient(BarrierResult("bar21", false)))
}
}
"timeout within the shortest timeout if the new timeout is shorter" taggedAs TimingTest in {
withController(3) { barrier ⇒
val a, b, c = TestProbe()
val nodeA = NodeInfo(A, AddressFromURIString("akka://sys"), a.ref)
val nodeB = NodeInfo(B, AddressFromURIString("akka://sys"), b.ref)
val nodeC = NodeInfo(C, AddressFromURIString("akka://sys"), c.ref)
barrier ! nodeA
barrier ! nodeB
barrier ! nodeC
a.expectMsg(ToClient(Done))
b.expectMsg(ToClient(Done))
c.expectMsg(ToClient(Done))
a.send(barrier, EnterBarrier("bar22", Option(10 seconds)))
b.send(barrier, EnterBarrier("bar22", Option(2 seconds)))
EventFilter[BarrierTimeout](occurrences = 1) intercept {
Thread.sleep(4000)
}
c.send(barrier, EnterBarrier("bar22", None))
a.expectMsg(ToClient(BarrierResult("bar22", false)))
b.expectMsg(ToClient(BarrierResult("bar22", false)))
c.expectMsg(ToClient(BarrierResult("bar22", false)))
}
}
"timeout within the shortest timeout if the new timeout is longer" taggedAs TimingTest in {
withController(3) { barrier ⇒
val a, b, c = TestProbe()
val nodeA = NodeInfo(A, AddressFromURIString("akka://sys"), a.ref)
val nodeB = NodeInfo(B, AddressFromURIString("akka://sys"), b.ref)
val nodeC = NodeInfo(C, AddressFromURIString("akka://sys"), c.ref)
barrier ! nodeA
barrier ! nodeB
barrier ! nodeC
a.expectMsg(ToClient(Done))
b.expectMsg(ToClient(Done))
c.expectMsg(ToClient(Done))
a.send(barrier, EnterBarrier("bar23", Option(2 seconds)))
b.send(barrier, EnterBarrier("bar23", Option(10 seconds)))
EventFilter[BarrierTimeout](occurrences = 1) intercept {
Thread.sleep(4000)
}
c.send(barrier, EnterBarrier("bar23", None))
a.expectMsg(ToClient(BarrierResult("bar23", false)))
b.expectMsg(ToClient(BarrierResult("bar23", false)))
c.expectMsg(ToClient(BarrierResult("bar23", false)))
}
}
"finally have no failure messages left" taggedAs TimingTest in {
expectNoMsg(1 second)
}
}
private def withController(participants: Int)(f: (ActorRef) ⇒ Unit): Unit = {
system.actorOf(Props(new Actor {
val controller = context.actorOf(Props(classOf[Controller], participants, new InetSocketAddress(InetAddress.getLocalHost, 0)))
controller ! GetSockAddr
override def supervisorStrategy = OneForOneStrategy() {
case x ⇒ testActor ! Failed(controller, x); SupervisorStrategy.Restart
}
def receive = {
case x: InetSocketAddress ⇒ testActor ! controller
}
}).withDeploy(Deploy.local))
val actor = expectMsgType[ActorRef]
f(actor)
actor ! PoisonPill // clean up so network connections don't accumulate during test run
}
/**
* Produce a BarrierCoordinator which is supervised with a strategy which
* forwards all failures to the testActor.
*/
private def getBarrier(): ActorRef = {
system.actorOf(Props(new Actor {
val barrier = context.actorOf(Props[BarrierCoordinator])
override def supervisorStrategy = OneForOneStrategy() {
case x ⇒ testActor ! Failed(barrier, x); SupervisorStrategy.Restart
}
def receive = {
case _ ⇒ sender() ! barrier
}
}).withDeploy(Deploy.local)) ! ""
expectMsgType[ActorRef]
}
private def noMsg(probes: TestProbe*) {
expectNoMsg(1 second)
probes foreach (_.msgAvailable should be(false))
}
private def data(clients: Set[Controller.NodeInfo], barrier: String, arrived: List[ActorRef], previous: Data): Data = {
Data(clients, barrier, arrived, previous.deadline)
}
}
Other Akka source code examplesHere is a short list of links related to this Akka BarrierSpec.scala source code file: |
| ... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.