alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (BarrierSpec.scala)

This example Akka source code file (BarrierSpec.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

addressfromuristring, akka, clientdisconnected, concurrent, duration, enterbarrier, eventfilter, failed, nil, nodeinfo, none, test, testing, testprobe, time, timingtest

The BarrierSpec.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.remote.testconductor

import language.postfixOps

import akka.actor._
import akka.testkit.{ AkkaSpec, ImplicitSender, EventFilter, TestProbe, TimingTest }
import scala.concurrent.duration._
import akka.event.Logging
import akka.util.Timeout
import org.scalatest.BeforeAndAfterEach
import java.net.{ InetSocketAddress, InetAddress }

object BarrierSpec {
  final case class Failed(ref: ActorRef, thr: Throwable)
  val config = """
    akka.testconductor.barrier-timeout = 5s
    akka.actor.provider = akka.remote.RemoteActorRefProvider
    akka.actor.debug.fsm = on
    akka.actor.debug.lifecycle = on
               """
}

class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender {

  import BarrierSpec._
  import Controller._
  import BarrierCoordinator._

  val A = RoleName("a")
  val B = RoleName("b")
  val C = RoleName("c")

  "A BarrierCoordinator" must {

    "register clients and remove them" taggedAs TimingTest in {
      val b = getBarrier()
      b ! NodeInfo(A, AddressFromURIString("akka://sys"), system.deadLetters)
      b ! RemoveClient(B)
      b ! RemoveClient(A)
      EventFilter[BarrierEmpty](occurrences = 1) intercept {
        b ! RemoveClient(A)
      }
      expectMsg(Failed(b, BarrierEmpty(Data(Set(), "", Nil, null), "cannot remove RoleName(a): no client to remove")))
    }

    "register clients and disconnect them" taggedAs TimingTest in {
      val b = getBarrier()
      b ! NodeInfo(A, AddressFromURIString("akka://sys"), system.deadLetters)
      b ! ClientDisconnected(B)
      expectNoMsg(1 second)
      b ! ClientDisconnected(A)
      expectNoMsg(1 second)
    }

    "fail entering barrier when nobody registered" taggedAs TimingTest in {
      val b = getBarrier()
      b ! EnterBarrier("bar1", None)
      expectMsg(ToClient(BarrierResult("bar1", false)))
    }

    "enter barrier" taggedAs TimingTest in {
      val barrier = getBarrier()
      val a, b = TestProbe()
      barrier ! NodeInfo(A, AddressFromURIString("akka://sys"), a.ref)
      barrier ! NodeInfo(B, AddressFromURIString("akka://sys"), b.ref)
      a.send(barrier, EnterBarrier("bar2", None))
      noMsg(a, b)
      within(2 seconds) {
        b.send(barrier, EnterBarrier("bar2", None))
        a.expectMsg(ToClient(BarrierResult("bar2", true)))
        b.expectMsg(ToClient(BarrierResult("bar2", true)))
      }
    }

    "enter barrier with joining node" taggedAs TimingTest in {
      val barrier = getBarrier()
      val a, b, c = TestProbe()
      barrier ! NodeInfo(A, AddressFromURIString("akka://sys"), a.ref)
      barrier ! NodeInfo(B, AddressFromURIString("akka://sys"), b.ref)
      a.send(barrier, EnterBarrier("bar3", None))
      barrier ! NodeInfo(C, AddressFromURIString("akka://sys"), c.ref)
      b.send(barrier, EnterBarrier("bar3", None))
      noMsg(a, b, c)
      within(2 seconds) {
        c.send(barrier, EnterBarrier("bar3", None))
        a.expectMsg(ToClient(BarrierResult("bar3", true)))
        b.expectMsg(ToClient(BarrierResult("bar3", true)))
        c.expectMsg(ToClient(BarrierResult("bar3", true)))
      }
    }

    "enter barrier with leaving node" taggedAs TimingTest in {
      val barrier = getBarrier()
      val a, b, c = TestProbe()
      barrier ! NodeInfo(A, AddressFromURIString("akka://sys"), a.ref)
      barrier ! NodeInfo(B, AddressFromURIString("akka://sys"), b.ref)
      barrier ! NodeInfo(C, AddressFromURIString("akka://sys"), c.ref)
      a.send(barrier, EnterBarrier("bar4", None))
      b.send(barrier, EnterBarrier("bar4", None))
      barrier ! RemoveClient(A)
      barrier ! ClientDisconnected(A)
      noMsg(a, b, c)
      b.within(2 seconds) {
        barrier ! RemoveClient(C)
        b.expectMsg(ToClient(BarrierResult("bar4", true)))
      }
      barrier ! ClientDisconnected(C)
      expectNoMsg(1 second)
    }

    "leave barrier when last “arrived” is removed" taggedAs TimingTest in {
      val barrier = getBarrier()
      val a, b = TestProbe()
      barrier ! NodeInfo(A, AddressFromURIString("akka://sys"), a.ref)
      barrier ! NodeInfo(B, AddressFromURIString("akka://sys"), b.ref)
      a.send(barrier, EnterBarrier("bar5", None))
      barrier ! RemoveClient(A)
      b.send(barrier, EnterBarrier("foo", None))
      b.expectMsg(ToClient(BarrierResult("foo", true)))
    }

    "fail barrier with disconnecing node" taggedAs TimingTest in {
      val barrier = getBarrier()
      val a, b = TestProbe()
      val nodeA = NodeInfo(A, AddressFromURIString("akka://sys"), a.ref)
      barrier ! nodeA
      barrier ! NodeInfo(B, AddressFromURIString("akka://sys"), b.ref)
      a.send(barrier, EnterBarrier("bar6", None))
      EventFilter[ClientLost](occurrences = 1) intercept {
        barrier ! ClientDisconnected(B)
      }
      val msg = expectMsgType[Failed]
      msg match {
        case Failed(barrier, thr: ClientLost) if (thr == ClientLost(Data(Set(nodeA), "bar6", a.ref :: Nil, thr.data.deadline), B)) ⇒
        case x ⇒ fail("Expected " + Failed(barrier, ClientLost(Data(Set(nodeA), "bar6", a.ref :: Nil, null), B)) + " but got " + x)
      }
    }

    "fail barrier with disconnecing node who already arrived" taggedAs TimingTest in {
      val barrier = getBarrier()
      val a, b, c = TestProbe()
      val nodeA = NodeInfo(A, AddressFromURIString("akka://sys"), a.ref)
      val nodeC = NodeInfo(C, AddressFromURIString("akka://sys"), c.ref)
      barrier ! nodeA
      barrier ! NodeInfo(B, AddressFromURIString("akka://sys"), b.ref)
      barrier ! nodeC
      a.send(barrier, EnterBarrier("bar7", None))
      b.send(barrier, EnterBarrier("bar7", None))
      EventFilter[ClientLost](occurrences = 1) intercept {
        barrier ! ClientDisconnected(B)
      }
      val msg = expectMsgType[Failed]
      msg match {
        case Failed(barrier, thr: ClientLost) if (thr == ClientLost(Data(Set(nodeA, nodeC), "bar7", a.ref :: Nil, thr.data.deadline), B)) ⇒
        case x ⇒ fail("Expected " + Failed(barrier, ClientLost(Data(Set(nodeA, nodeC), "bar7", a.ref :: Nil, null), B)) + " but got " + x)
      }
    }

    "fail when entering wrong barrier" taggedAs TimingTest in {
      val barrier = getBarrier()
      val a, b = TestProbe()
      val nodeA = NodeInfo(A, AddressFromURIString("akka://sys"), a.ref)
      barrier ! nodeA
      val nodeB = NodeInfo(B, AddressFromURIString("akka://sys"), b.ref)
      barrier ! nodeB
      a.send(barrier, EnterBarrier("bar8", None))
      EventFilter[WrongBarrier](occurrences = 1) intercept {
        b.send(barrier, EnterBarrier("foo", None))
      }
      val msg = expectMsgType[Failed]
      msg match {
        case Failed(barrier, thr: WrongBarrier) if (thr == WrongBarrier("foo", b.ref, Data(Set(nodeA, nodeB), "bar8", a.ref :: Nil, thr.data.deadline))) ⇒
        case x ⇒ fail("Expected " + Failed(barrier, WrongBarrier("foo", b.ref, Data(Set(nodeA, nodeB), "bar8", a.ref :: Nil, null))) + " but got " + x)
      }
    }

    "fail barrier after first failure" taggedAs TimingTest in {
      val barrier = getBarrier()
      val a = TestProbe()
      EventFilter[BarrierEmpty](occurrences = 1) intercept {
        barrier ! RemoveClient(A)
      }
      val msg = expectMsgType[Failed]
      msg match {
        case Failed(barrier, thr: BarrierEmpty) if (thr == BarrierEmpty(Data(Set(), "", Nil, thr.data.deadline), "cannot remove RoleName(a): no client to remove")) ⇒
        case x ⇒ fail("Expected " + Failed(barrier, BarrierEmpty(Data(Set(), "", Nil, null), "cannot remove RoleName(a): no client to remove")) + " but got " + x)
      }
      barrier ! NodeInfo(A, AddressFromURIString("akka://sys"), a.ref)
      a.send(barrier, EnterBarrier("bar9", None))
      a.expectMsg(ToClient(BarrierResult("bar9", false)))
    }

    "fail after barrier timeout" taggedAs TimingTest in {
      val barrier = getBarrier()
      val a, b = TestProbe()
      val nodeA = NodeInfo(A, AddressFromURIString("akka://sys"), a.ref)
      val nodeB = NodeInfo(B, AddressFromURIString("akka://sys"), b.ref)
      barrier ! nodeA
      barrier ! nodeB
      a.send(barrier, EnterBarrier("bar10", None))
      EventFilter[BarrierTimeout](occurrences = 1) intercept {
        val msg = expectMsgType[Failed](7 seconds)
        msg match {
          case Failed(barrier, thr: BarrierTimeout) if (thr == BarrierTimeout(Data(Set(nodeA, nodeB), "bar10", a.ref :: Nil, thr.data.deadline))) ⇒
          case x ⇒ fail("Expected " + Failed(barrier, BarrierTimeout(Data(Set(nodeA, nodeB), "bar10", a.ref :: Nil, null))) + " but got " + x)
        }
      }
    }

    "fail if a node registers twice" taggedAs TimingTest in {
      val barrier = getBarrier()
      val a, b = TestProbe()
      val nodeA = NodeInfo(A, AddressFromURIString("akka://sys"), a.ref)
      val nodeB = NodeInfo(A, AddressFromURIString("akka://sys"), b.ref)
      barrier ! nodeA
      EventFilter[DuplicateNode](occurrences = 1) intercept {
        barrier ! nodeB
      }
      val msg = expectMsgType[Failed]
      msg match {
        case Failed(barrier, thr: DuplicateNode) if (thr == DuplicateNode(Data(Set(nodeA), "", Nil, thr.data.deadline), nodeB)) ⇒
        case x ⇒ fail("Expected " + Failed(barrier, DuplicateNode(Data(Set(nodeA), "", Nil, null), nodeB)) + " but got " + x)
      }
    }

    "finally have no failure messages left" taggedAs TimingTest in {
      expectNoMsg(1 second)
    }

  }

  "A Controller with BarrierCoordinator" must {

    "register clients and remove them" taggedAs TimingTest in {
      withController(1) { b ⇒
        b ! NodeInfo(A, AddressFromURIString("akka://sys"), testActor)
        expectMsg(ToClient(Done))
        b ! Remove(B)
        b ! Remove(A)
        EventFilter.warning(start = "cannot remove", occurrences = 1) intercept {
          b ! Remove(A)
        }
        Thread.sleep(5000)
      }
    }

    "register clients and disconnect them" taggedAs TimingTest in {
      withController(1) { b ⇒
        b ! NodeInfo(A, AddressFromURIString("akka://sys"), testActor)
        expectMsg(ToClient(Done))
        b ! ClientDisconnected(B)
        expectNoMsg(1 second)
        b ! ClientDisconnected(A)
        expectNoMsg(1 second)
      }
    }

    "fail entering barrier when nobody registered" taggedAs TimingTest in {
      withController(0) { b ⇒
        b ! EnterBarrier("b", None)
        expectMsg(ToClient(BarrierResult("b", false)))
      }
    }

    "enter barrier" taggedAs TimingTest in {
      withController(2) { barrier ⇒
        val a, b = TestProbe()
        barrier ! NodeInfo(A, AddressFromURIString("akka://sys"), a.ref)
        barrier ! NodeInfo(B, AddressFromURIString("akka://sys"), b.ref)
        a.expectMsg(ToClient(Done))
        b.expectMsg(ToClient(Done))
        a.send(barrier, EnterBarrier("bar11", None))
        noMsg(a, b)
        within(2 seconds) {
          b.send(barrier, EnterBarrier("bar11", None))
          a.expectMsg(ToClient(BarrierResult("bar11", true)))
          b.expectMsg(ToClient(BarrierResult("bar11", true)))
        }
      }
    }

    "enter barrier with joining node" taggedAs TimingTest in {
      withController(2) { barrier ⇒
        val a, b, c = TestProbe()
        barrier ! NodeInfo(A, AddressFromURIString("akka://sys"), a.ref)
        barrier ! NodeInfo(B, AddressFromURIString("akka://sys"), b.ref)
        a.expectMsg(ToClient(Done))
        b.expectMsg(ToClient(Done))
        a.send(barrier, EnterBarrier("bar12", None))
        barrier ! NodeInfo(C, AddressFromURIString("akka://sys"), c.ref)
        c.expectMsg(ToClient(Done))
        b.send(barrier, EnterBarrier("bar12", None))
        noMsg(a, b, c)
        within(2 seconds) {
          c.send(barrier, EnterBarrier("bar12", None))
          a.expectMsg(ToClient(BarrierResult("bar12", true)))
          b.expectMsg(ToClient(BarrierResult("bar12", true)))
          c.expectMsg(ToClient(BarrierResult("bar12", true)))
        }
      }
    }

    "enter barrier with leaving node" taggedAs TimingTest in {
      withController(3) { barrier ⇒
        val a, b, c = TestProbe()
        barrier ! NodeInfo(A, AddressFromURIString("akka://sys"), a.ref)
        barrier ! NodeInfo(B, AddressFromURIString("akka://sys"), b.ref)
        barrier ! NodeInfo(C, AddressFromURIString("akka://sys"), c.ref)
        a.expectMsg(ToClient(Done))
        b.expectMsg(ToClient(Done))
        c.expectMsg(ToClient(Done))
        a.send(barrier, EnterBarrier("bar13", None))
        b.send(barrier, EnterBarrier("bar13", None))
        barrier ! Remove(A)
        barrier ! ClientDisconnected(A)
        noMsg(a, b, c)
        b.within(2 seconds) {
          barrier ! Remove(C)
          b.expectMsg(ToClient(BarrierResult("bar13", true)))
        }
        barrier ! ClientDisconnected(C)
        expectNoMsg(1 second)
      }
    }

    "leave barrier when last “arrived” is removed" taggedAs TimingTest in {
      withController(2) { barrier ⇒
        val a, b = TestProbe()
        barrier ! NodeInfo(A, AddressFromURIString("akka://sys"), a.ref)
        barrier ! NodeInfo(B, AddressFromURIString("akka://sys"), b.ref)
        a.expectMsg(ToClient(Done))
        b.expectMsg(ToClient(Done))
        a.send(barrier, EnterBarrier("bar14", None))
        barrier ! Remove(A)
        b.send(barrier, EnterBarrier("foo", None))
        b.expectMsg(ToClient(BarrierResult("foo", true)))
      }
    }

    "fail barrier with disconnecing node" taggedAs TimingTest in {
      withController(2) { barrier ⇒
        val a, b = TestProbe()
        val nodeA = NodeInfo(A, AddressFromURIString("akka://sys"), a.ref)
        barrier ! nodeA
        barrier ! NodeInfo(B, AddressFromURIString("akka://sys"), b.ref)
        a.expectMsg(ToClient(Done))
        b.expectMsg(ToClient(Done))
        a.send(barrier, EnterBarrier("bar15", None))
        barrier ! ClientDisconnected(RoleName("unknown"))
        noMsg(a)
        EventFilter[ClientLost](occurrences = 1) intercept {
          barrier ! ClientDisconnected(B)
        }
        a.expectMsg(ToClient(BarrierResult("bar15", false)))
      }
    }

    "fail barrier with disconnecing node who already arrived" taggedAs TimingTest in {
      withController(3) { barrier ⇒
        val a, b, c = TestProbe()
        val nodeA = NodeInfo(A, AddressFromURIString("akka://sys"), a.ref)
        val nodeC = NodeInfo(C, AddressFromURIString("akka://sys"), c.ref)
        barrier ! nodeA
        barrier ! NodeInfo(B, AddressFromURIString("akka://sys"), b.ref)
        barrier ! nodeC
        a.expectMsg(ToClient(Done))
        b.expectMsg(ToClient(Done))
        c.expectMsg(ToClient(Done))
        a.send(barrier, EnterBarrier("bar16", None))
        b.send(barrier, EnterBarrier("bar16", None))
        EventFilter[ClientLost](occurrences = 1) intercept {
          barrier ! ClientDisconnected(B)
        }
        a.expectMsg(ToClient(BarrierResult("bar16", false)))
      }
    }

    "fail when entering wrong barrier" taggedAs TimingTest in {
      withController(2) { barrier ⇒
        val a, b = TestProbe()
        val nodeA = NodeInfo(A, AddressFromURIString("akka://sys"), a.ref)
        barrier ! nodeA
        val nodeB = NodeInfo(B, AddressFromURIString("akka://sys"), b.ref)
        barrier ! nodeB
        a.expectMsg(ToClient(Done))
        b.expectMsg(ToClient(Done))
        a.send(barrier, EnterBarrier("bar17", None))
        EventFilter[WrongBarrier](occurrences = 1) intercept {
          b.send(barrier, EnterBarrier("foo", None))
        }
        a.expectMsg(ToClient(BarrierResult("bar17", false)))
        b.expectMsg(ToClient(BarrierResult("foo", false)))
      }
    }

    "fail after barrier timeout" taggedAs TimingTest in {
      withController(2) { barrier ⇒
        val a, b = TestProbe()
        val nodeA = NodeInfo(A, AddressFromURIString("akka://sys"), a.ref)
        val nodeB = NodeInfo(B, AddressFromURIString("akka://sys"), b.ref)
        barrier ! nodeA
        barrier ! nodeB
        a.expectMsg(ToClient(Done))
        b.expectMsg(ToClient(Done))
        a.send(barrier, EnterBarrier("bar18", Option(2 seconds)))
        EventFilter[BarrierTimeout](occurrences = 1) intercept {
          Thread.sleep(4000)
        }
        b.send(barrier, EnterBarrier("bar18", None))
        a.expectMsg(ToClient(BarrierResult("bar18", false)))
        b.expectMsg(ToClient(BarrierResult("bar18", false)))
      }
    }

    "fail if a node registers twice" taggedAs TimingTest in {
      withController(2) { controller ⇒
        val a, b = TestProbe()
        val nodeA = NodeInfo(A, AddressFromURIString("akka://sys"), a.ref)
        val nodeB = NodeInfo(A, AddressFromURIString("akka://sys"), b.ref)
        controller ! nodeA
        EventFilter[DuplicateNode](occurrences = 1) intercept {
          controller ! nodeB
        }
        a.expectMsg(ToClient(BarrierResult("initial startup", false)))
        b.expectMsg(ToClient(BarrierResult("initial startup", false)))
      }
    }

    "fail subsequent barriers if a node registers twice" taggedAs TimingTest in {
      withController(1) { controller ⇒
        val a, b = TestProbe()
        val nodeA = NodeInfo(A, AddressFromURIString("akka://sys"), a.ref)
        val nodeB = NodeInfo(A, AddressFromURIString("akka://sys"), b.ref)
        controller ! nodeA
        a.expectMsg(ToClient(Done))
        EventFilter[DuplicateNode](occurrences = 1) intercept {
          controller ! nodeB
          b.expectMsg(ToClient(BarrierResult("initial startup", false)))
        }
        a.send(controller, EnterBarrier("bar19", None))
        a.expectMsg(ToClient(BarrierResult("bar19", false)))
      }
    }

    "fail subsequent barriers after foreced failure" taggedAs TimingTest in {
      withController(2) { barrier ⇒
        val a, b = TestProbe()
        val nodeA = NodeInfo(A, AddressFromURIString("akka://sys"), a.ref)
        val nodeB = NodeInfo(B, AddressFromURIString("akka://sys"), b.ref)
        barrier ! nodeA
        barrier ! nodeB
        a.expectMsg(ToClient(Done))
        b.expectMsg(ToClient(Done))
        a.send(barrier, EnterBarrier("bar20", Option(2 seconds)))
        EventFilter[FailedBarrier](occurrences = 1) intercept {
          b.send(barrier, FailBarrier("bar20"))
          a.expectMsg(ToClient(BarrierResult("bar20", false)))
          b.expectNoMsg(1 second)
        }
        a.send(barrier, EnterBarrier("bar21", None))
        b.send(barrier, EnterBarrier("bar21", None))
        a.expectMsg(ToClient(BarrierResult("bar21", false)))
        b.expectMsg(ToClient(BarrierResult("bar21", false)))
      }
    }

    "timeout within the shortest timeout if the new timeout is shorter" taggedAs TimingTest in {
      withController(3) { barrier ⇒
        val a, b, c = TestProbe()
        val nodeA = NodeInfo(A, AddressFromURIString("akka://sys"), a.ref)
        val nodeB = NodeInfo(B, AddressFromURIString("akka://sys"), b.ref)
        val nodeC = NodeInfo(C, AddressFromURIString("akka://sys"), c.ref)
        barrier ! nodeA
        barrier ! nodeB
        barrier ! nodeC
        a.expectMsg(ToClient(Done))
        b.expectMsg(ToClient(Done))
        c.expectMsg(ToClient(Done))
        a.send(barrier, EnterBarrier("bar22", Option(10 seconds)))
        b.send(barrier, EnterBarrier("bar22", Option(2 seconds)))
        EventFilter[BarrierTimeout](occurrences = 1) intercept {
          Thread.sleep(4000)
        }
        c.send(barrier, EnterBarrier("bar22", None))
        a.expectMsg(ToClient(BarrierResult("bar22", false)))
        b.expectMsg(ToClient(BarrierResult("bar22", false)))
        c.expectMsg(ToClient(BarrierResult("bar22", false)))
      }
    }

    "timeout within the shortest timeout if the new timeout is longer" taggedAs TimingTest in {
      withController(3) { barrier ⇒
        val a, b, c = TestProbe()
        val nodeA = NodeInfo(A, AddressFromURIString("akka://sys"), a.ref)
        val nodeB = NodeInfo(B, AddressFromURIString("akka://sys"), b.ref)
        val nodeC = NodeInfo(C, AddressFromURIString("akka://sys"), c.ref)
        barrier ! nodeA
        barrier ! nodeB
        barrier ! nodeC
        a.expectMsg(ToClient(Done))
        b.expectMsg(ToClient(Done))
        c.expectMsg(ToClient(Done))
        a.send(barrier, EnterBarrier("bar23", Option(2 seconds)))
        b.send(barrier, EnterBarrier("bar23", Option(10 seconds)))
        EventFilter[BarrierTimeout](occurrences = 1) intercept {
          Thread.sleep(4000)
        }
        c.send(barrier, EnterBarrier("bar23", None))
        a.expectMsg(ToClient(BarrierResult("bar23", false)))
        b.expectMsg(ToClient(BarrierResult("bar23", false)))
        c.expectMsg(ToClient(BarrierResult("bar23", false)))
      }
    }

    "finally have no failure messages left" taggedAs TimingTest in {
      expectNoMsg(1 second)
    }

  }

  private def withController(participants: Int)(f: (ActorRef) ⇒ Unit): Unit = {
    system.actorOf(Props(new Actor {
      val controller = context.actorOf(Props(classOf[Controller], participants, new InetSocketAddress(InetAddress.getLocalHost, 0)))
      controller ! GetSockAddr
      override def supervisorStrategy = OneForOneStrategy() {
        case x ⇒ testActor ! Failed(controller, x); SupervisorStrategy.Restart
      }
      def receive = {
        case x: InetSocketAddress ⇒ testActor ! controller
      }
    }).withDeploy(Deploy.local))
    val actor = expectMsgType[ActorRef]
    f(actor)
    actor ! PoisonPill // clean up so network connections don't accumulate during test run
  }

  /**
   * Produce a BarrierCoordinator which is supervised with a strategy which
   * forwards all failures to the testActor.
   */
  private def getBarrier(): ActorRef = {
    system.actorOf(Props(new Actor {
      val barrier = context.actorOf(Props[BarrierCoordinator])
      override def supervisorStrategy = OneForOneStrategy() {
        case x ⇒ testActor ! Failed(barrier, x); SupervisorStrategy.Restart
      }
      def receive = {
        case _ ⇒ sender() ! barrier
      }
    }).withDeploy(Deploy.local)) ! ""
    expectMsgType[ActorRef]
  }

  private def noMsg(probes: TestProbe*) {
    expectNoMsg(1 second)
    probes foreach (_.msgAvailable should be(false))
  }

  private def data(clients: Set[Controller.NodeInfo], barrier: String, arrived: List[ActorRef], previous: Data): Data = {
    Data(clients, barrier, arrived, previous.deadline)
  }
}

Other Akka source code examples

Here is a short list of links related to this Akka BarrierSpec.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.