|
Akka/Scala example source code file (ConcurrentSocketActorSpec.scala)
The ConcurrentSocketActorSpec.scala Akka example source code
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.zeromq
import language.postfixOps
import org.scalatest.Matchers
import akka.testkit.{ TestProbe, DefaultTimeout, AkkaSpec }
import scala.concurrent.duration._
import akka.actor.{ Cancellable, Actor, Props, ActorRef }
import akka.util.{ ByteString, Timeout }
class ConcurrentSocketActorSpec extends AkkaSpec {
implicit val timeout: Timeout = Timeout(15 seconds)
def checkZeroMQInstallation() =
try {
zmq.version match {
case ZeroMQVersion(x, y, _) if x >= 3 || (x >= 2 && y >= 1) ⇒ Unit
case version ⇒ invalidZeroMQVersion(version)
}
} catch {
case e: LinkageError ⇒ zeroMQNotInstalled()
}
def invalidZeroMQVersion(version: ZeroMQVersion) {
info("WARNING: The tests are not run because invalid ZeroMQ version: %s. Version >= 2.1.x required.".format(version))
pending
}
def zeroMQNotInstalled(): Unit = {
info("WARNING: The tests are not run because ZeroMQ is not installed. Version >= 2.1.x required.")
pending
}
lazy val endpoints: Vector[String] = {
val sockets = Vector.fill(3)(new java.net.ServerSocket(0))
val endpoints = sockets.map(s ⇒ s"tcp://127.0.0.1:${s.getLocalPort}")
sockets.foreach(_.close())
endpoints
}
// this must stay a def for checkZeroMQInstallation() to work correctly
def zmq = ZeroMQExtension(system)
"ConcurrentSocketActor" should {
"support pub-sub connections" in {
checkZeroMQInstallation()
val subscriberProbe = TestProbe()
val context = Context()
val endpoint = endpoints(0)
val publisher = zmq.newSocket(SocketType.Pub, context, Bind(endpoint))
val subscriber = zmq.newSocket(SocketType.Sub, context, Listener(subscriberProbe.ref), Connect(endpoint), SubscribeAll)
import system.dispatcher
val msgGenerator = system.scheduler.schedule(100 millis, 10 millis, new Runnable {
var number = 0
def run() {
publisher ! ZMQMessage(ByteString(number.toString), ByteString.empty)
number += 1
}
})
try {
subscriberProbe.expectMsg(Connecting)
val msgNumbers = subscriberProbe.receiveWhile(3 seconds) {
case msg: ZMQMessage if msg.frames.size == 2 ⇒
msg.frames(1).length should be(0)
msg
}.map(m ⇒ m.frames(0).utf8String.toInt)
msgNumbers.length should be > 0
msgNumbers should be(for (i ← msgNumbers.head to msgNumbers.last) yield i)
} finally {
msgGenerator.cancel()
watch(subscriber)
system stop subscriber
subscriberProbe.receiveWhile(3 seconds) {
case msg ⇒ msg
}.last should be(Closed)
expectTerminated(subscriber, 5.seconds)
watch(publisher)
system stop publisher
expectTerminated(publisher, 5.seconds)
context.term()
}
}
"support req-rep connections" in {
checkZeroMQInstallation()
val requesterProbe = TestProbe()
val replierProbe = TestProbe()
val context = Context()
val endpoint = endpoints(1)
val requester = zmq.newSocket(SocketType.Req, context, Listener(requesterProbe.ref), Bind(endpoint))
val replier = zmq.newSocket(SocketType.Rep, context, Listener(replierProbe.ref), Connect(endpoint))
try {
replierProbe.expectMsg(Connecting)
val request = ZMQMessage(ByteString("Request"))
val reply = ZMQMessage(ByteString("Reply"))
requester ! request
replierProbe.expectMsg(request)
replier ! reply
requesterProbe.expectMsg(reply)
} finally {
watch(replier)
system stop replier
replierProbe.expectMsg(Closed)
expectTerminated(replier, 5.seconds)
watch(requester)
system stop requester
expectTerminated(requester, 5.seconds)
context.term()
}
}
"should support push-pull connections" in {
checkZeroMQInstallation()
val pullerProbe = TestProbe()
val context = Context()
val endpoint = endpoints(2)
val pusher = zmq.newSocket(SocketType.Push, context, Bind(endpoint))
val puller = zmq.newSocket(SocketType.Pull, context, Listener(pullerProbe.ref), Connect(endpoint))
try {
pullerProbe.expectMsg(Connecting)
val message = ZMQMessage(ByteString("Pushed message"))
pusher ! message
pullerProbe.expectMsg(message)
} finally {
watch(puller)
system stop puller
pullerProbe.expectMsg(Closed)
expectTerminated(puller, 5.seconds)
watch(pusher)
system stop pusher
expectTerminated(pusher, 5.seconds)
context.term()
}
}
}
class MessageGeneratorActor(actorRef: ActorRef) extends Actor {
var messageNumber: Int = 0
var genMessages: Cancellable = null
override def preStart() = {
import system.dispatcher
genMessages = system.scheduler.schedule(100 millis, 10 millis, self, "genMessage")
}
override def postStop() = {
if (genMessages != null && !genMessages.isCancelled) {
genMessages.cancel
genMessages = null
}
}
def receive = {
case _ ⇒
val payload = "%s".format(messageNumber)
messageNumber += 1
actorRef ! ZMQMessage(ByteString(payload))
}
}
}
Other Akka source code examplesHere is a short list of links related to this Akka ConcurrentSocketActorSpec.scala source code file: |
| ... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.