|
Akka/Scala example source code file (AkkaProtocolSpec.scala)
The AkkaProtocolSpec.scala Akka example source codepackage akka.remote.transport import akka.actor.{ ExtendedActorSystem, Address, Props } import akka.remote.transport.AkkaPduCodec.{ Disassociate, Associate, Heartbeat } import akka.remote.transport.AkkaProtocolSpec.TestFailureDetector import akka.remote.transport.AssociationHandle.{ DisassociateInfo, ActorHandleEventListener, Disassociated, InboundPayload } import akka.remote.transport.TestTransport._ import akka.remote.transport.Transport._ import akka.remote.{ SeqNo, WireFormats, RemoteActorRefProvider, FailureDetector } import akka.testkit.{ ImplicitSender, AkkaSpec } import akka.util.ByteString import com.google.protobuf.{ ByteString ⇒ PByteString } import com.typesafe.config.ConfigFactory import scala.concurrent.duration._ import scala.concurrent.{ Await, Promise } import akka.actor.Deploy object AkkaProtocolSpec { class TestFailureDetector extends FailureDetector { @volatile var isAvailable: Boolean = true def isMonitoring: Boolean = called @volatile var called: Boolean = false def heartbeat(): Unit = called = true } } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.RemoteActorRefProvider" """) with ImplicitSender { val conf = ConfigFactory.parseString( """ akka.remote { transport-failure-detector { implementation-class = "akka.remote.PhiAccrualFailureDetector" threshold = 7.0 max-sample-size = 100 min-std-deviation = 100 ms acceptable-heartbeat-pause = 3 s heartbeat-interval = 1 s } backoff-interval = 1 s require-cookie = off secure-cookie = "abcde" shutdown-timeout = 5 s startup-timeout = 5 s use-passive-connections = on } """) val localAddress = Address("test", "testsystem", "testhost", 1234) val localAkkaAddress = Address("akka.test", "testsystem", "testhost", 1234) val remoteAddress = Address("test", "testsystem2", "testhost2", 1234) val remoteAkkaAddress = Address("akka.test", "testsystem2", "testhost2", 1234) val codec = AkkaPduProtobufCodec val testMsg = WireFormats.SerializedMessage.newBuilder().setSerializerId(0).setMessage(PByteString.copyFromUtf8("foo")).build val testEnvelope = codec.constructMessage(localAkkaAddress, testActor, testMsg, None) val testMsgPdu: ByteString = codec.constructPayload(testEnvelope) def testHeartbeat = InboundPayload(codec.constructHeartbeat) def testPayload = InboundPayload(testMsgPdu) def testDisassociate(info: DisassociateInfo) = InboundPayload(codec.constructDisassociate(info)) def testAssociate(uid: Int, cookie: Option[String]) = InboundPayload(codec.constructAssociate(HandshakeInfo(remoteAkkaAddress, uid, cookie))) def collaborators = { val registry = new AssociationRegistry val transport: TestTransport = new TestTransport(localAddress, registry) val handle: TestAssociationHandle = new TestAssociationHandle(localAddress, remoteAddress, transport, true) // silently drop writes -- we do not have another endpoint under test, so nobody to forward to transport.writeBehavior.pushConstant(true) (new TestFailureDetector, registry, transport, handle) } def lastActivityIsHeartbeat(registry: AssociationRegistry) = if (registry.logSnapshot.isEmpty) false else registry.logSnapshot.last match { case WriteAttempt(sender, recipient, payload) if sender == localAddress && recipient == remoteAddress ⇒ codec.decodePdu(payload) match { case Heartbeat ⇒ true case _ ⇒ false } case _ ⇒ false } def lastActivityIsAssociate(registry: AssociationRegistry, uid: Long, cookie: Option[String]) = if (registry.logSnapshot.isEmpty) false else registry.logSnapshot.last match { case WriteAttempt(sender, recipient, payload) if sender == localAddress && recipient == remoteAddress ⇒ codec.decodePdu(payload) match { case Associate(info) ⇒ info.cookie == cookie && info.origin == localAddress && info.uid == uid case _ ⇒ false } case _ ⇒ false } def lastActivityIsDisassociate(registry: AssociationRegistry) = if (registry.logSnapshot.isEmpty) false else registry.logSnapshot.last match { case WriteAttempt(sender, recipient, payload) if sender == localAddress && recipient == remoteAddress ⇒ codec.decodePdu(payload) match { case Disassociate(_) ⇒ true case _ ⇒ false } case _ ⇒ false } "ProtocolStateActor" must { "register itself as reader on injecteted handles" in { val (failureDetector, _, _, handle) = collaborators system.actorOf(ProtocolStateActor.inboundProps( HandshakeInfo(origin = localAddress, uid = 42, cookie = None), handle, ActorAssociationEventListener(testActor), new AkkaProtocolSettings(conf), codec, failureDetector)) awaitCond(handle.readHandlerPromise.isCompleted) } "in inbound mode accept payload after Associate PDU received" in { val (failureDetector, registry, _, handle) = collaborators val reader = system.actorOf(ProtocolStateActor.inboundProps( HandshakeInfo(origin = localAddress, uid = 42, cookie = None), handle, ActorAssociationEventListener(testActor), new AkkaProtocolSettings(conf), codec, failureDetector)) reader ! testAssociate(uid = 33, cookie = None) awaitCond(failureDetector.called) val wrappedHandle = expectMsgPF() { case InboundAssociation(h: AkkaProtocolHandle) ⇒ h.handshakeInfo.uid should be(33) h } wrappedHandle.readHandlerPromise.success(ActorHandleEventListener(testActor)) failureDetector.called should be(true) // Heartbeat was sent in response to Associate awaitCond(lastActivityIsHeartbeat(registry)) reader ! testPayload expectMsgPF() { case InboundPayload(p) ⇒ p should be(testEnvelope) } } "in inbound mode disassociate when an unexpected message arrives instead of Associate" in { val (failureDetector, registry, _, handle) = collaborators val reader = system.actorOf(ProtocolStateActor.inboundProps( HandshakeInfo(origin = localAddress, uid = 42, cookie = None), handle, ActorAssociationEventListener(testActor), new AkkaProtocolSettings(conf), codec, failureDetector)) // a stray message will force a disassociate reader ! testHeartbeat // this associate will now be ignored reader ! testAssociate(uid = 33, cookie = None) awaitCond(registry.logSnapshot.exists { case DisassociateAttempt(requester, remote) ⇒ true case _ ⇒ false }) } "in outbound mode delay readiness until hadnshake finished" in { val (failureDetector, registry, transport, handle) = collaborators transport.associateBehavior.pushConstant(handle) val statusPromise: Promise[AssociationHandle] = Promise() val reader = system.actorOf(ProtocolStateActor.outboundProps( HandshakeInfo(origin = localAddress, uid = 42, cookie = None), remoteAddress, statusPromise, transport, new AkkaProtocolSettings(conf), codec, failureDetector, refuseUid = None)) awaitCond(lastActivityIsAssociate(registry, 42, None)) failureDetector.called should be(true) // keeps sending heartbeats awaitCond(lastActivityIsHeartbeat(registry)) statusPromise.isCompleted should be(false) // finish connection by sending back an associate message reader ! testAssociate(33, None) Await.result(statusPromise.future, 3.seconds) match { case h: AkkaProtocolHandle ⇒ h.remoteAddress should be(remoteAkkaAddress) h.localAddress should be(localAkkaAddress) h.handshakeInfo.uid should be(33) case _ ⇒ fail() } } "ignore incoming associations with wrong cookie" in { val (failureDetector, registry, _, handle) = collaborators val reader = system.actorOf(ProtocolStateActor.inboundProps( HandshakeInfo(origin = localAddress, uid = 42, cookie = Some("abcde")), handle, ActorAssociationEventListener(testActor), new AkkaProtocolSettings(ConfigFactory.parseString("akka.remote.require-cookie = on").withFallback(conf)), codec, failureDetector)) reader ! testAssociate(uid = 33, Some("xyzzy")) awaitCond(registry.logSnapshot.exists { case DisassociateAttempt(requester, remote) ⇒ true case _ ⇒ false }) } "accept incoming associations with correct cookie" in { val (failureDetector, registry, _, handle) = collaborators val reader = system.actorOf(ProtocolStateActor.inboundProps( HandshakeInfo(origin = localAddress, uid = 42, cookie = Some("abcde")), handle, ActorAssociationEventListener(testActor), new AkkaProtocolSettings(ConfigFactory.parseString("akka.remote.require-cookie = on").withFallback(conf)), codec, failureDetector)) // Send the correct cookie reader ! testAssociate(uid = 33, Some("abcde")) val wrappedHandle = expectMsgPF() { case InboundAssociation(h: AkkaProtocolHandle) ⇒ h.handshakeInfo.uid should be(33) h.handshakeInfo.cookie should be(Some("abcde")) h } wrappedHandle.readHandlerPromise.success(ActorHandleEventListener(testActor)) failureDetector.called should be(true) // Heartbeat was sent in response to Associate awaitCond(lastActivityIsHeartbeat(registry)) } "send cookie in Associate PDU if configured to do so" in { val (failureDetector, registry, transport, handle) = collaborators transport.associateBehavior.pushConstant(handle) val statusPromise: Promise[AssociationHandle] = Promise() system.actorOf(ProtocolStateActor.outboundProps( HandshakeInfo(origin = localAddress, uid = 42, cookie = Some("abcde")), remoteAddress, statusPromise, transport, new AkkaProtocolSettings(ConfigFactory.parseString("akka.remote.require-cookie = on").withFallback(conf)), codec, failureDetector, refuseUid = None)) awaitCond(lastActivityIsAssociate(registry, uid = 42, cookie = Some("abcde"))) } "handle explicit disassociate messages" in { val (failureDetector, registry, transport, handle) = collaborators transport.associateBehavior.pushConstant(handle) val statusPromise: Promise[AssociationHandle] = Promise() val reader = system.actorOf(ProtocolStateActor.outboundProps( HandshakeInfo(origin = localAddress, uid = 42, cookie = None), remoteAddress, statusPromise, transport, new AkkaProtocolSettings(conf), codec, failureDetector, refuseUid = None)) awaitCond(lastActivityIsAssociate(registry, uid = 42, cookie = None)) reader ! testAssociate(uid = 33, cookie = None) val wrappedHandle = Await.result(statusPromise.future, 3.seconds) match { case h: AssociationHandle ⇒ h.remoteAddress should be(remoteAkkaAddress) h.localAddress should be(localAkkaAddress) h case _ ⇒ fail() } wrappedHandle.readHandlerPromise.success(ActorHandleEventListener(testActor)) reader ! testDisassociate(AssociationHandle.Unknown) expectMsg(Disassociated(AssociationHandle.Unknown)) } "handle transport level disassociations" in { val (failureDetector, registry, transport, handle) = collaborators transport.associateBehavior.pushConstant(handle) val statusPromise: Promise[AssociationHandle] = Promise() val reader = system.actorOf(ProtocolStateActor.outboundProps( HandshakeInfo(origin = localAddress, uid = 42, cookie = None), remoteAddress, statusPromise, transport, new AkkaProtocolSettings(conf), codec, failureDetector, refuseUid = None)) awaitCond(lastActivityIsAssociate(registry, uid = 42, cookie = None)) reader ! testAssociate(uid = 33, cookie = None) val wrappedHandle = Await.result(statusPromise.future, 3.seconds) match { case h: AssociationHandle ⇒ h.remoteAddress should be(remoteAkkaAddress) h.localAddress should be(localAkkaAddress) h case _ ⇒ fail() } wrappedHandle.readHandlerPromise.success(ActorHandleEventListener(testActor)) reader ! Disassociated(AssociationHandle.Unknown) expectMsg(Disassociated(AssociationHandle.Unknown)) } "disassociate when failure detector signals failure" in { val (failureDetector, registry, transport, handle) = collaborators transport.associateBehavior.pushConstant(handle) val statusPromise: Promise[AssociationHandle] = Promise() val stateActor = system.actorOf(ProtocolStateActor.outboundProps( HandshakeInfo(origin = localAddress, uid = 42, cookie = None), remoteAddress, statusPromise, transport, new AkkaProtocolSettings(conf), codec, failureDetector, refuseUid = None)) awaitCond(lastActivityIsAssociate(registry, uid = 42, cookie = None)) stateActor ! testAssociate(uid = 33, cookie = None) val wrappedHandle = Await.result(statusPromise.future, 3.seconds) match { case h: AssociationHandle ⇒ h.remoteAddress should be(remoteAkkaAddress) h.localAddress should be(localAkkaAddress) h case _ ⇒ fail() } wrappedHandle.readHandlerPromise.success(ActorHandleEventListener(testActor)) //wait for one heartbeat awaitCond(lastActivityIsHeartbeat(registry)) failureDetector.isAvailable = false expectMsg(Disassociated(AssociationHandle.Unknown)) } "handle correctly when the handler is registered only after the association is already closed" in { val (failureDetector, registry, transport, handle) = collaborators transport.associateBehavior.pushConstant(handle) val statusPromise: Promise[AssociationHandle] = Promise() val stateActor = system.actorOf(ProtocolStateActor.outboundProps( HandshakeInfo(origin = localAddress, uid = 42, cookie = None), remoteAddress, statusPromise, transport, new AkkaProtocolSettings(conf), codec, failureDetector, refuseUid = None)) awaitCond(lastActivityIsAssociate(registry, uid = 42, cookie = None)) stateActor ! testAssociate(uid = 33, cookie = None) val wrappedHandle = Await.result(statusPromise.future, 3.seconds) match { case h: AssociationHandle ⇒ h.remoteAddress should be(remoteAkkaAddress) h.localAddress should be(localAkkaAddress) h case _ ⇒ fail() } stateActor ! Disassociated(AssociationHandle.Unknown) wrappedHandle.readHandlerPromise.success(ActorHandleEventListener(testActor)) expectMsg(Disassociated(AssociationHandle.Unknown)) } } } Other Akka source code examplesHere is a short list of links related to this Akka AkkaProtocolSpec.scala source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.