|
Akka/Scala example source code file (MessageSerializer.scala)
The MessageSerializer.scala Akka example source code
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence.serialization
import scala.language.existentials
import com.google.protobuf._
import akka.actor.{ ActorPath, ExtendedActorSystem }
import akka.japi.Util.immutableSeq
import akka.persistence._
import akka.persistence.serialization.MessageFormats._
import akka.serialization._
import akka.persistence.AtLeastOnceDelivery.{ AtLeastOnceDeliverySnapshot ⇒ AtLeastOnceDeliverySnap }
import akka.persistence.AtLeastOnceDelivery.UnconfirmedDelivery
import scala.collection.immutable.VectorBuilder
/**
 * Marker trait for all protobuf-serializable messages in `akka.persistence`.
 *
 * It declares no members of its own; in this file it is the declared result type of
 * `MessageSerializer.fromBinary`.
 */
trait Message extends Serializable
/**
 * Protobuf serializer for the `akka.persistence` protocol messages handled below:
 * [[PersistentBatch]], [[PersistentRepr]], [[DeliveredByChannel]],
 * [[DeliveredByPersistentChannel]], [[Deliver]] and the at-least-once-delivery snapshot
 * (`AtLeastOnceDelivery.AtLeastOnceDeliverySnapshot`).
 *
 * User payloads are not encoded by this class itself: they are handed to whichever
 * serializer `SerializationExtension(system)` selects for the payload object and are
 * embedded as an opaque `PersistentPayload` (bytes + serializer id + optional manifest).
 *
 * @param system actor system used to look up payload serializers, to resolve actor
 *               references and to load payload classes
 */
class MessageSerializer(val system: ExtendedActorSystem) extends Serializer {
  import PersistentRepr.Undefined

  // Manifest classes. They are capitalised so that `fromBinary` can use them as
  // stable-identifier patterns (compared with `==`) rather than as fresh variable bindings.
  val PersistentBatchClass = classOf[PersistentBatch]
  val PersistentReprClass = classOf[PersistentRepr]
  val PersistentImplClass = classOf[PersistentImpl]
  val ConfirmablePersistentImplClass = classOf[ConfirmablePersistentImpl]
  val DeliveredByTransientChannelClass = classOf[DeliveredByChannel]
  val DeliveredByPersistentChannelClass = classOf[DeliveredByPersistentChannel]
  val DeliverClass = classOf[Deliver]
  val AtLeastOnceDeliverySnapshotClass = classOf[AtLeastOnceDeliverySnap]

  /** Identifier of this serializer; always `7`. */
  def identifier: Int = 7

  /** Always `true`: `fromBinary` dispatches on the manifest class. */
  def includeManifest: Boolean = true

  // `None` when the system's default address has local scope; otherwise the
  // address/system pair that is installed while payloads are being serialized.
  private lazy val transportInformation: Option[Serialization.Information] = {
    val address = system.provider.getDefaultAddress
    if (address.hasLocalScope) None
    else Some(Serialization.Information(address, system))
  }

  /**
   * Serializes [[PersistentBatch]], [[PersistentRepr]], [[DeliveredByChannel]],
   * [[DeliveredByPersistentChannel]], [[Deliver]] and at-least-once-delivery snapshot
   * messages. Delegates serialization of a persistent message's payload to a matching
   * `akka.serialization.Serializer`.
   *
   * @param o the message to serialize
   * @return the protobuf-encoded bytes
   * @throws IllegalArgumentException if `o` is none of the supported message types
   */
  def toBinary(o: AnyRef): Array[Byte] = o match {
    case b: PersistentBatch              ⇒ persistentMessageBatchBuilder(b).build().toByteArray
    case p: PersistentRepr               ⇒ persistentMessageBuilder(p).build().toByteArray
    case c: DeliveredByChannel           ⇒ deliveredMessageBuilder(c).build().toByteArray
    case c: DeliveredByPersistentChannel ⇒ deliveredMessageBuilder(c).build().toByteArray
    case d: Deliver                      ⇒ deliverMessageBuilder(d).build.toByteArray
    case a: AtLeastOnceDeliverySnap      ⇒ atLeastOnceDeliverySnapshotBuilder(a).build.toByteArray
    case _                               ⇒ throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass}")
  }

  /**
   * Deserializes [[PersistentBatch]], [[PersistentRepr]], [[DeliveredByChannel]],
   * [[DeliveredByPersistentChannel]], [[Deliver]] and at-least-once-delivery snapshot
   * messages. Delegates deserialization of a persistent message's payload to a matching
   * `akka.serialization.Serializer`.
   *
   * @param bytes    the protobuf-encoded message
   * @param manifest class the bytes were written for; when absent the bytes are parsed
   *                 as a `PersistentMessage`
   * @return the decoded message
   * @throws IllegalArgumentException if `manifest` names a class this serializer does not handle
   */
  def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): Message = manifest match {
    case None ⇒ persistent(PersistentMessage.parseFrom(bytes))
    case Some(c) ⇒ c match {
      case PersistentImplClass               ⇒ persistent(PersistentMessage.parseFrom(bytes))
      case ConfirmablePersistentImplClass    ⇒ persistent(PersistentMessage.parseFrom(bytes))
      case PersistentReprClass               ⇒ persistent(PersistentMessage.parseFrom(bytes))
      case PersistentBatchClass              ⇒ persistentBatch(PersistentMessageBatch.parseFrom(bytes))
      case DeliveredByTransientChannelClass  ⇒ delivered(DeliveredMessage.parseFrom(bytes))
      case DeliveredByPersistentChannelClass ⇒ delivered(DeliveredMessage.parseFrom(bytes))
      case DeliverClass                      ⇒ deliver(DeliverMessage.parseFrom(bytes))
      case AtLeastOnceDeliverySnapshotClass  ⇒ atLeastOnceDeliverySnapshot(AtLeastOnceDeliverySnapshot.parseFrom(bytes))
      case _                                 ⇒ throw new IllegalArgumentException(s"Can't deserialize object of type ${c}")
    }
  }

  //
  // toBinary helpers
  //

  /** Builds the protobuf form of a [[Deliver]]: the wrapped persistent message plus the destination path as a string. */
  private def deliverMessageBuilder(deliver: Deliver) = {
    val builder = DeliverMessage.newBuilder
    // The cast fails with a ClassCastException if the wrapped message is not a PersistentRepr.
    builder.setPersistent(persistentMessageBuilder(deliver.persistent.asInstanceOf[PersistentRepr]))
    builder.setDestination(deliver.destination.toString)
    builder
  }

  /**
   * Builds the protobuf form of an at-least-once-delivery snapshot.
   *
   * @param snap snapshot holding the current delivery id and the unconfirmed deliveries
   * @return a builder containing the delivery id and one entry per unconfirmed delivery,
   *         in the order of `snap.unconfirmedDeliveries`
   */
  def atLeastOnceDeliverySnapshotBuilder(snap: AtLeastOnceDeliverySnap): AtLeastOnceDeliverySnapshot.Builder = {
    val builder = AtLeastOnceDeliverySnapshot.newBuilder
    builder.setCurrentDeliveryId(snap.currentDeliveryId)
    snap.unconfirmedDeliveries.foreach { unconfirmed ⇒
      val unconfirmedBuilder =
        AtLeastOnceDeliverySnapshot.UnconfirmedDelivery.newBuilder.
          setDeliveryId(unconfirmed.deliveryId).
          setDestination(unconfirmed.destination.toString).
          setPayload(persistentPayloadBuilder(unconfirmed.message.asInstanceOf[AnyRef]))
      builder.addUnconfirmedDeliveries(unconfirmedBuilder)
    }
    builder
  }

  /**
   * Rebuilds an at-least-once-delivery snapshot from its protobuf form.
   *
   * @param atLeastOnceDeliverySnapshot the parsed protobuf message
   * @return the snapshot with its unconfirmed deliveries in the order they were stored
   */
  def atLeastOnceDeliverySnapshot(atLeastOnceDeliverySnapshot: AtLeastOnceDeliverySnapshot): AtLeastOnceDeliverySnap = {
    import scala.collection.JavaConverters._
    val unconfirmedDeliveries = new VectorBuilder[UnconfirmedDelivery]()
    atLeastOnceDeliverySnapshot.getUnconfirmedDeliveriesList().iterator().asScala foreach { next ⇒
      unconfirmedDeliveries += UnconfirmedDelivery(next.getDeliveryId, ActorPath.fromString(next.getDestination),
        payload(next.getPayload))
    }

    AtLeastOnceDeliverySnap(
      atLeastOnceDeliverySnapshot.getCurrentDeliveryId,
      unconfirmedDeliveries.result())
  }

  /** Builds the protobuf form of a batch. Entries that are not a [[PersistentRepr]] are skipped. */
  private def persistentMessageBatchBuilder(persistentBatch: PersistentBatch) = {
    val builder = PersistentMessageBatch.newBuilder
    // A single typed pattern replaces the former filter(isInstanceOf) + asInstanceOf pair:
    // one traversal, no unchecked cast.
    persistentBatch.batch.foreach {
      case p: PersistentRepr ⇒ builder.addBatch(persistentMessageBuilder(p))
      case _                 ⇒ () // not a persistent message: nothing to write
    }
    builder
  }

  /**
   * Builds the protobuf form of a [[PersistentRepr]]. Optional fields (persistence id,
   * confirm message, confirm target, sender) are only set when present, so that the
   * `has...` checks in `persistent` can tell them apart from real values when reading.
   */
  private def persistentMessageBuilder(persistent: PersistentRepr) = {
    val builder = PersistentMessage.newBuilder

    if (persistent.persistenceId != Undefined) builder.setPersistenceId(persistent.persistenceId)
    // `null` is the in-memory representation of "absent" for the following three fields.
    if (persistent.confirmMessage != null) builder.setConfirmMessage(deliveredMessageBuilder(persistent.confirmMessage))
    if (persistent.confirmTarget != null) builder.setConfirmTarget(Serialization.serializedActorPath(persistent.confirmTarget))
    if (persistent.sender != null) builder.setSender(Serialization.serializedActorPath(persistent.sender))

    persistent.confirms.foreach(builder.addConfirms)

    builder.setPayload(persistentPayloadBuilder(persistent.payload.asInstanceOf[AnyRef]))
    builder.setSequenceNr(persistent.sequenceNr)
    builder.setDeleted(persistent.deleted)
    builder.setRedeliveries(persistent.redeliveries)
    builder.setConfirmable(persistent.confirmable)
    builder
  }

  /**
   * Serializes a user payload with the serializer registered for it and wraps the result
   * together with the serializer id and, if that serializer asks for one, the payload's
   * class name as manifest.
   */
  private def persistentPayloadBuilder(payload: AnyRef) = {
    def payloadBuilder() = {
      val serializer = SerializationExtension(system).findSerializerFor(payload)
      val builder = PersistentPayload.newBuilder()

      if (serializer.includeManifest) builder.setPayloadManifest(ByteString.copyFromUtf8(payload.getClass.getName))

      builder.setPayload(ByteString.copyFrom(serializer.toBinary(payload)))
      builder.setSerializerId(serializer.identifier)
      builder
    }

    // serialize actor references with full address information (defaultAddress)
    transportInformation match {
      case Some(ti) ⇒ Serialization.currentTransportInformation.withValue(ti) { payloadBuilder() }
      case None     ⇒ payloadBuilder()
    }
  }

  /**
   * Builds the protobuf form of a delivery confirmation. The persistence id is only
   * written for [[DeliveredByChannel]]; `delivered` uses its presence to pick the type
   * to reconstruct.
   */
  private def deliveredMessageBuilder(delivered: Delivered) = {
    val builder = DeliveredMessage.newBuilder

    // `null` channel means "no channel reference"; the field is then left unset.
    if (delivered.channel != null) builder.setChannel(Serialization.serializedActorPath(delivered.channel))

    builder.setChannelId(delivered.channelId)
    builder.setPersistentSequenceNr(delivered.persistentSequenceNr)
    builder.setDeliverySequenceNr(delivered.deliverySequenceNr)

    delivered match {
      case c: DeliveredByChannel ⇒ builder.setPersistenceId(c.persistenceId)
      case _                     ⇒ builder
    }
  }

  //
  // fromBinary helpers
  //

  /** Rebuilds a [[Deliver]] from its protobuf form, parsing the destination back into an `ActorPath`. */
  private def deliver(deliverMessage: DeliverMessage): Deliver = {
    Deliver(
      persistent(deliverMessage.getPersistent),
      ActorPath.fromString(deliverMessage.getDestination))
  }

  /** Rebuilds a [[PersistentBatch]] by decoding every entry of the protobuf batch list. */
  private def persistentBatch(persistentMessageBatch: PersistentMessageBatch): PersistentBatch =
    PersistentBatch(immutableSeq(persistentMessageBatch.getBatchList).map(persistent))

  /**
   * Rebuilds a [[PersistentRepr]] from its protobuf form. Optional fields that were not
   * written come back as `Undefined` (persistence id) or `null` (confirm message,
   * confirm target, sender), mirroring `persistentMessageBuilder`.
   */
  private def persistent(persistentMessage: PersistentMessage): PersistentRepr = {
    PersistentRepr(
      payload(persistentMessage.getPayload),
      persistentMessage.getSequenceNr,
      if (persistentMessage.hasPersistenceId) persistentMessage.getPersistenceId else Undefined,
      persistentMessage.getDeleted,
      persistentMessage.getRedeliveries,
      immutableSeq(persistentMessage.getConfirmsList),
      persistentMessage.getConfirmable,
      if (persistentMessage.hasConfirmMessage) delivered(persistentMessage.getConfirmMessage) else null,
      if (persistentMessage.hasConfirmTarget) system.provider.resolveActorRef(persistentMessage.getConfirmTarget) else null,
      if (persistentMessage.hasSender) system.provider.resolveActorRef(persistentMessage.getSender) else null)
  }

  /**
   * Decodes a user payload with the serializer identified by the stored serializer id.
   * When a manifest is present, the named class is loaded through `system.dynamicAccess`
   * and passed to that serializer. Both `.get` calls propagate a failed class lookup or a
   * failed deserialization to the caller as an exception.
   */
  private def payload(persistentPayload: PersistentPayload): Any = {
    val payloadClass = if (persistentPayload.hasPayloadManifest)
      Some(system.dynamicAccess.getClassFor[AnyRef](persistentPayload.getPayloadManifest.toStringUtf8).get) else None

    SerializationExtension(system).deserialize(
      persistentPayload.getPayload.toByteArray,
      persistentPayload.getSerializerId,
      payloadClass).get
  }

  /**
   * Rebuilds a delivery confirmation. A message carrying a persistence id becomes a
   * [[DeliveredByChannel]]; one without becomes a [[DeliveredByPersistentChannel]].
   */
  private def delivered(deliveredMessage: DeliveredMessage): Delivered = {
    // An unset channel field is represented as `null`, matching `deliveredMessageBuilder`.
    val channel = if (deliveredMessage.hasChannel) system.provider.resolveActorRef(deliveredMessage.getChannel) else null

    if (deliveredMessage.hasPersistenceId) {
      DeliveredByChannel(
        deliveredMessage.getPersistenceId,
        deliveredMessage.getChannelId,
        deliveredMessage.getPersistentSequenceNr,
        deliveredMessage.getDeliverySequenceNr,
        channel)
    } else {
      DeliveredByPersistentChannel(
        deliveredMessage.getChannelId,
        deliveredMessage.getPersistentSequenceNr,
        deliveredMessage.getDeliverySequenceNr,
        channel)
    }
  }
}
Other Akka source code examples
Here is a short list of links related to this Akka MessageSerializer.scala source code file: |
| ... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.