alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (SnapshotSerializer.scala)

This example Akka source code file (SnapshotSerializer.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

actor, akka, anyref, array, illegalargumentexception, inputstream, int, none, option, persistence, serialization, serializationextension, serialize, serialversionuid, snapshot

The SnapshotSerializer.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 * Copyright (C) 2012-2013 Eligotech BV.
 */

package akka.persistence.serialization

import java.io._
import akka.actor._
import akka.serialization.{ Serializer, SerializationExtension }
import akka.serialization.Serialization

/**
 * Wrapper for snapshot `data`. Snapshot `data` are the actual snapshot objects captured by
 * a [[Processor]].
 *
 * @see [[SnapshotSerializer]]
 */
@SerialVersionUID(1L)
final case class Snapshot(data: Any)

/**
 * INTERNAL API.
 */
@SerialVersionUID(1L)
private[serialization] final case class SnapshotHeader(serializerId: Int, manifest: Option[String])

/**
 * [[Snapshot]] serializer.
 */
class SnapshotSerializer(system: ExtendedActorSystem) extends Serializer {
  def identifier: Int = 8
  def includeManifest: Boolean = false

  private lazy val transportInformation: Option[Serialization.Information] = {
    val address = system.provider.getDefaultAddress
    if (address.hasLocalScope) None
    else Some(Serialization.Information(address, system))
  }

  /**
   * Serializes a [[Snapshot]]. Delegates serialization of snapshot `data` to a matching
   * `akka.serialization.Serializer`.
   */
  def toBinary(o: AnyRef): Array[Byte] = o match {
    case Snapshot(data) ⇒ snapshotToBinary(data.asInstanceOf[AnyRef])
    case _              ⇒ throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass}")
  }

  /**
   * Deserializes a [[Snapshot]]. Delegates deserialization of snapshot `data` to a matching
   * `akka.serialization.Serializer`.
   */
  def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef =
    Snapshot(snapshotFromBinary(bytes))

  private def snapshotToBinary(snapshot: AnyRef): Array[Byte] = {
    def serialize() = {
      val extension = SerializationExtension(system)

      val snapshotSerializer = extension.findSerializerFor(snapshot)
      val snapshotManifest = if (snapshotSerializer.includeManifest) Some(snapshot.getClass.getName) else None

      val header = SnapshotHeader(snapshotSerializer.identifier, snapshotManifest)
      val headerSerializer = extension.findSerializerFor(header)
      val headerBytes = headerSerializer.toBinary(header)

      val out = new ByteArrayOutputStream

      writeInt(out, headerBytes.length)

      out.write(headerBytes)
      out.write(snapshotSerializer.toBinary(snapshot))
      out.toByteArray
    }

    // serialize actor references with full address information (defaultAddress) 
    transportInformation match {
      case Some(ti) ⇒ Serialization.currentTransportInformation.withValue(ti) { serialize() }
      case None     ⇒ serialize()
    }
  }

  private def snapshotFromBinary(bytes: Array[Byte]): AnyRef = {
    val extension = SerializationExtension(system)

    val headerLength = readInt(new ByteArrayInputStream(bytes))
    val headerBytes = bytes.slice(4, headerLength + 4)
    val snapshotBytes = bytes.drop(headerLength + 4)

    val header = extension.deserialize(headerBytes, classOf[SnapshotHeader]).get
    val manifest = header.manifest.map(system.dynamicAccess.getClassFor[AnyRef](_).get)

    extension.deserialize[AnyRef](snapshotBytes, header.serializerId, manifest).get
  }

  private def writeInt(outputStream: OutputStream, i: Int) =
    0 to 24 by 8 foreach { shift ⇒ outputStream.write(i >> shift) }

  private def readInt(inputStream: InputStream) =
    (0 to 24 by 8).foldLeft(0) { (id, shift) ⇒ (id | (inputStream.read() << shift)) }
}

Other Akka source code examples

Here is a short list of links related to this Akka SnapshotSerializer.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.