|
Akka/Scala example source code file (ClusterConsistentHashingGroupSpec.scala)
The ClusterConsistentHashingGroupSpec.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.cluster.routing

import scala.concurrent.Await
import scala.concurrent.duration._
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Address
import akka.actor.Props
import akka.cluster.MultiNodeClusterSpec
import akka.pattern.ask
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.routing.Broadcast
import akka.routing.ConsistentHashingGroup
import akka.routing.ConsistentHashingRouter.ConsistentHashMapping
import akka.routing.GetRoutees
import akka.routing.Routees
import akka.testkit._

/**
 * Multi-node configuration shared by every JVM taking part in
 * [[ClusterConsistentHashingGroupSpec]]: the three roles, the messages
 * exchanged with the destination actor, and the common cluster config.
 */
object ClusterConsistentHashingGroupMultiJvmSpec extends MultiNodeConfig {

  /** Asks a [[Destination]] to report what it has received so far. */
  case object Get

  /** Reply to [[Get]], carrying the distinct messages a [[Destination]] has received. */
  final case class Collected(messages: Set[Any])

  /**
   * Routee used by the test. Stores every message other than [[Get]] in a set
   * (so duplicates collapse) and answers [[Get]] with the current contents.
   */
  class Destination extends Actor {
    var receivedMessages = Set.empty[Any]

    def receive = {
      case Get ⇒ sender() ! Collected(receivedMessages)
      case m   ⇒ receivedMessages += m
    }
  }

  val first = role("first")
  val second = role("second")
  val third = role("third")

  commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))
}

class ClusterConsistentHashingGroupMultiJvmNode1 extends ClusterConsistentHashingGroupSpec
class ClusterConsistentHashingGroupMultiJvmNode2 extends ClusterConsistentHashingGroupSpec
class ClusterConsistentHashingGroupMultiJvmNode3 extends ClusterConsistentHashingGroupSpec

/**
 * Verifies a cluster-aware consistent hashing group router: each node starts a
 * `dest` actor and a router over `/user/dest`, sends the same keys through its
 * router, and the test then checks that the three destinations hold pairwise
 * disjoint key sets whose sizes add up to the number of keys.
 */
abstract class ClusterConsistentHashingGroupSpec extends MultiNodeSpec(ClusterConsistentHashingGroupMultiJvmSpec)
  with MultiNodeClusterSpec
  with ImplicitSender with DefaultTimeout {
  import ClusterConsistentHashingGroupMultiJvmSpec._

  /**
   * Fills in self address for local ActorRef
   *
   * Returns the address of `actorRef`'s path, or `cluster.selfAddress` when that
   * address carries neither host nor port.
   * NOTE(review): not referenced anywhere in this file as shown — confirm before removing.
   */
  private def fullAddress(actorRef: ActorRef): Address = actorRef.path.address match {
    case Address(_, _, None, None) ⇒ cluster.selfAddress
    case a                         ⇒ a
  }

  /**
   * Asks `router` for its current routees, blocking for at most `timeout.duration`.
   * The reply is narrowed with `mapTo`, so an unexpected reply type surfaces as a
   * failed future (rethrown by `Await.result`) rather than through a raw cast.
   */
  def currentRoutees(router: ActorRef) =
    Await.result((router ? GetRoutees).mapTo[Routees], timeout.duration).routees

  "A cluster router with a consistent hashing group" must {
    "start cluster with 3 nodes" taggedAs LongRunningTest in {
      // Every node hosts one destination under the same name so that the
      // router's "/user/dest" path resolves on each member.
      system.actorOf(Props[Destination], "dest")
      awaitClusterUp(first, second, third)
      enterBarrier("after-1")
    }

    "send to same destinations from different nodes" taggedAs LongRunningTest in {
      // String messages are used directly as their own hash key.
      def hashMapping: ConsistentHashMapping = {
        case s: String ⇒ s
      }

      val paths = List("/user/dest")
      val router = system.actorOf(
        ClusterRouterGroup(
          local = ConsistentHashingGroup(paths, hashMapping = hashMapping),
          settings = ClusterRouterGroupSettings(
            totalInstances = 10, paths, allowLocalRoutees = true, useRole = None)).props(),
        "router")

      // it may take some time until router receives cluster member events
      awaitAssert { currentRoutees(router).size should be(3) }

      val keys = List("A", "B", "C", "D", "E", "F", "G")
      for (_ ← 1 to 10; k ← keys) { router ! k }
      enterBarrier("messages-sent")

      // Broadcast reaches all three routees; each replies to testActor (ImplicitSender).
      router ! Broadcast(Get)
      val a = expectMsgType[Collected].messages
      val b = expectMsgType[Collected].messages
      val c = expectMsgType[Collected].messages

      // No key may show up at more than one destination ...
      a.intersect(b) should be(Set.empty)
      a.intersect(c) should be(Set.empty)
      b.intersect(c) should be(Set.empty)

      // ... and together the destinations must account for every key.
      (a.size + b.size + c.size) should be(keys.size)
      enterBarrier("after-2")
    }
  }
}

Other Akka source code examples
Here is a short list of links related to this Akka ClusterConsistentHashingGroupSpec.scala source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.