 * Copyright (C) 2009-2014 Typesafe Inc. <>

package akka.event

import language.postfixOps

import org.scalatest.BeforeAndAfterEach
import akka.testkit._
import scala.concurrent.duration._
import java.util.concurrent.atomic._

import{ Props, Actor, ActorRef, ActorSystem, PoisonPill, RootActorPath }
import akka.japi.{ Procedure, Function }
import com.typesafe.config.{ Config, ConfigFactory }
import scala.concurrent.Await

object EventBusSpec {
  class TestActorWrapperActor(testActor: ActorRef) extends Actor {
    def receive = {
      case x ⇒ testActor forward x

abstract class EventBusSpec(busName: String, conf: Config = ConfigFactory.empty()) extends AkkaSpec(conf) with BeforeAndAfterEach {
  import EventBusSpec._
  type BusType <: EventBus

  def createNewEventBus(): BusType

  def createEvents(numberOfEvents: Int): Iterable[BusType#Event]

  def createSubscriber(pipeTo: ActorRef): BusType#Subscriber

  def classifierFor(event: BusType#Event): BusType#Classifier

  def disposeSubscriber(system: ActorSystem, subscriber: BusType#Subscriber): Unit

  lazy val bus = createNewEventBus()

  busName must {
    def createNewSubscriber() = createSubscriber(testActor).asInstanceOf[bus.Subscriber]
    def getClassifierFor(event: BusType#Event) = classifierFor(event).asInstanceOf[bus.Classifier]
    def createNewEvents(numberOfEvents: Int): Iterable[bus.Event] = createEvents(numberOfEvents).asInstanceOf[Iterable[bus.Event]]

    val events = createNewEvents(100)
    val event = events.head
    val classifier = getClassifierFor(event)
    val subscriber = createNewSubscriber()

    "allow subscribers" in {
      bus.subscribe(subscriber, classifier) should be(true)

    "allow to unsubscribe already existing subscriber" in {
      bus.unsubscribe(subscriber, classifier) should be(true)

    "not allow to unsubscribe non-existing subscriber" in {
      val sub = createNewSubscriber()
      bus.unsubscribe(sub, classifier) should be(false)
      disposeSubscriber(system, sub)

    "not allow for the same subscriber to subscribe to the same channel twice" in {
      bus.subscribe(subscriber, classifier) should be(true)
      bus.subscribe(subscriber, classifier) should be(false)
      bus.unsubscribe(subscriber, classifier) should be(true)

    "not allow for the same subscriber to unsubscribe to the same channel twice" in {
      bus.subscribe(subscriber, classifier) should be(true)
      bus.unsubscribe(subscriber, classifier) should be(true)
      bus.unsubscribe(subscriber, classifier) should be(false)

    "allow to add multiple subscribers" in {
      val subscribers = (1 to 10) map { _ ⇒ createNewSubscriber() }
      val events = createEvents(10)
      val classifiers = events map getClassifierFor forall { case (s, c) ⇒ bus.subscribe(s, c) } should be(true) forall { case (s, c) ⇒ bus.unsubscribe(s, c) } should be(true)

      subscribers foreach (disposeSubscriber(system, _))

    "publishing events without any subscribers shouldn't be a problem" in {

    "publish the given event to the only subscriber" in {
      bus.subscribe(subscriber, classifier)
      expectNoMsg(1 second)
      bus.unsubscribe(subscriber, classifier)

    "publish to the only subscriber multiple times" in {
      bus.subscribe(subscriber, classifier)
      expectNoMsg(1 second)
      bus.unsubscribe(subscriber, classifier)

    "publish the given event to all intended subscribers" in {
      val range = 0 until 10
      val subscribers = range map (_ ⇒ createNewSubscriber())
      subscribers foreach { s ⇒ bus.subscribe(s, classifier) should be(true) }
      range foreach { _ ⇒ expectMsg(event) }
      subscribers foreach { s ⇒ bus.unsubscribe(s, classifier) should be(true); disposeSubscriber(system, s) }

    "not publish the given event to any other subscribers than the intended ones" in {
      val otherSubscriber = createNewSubscriber()
      val otherClassifier = getClassifierFor(events.drop(1).head)
      bus.subscribe(subscriber, classifier)
      bus.subscribe(otherSubscriber, otherClassifier)
      bus.unsubscribe(subscriber, classifier)
      bus.unsubscribe(otherSubscriber, otherClassifier)
      expectNoMsg(1 second)

    "not publish the given event to a former subscriber" in {
      bus.subscribe(subscriber, classifier)
      bus.unsubscribe(subscriber, classifier)
      expectNoMsg(1 second)

    "cleanup subscriber" in {
      disposeSubscriber(system, subscriber)

object ActorEventBusSpec {
  class MyActorEventBus(protected val system: ActorSystem) extends ActorEventBus
    with ActorClassification with ActorClassifier {

    type Event = Notification

    def classify(event: Event) = event.ref
    protected def mapSize = 32
    def publish(event: Event, subscriber: Subscriber) = subscriber ! event

  case class Notification(ref: ActorRef, payload: Int)

class ActorEventBusSpec(conf: Config) extends EventBusSpec("ActorEventBus", conf) {
  import akka.event.ActorEventBusSpec._
  import EventBusSpec.TestActorWrapperActor

  def this() {
    this(ConfigFactory.parseString(" = on").withFallback(AkkaSpec.testConf))

  type BusType = MyActorEventBus
  def createNewEventBus(): BusType = new MyActorEventBus(system)

  // different actor in each event because we want each event to have a different classifier (see EventBusSpec tests)
  def createEvents(numberOfEvents: Int) = (0 until numberOfEvents).map(Notification(TestProbe().ref, _)).toSeq

  def createSubscriber(pipeTo: ActorRef) = system.actorOf(Props(new TestActorWrapperActor(pipeTo)))

  def classifierFor(event: BusType#Event) = event.ref

  def disposeSubscriber(system: ActorSystem, subscriber: BusType#Subscriber): Unit = system.stop(subscriber)

  // ActorClassification specific tests

  "must unsubscribe subscriber when it terminates" in {
    val a1 = createSubscriber(system.deadLetters)
    val subs = createSubscriber(testActor)
    def m(i: Int) = Notification(a1, i)
    val p = TestProbe()
    system.eventStream.subscribe(p.ref, classOf[Logging.Debug])

    bus.subscribe(subs, a1)

    subs ! PoisonPill // when a1 dies, subs has nothing subscribed
    expectUnsubscribedByUnsubscriber(p, subs)

    expectNoMsg(1 second)

    disposeSubscriber(system, subs)
    disposeSubscriber(system, a1)

  "must keep subscriber even if its subscription-actors have died" in {
    // Deaths of monitored actors should not influence the subscription.
    // For example: one might still want to monitor messages classified to A
    // even though it died, and handle these in some way.
    val a1 = createSubscriber(system.deadLetters)
    val subs = createSubscriber(testActor)
    def m(i: Int) = Notification(a1, i)

    bus.subscribe(subs, a1) should equal(true)


    a1 ! PoisonPill

    bus.publish(m(2)) // even though a1 has terminated, classification still applies

    disposeSubscriber(system, subs)
    disposeSubscriber(system, a1)

  "must unregister subscriber only after it unsubscribes from all of it's subscriptions" in {
    val a1, a2 = createSubscriber(system.deadLetters)
    val subs = createSubscriber(testActor)
    def m1(i: Int) = Notification(a1, i)
    def m2(i: Int) = Notification(a2, i)

    val p = TestProbe()
    system.eventStream.subscribe(p.ref, classOf[Logging.Debug])

    bus.subscribe(subs, a1) should equal(true)
    bus.subscribe(subs, a2) should equal(true)


    bus.unsubscribe(subs, a1)
    expectNoMsg(1 second)

    bus.unsubscribe(subs, a2)
    expectUnregisterFromUnsubscriber(p, subs)
    expectNoMsg(1 second)

    disposeSubscriber(system, subs)
    disposeSubscriber(system, a1)
    disposeSubscriber(system, a2)

  private def expectUnsubscribedByUnsubscriber(p: TestProbe, a: ActorRef) {
    val expectedMsg = s"actor $a has terminated, unsubscribing it from $bus"
    p.fishForMessage(1 second, hint = expectedMsg) {
      case Logging.Debug(_, _, msg) if msg equals expectedMsg ⇒ true
      case other ⇒ false

  private def expectUnregisterFromUnsubscriber(p: TestProbe, a: ActorRef) {
    val expectedMsg = s"unregistered watch of $a in $bus"
    p.fishForMessage(1 second, hint = expectedMsg) {
      case Logging.Debug(_, _, msg) if msg equals expectedMsg ⇒ true
      case other ⇒ false

object ScanningEventBusSpec {

  class MyScanningEventBus extends EventBus with ScanningClassification {
    type Event = Int
    type Subscriber = Procedure[Int]
    type Classifier = String

    protected def compareClassifiers(a: Classifier, b: Classifier): Int = a compareTo b
    protected def compareSubscribers(a: Subscriber, b: Subscriber): Int = akka.util.Helpers.compareIdentityHash(a, b)

    protected def matches(classifier: Classifier, event: Event): Boolean = event.toString == classifier

    protected def publish(event: Event, subscriber: Subscriber): Unit = subscriber(event)

class ScanningEventBusSpec extends EventBusSpec("ScanningEventBus") {
  import ScanningEventBusSpec._

  type BusType = MyScanningEventBus

  def createNewEventBus(): BusType = new MyScanningEventBus

  def createEvents(numberOfEvents: Int) = (0 until numberOfEvents)

  def createSubscriber(pipeTo: ActorRef) = new Procedure[Int] { def apply(i: Int) = pipeTo ! i }

  def classifierFor(event: BusType#Event) = event.toString

  def disposeSubscriber(system: ActorSystem, subscriber: BusType#Subscriber): Unit = ()

object LookupEventBusSpec {
  class MyLookupEventBus extends EventBus with LookupClassification {
    type Event = Int
    type Subscriber = Procedure[Int]
    type Classifier = String

    override protected def classify(event: Int): String = event.toString
    override protected def compareSubscribers(a: Procedure[Int], b: Procedure[Int]): Int =
      akka.util.Helpers.compareIdentityHash(a, b)
    override protected def mapSize = 32
    override protected def publish(event: Int, subscriber: Procedure[Int]): Unit =

class LookupEventBusSpec extends EventBusSpec("LookupEventBus") {
  import LookupEventBusSpec._

  type BusType = MyLookupEventBus

  def createNewEventBus(): BusType = new MyLookupEventBus

  def createEvents(numberOfEvents: Int) = (0 until numberOfEvents)

  def createSubscriber(pipeTo: ActorRef) = new Procedure[Int] { def apply(i: Int) = pipeTo ! i }

  def classifierFor(event: BusType#Event) = event.toString

  def disposeSubscriber(system: ActorSystem, subscriber: BusType#Subscriber): Unit = ()

