alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (ZeromqDocTest.java)

This example Akka source code file (ZeromqDocTest.java) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

actor, actorref, akka, bytestring, connect, heap, listener, load, override, serialization, serialize, test, testing, untypedactor, zeromq, zmqmessage

The ZeromqDocTest.java Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */
package docs.zeromq;

//#import-pub-socket
import akka.zeromq.Bind;
import akka.zeromq.ZeroMQExtension;
//#import-pub-socket
//#import-sub-socket
import akka.zeromq.Connect;
import akka.zeromq.Listener;
import akka.zeromq.Subscribe;
//#import-sub-socket
//#import-unsub-topic-socket
import akka.zeromq.Unsubscribe;
//#import-unsub-topic-socket
//#import-pub-topic
import akka.util.ByteString;
import akka.zeromq.ZMQMessage;
//#import-pub-topic

import akka.zeromq.HighWatermark;
import akka.zeromq.SocketOption;
import akka.zeromq.ZeroMQVersion;

//#import-health
import akka.actor.ActorRef;
import akka.actor.UntypedActor;
import akka.actor.Props;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import org.junit.*;
import scala.concurrent.duration.Duration;
import akka.serialization.SerializationExtension;
import akka.serialization.Serialization;
import java.io.Serializable;
import java.lang.management.ManagementFactory;
//#import-health

import com.typesafe.config.ConfigFactory;

import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;
import java.lang.management.OperatingSystemMXBean;
import java.util.Date;
import java.text.SimpleDateFormat;

import akka.actor.ActorSystem;
import akka.testkit.AkkaSpec;
import akka.testkit.AkkaJUnitActorSystemResource;

public class ZeromqDocTest {

  @ClassRule
  public static AkkaJUnitActorSystemResource actorSystemResource =
    new AkkaJUnitActorSystemResource("ZeromqDocTest",
      ConfigFactory.parseString("akka.loglevel=INFO").withFallback(AkkaSpec.testConf()));

  private final ActorSystem system = actorSystemResource.getSystem();

  @SuppressWarnings("unused")
  @Test
  public void demonstrateCreateSocket() {
    Assume.assumeTrue(checkZeroMQInstallation());

    //#pub-socket
    ActorRef pubSocket = ZeroMQExtension.get(system).newPubSocket(
      new Bind("tcp://127.0.0.1:1233"));
    //#pub-socket

    //#sub-socket
    ActorRef listener = system.actorOf(Props.create(ListenerActor.class));
    ActorRef subSocket = ZeroMQExtension.get(system).newSubSocket(
      new Connect("tcp://127.0.0.1:1233"),
      new Listener(listener), Subscribe.all());
    //#sub-socket

    //#sub-topic-socket
    ActorRef subTopicSocket = ZeroMQExtension.get(system).newSubSocket(
      new Connect("tcp://127.0.0.1:1233"),
      new Listener(listener), new Subscribe("foo.bar"));
    //#sub-topic-socket

    //#unsub-topic-socket
    subTopicSocket.tell(new Unsubscribe("foo.bar"), ActorRef.noSender());
    //#unsub-topic-socket

    byte[] payload = new byte[0];
    //#pub-topic
    pubSocket.tell(ZMQMessage.withFrames(ByteString.fromString("foo.bar"), 
      ByteString.fromArray(payload)), ActorRef.noSender());
    //#pub-topic

    system.stop(subSocket);
    system.stop(subTopicSocket);

    //#high-watermark
    ActorRef highWatermarkSocket = ZeroMQExtension.get(system).newRouterSocket(
        new SocketOption[] { new Listener(listener),
          new Bind("tcp://127.0.0.1:1233"), new HighWatermark(50000) });
    //#high-watermark
  }

  @Test
  public void demonstratePubSub() throws Exception {
    Assume.assumeTrue(checkZeroMQInstallation());

    //#health2

    system.actorOf(Props.create(HealthProbe.class), "health");
    //#health2

    //#logger2

    system.actorOf(Props.create(Logger.class), "logger");
    //#logger2

    //#alerter2

    system.actorOf(Props.create(HeapAlerter.class), "alerter");
    //#alerter2

    // Let it run for a while to see some output.
    // Don't do like this in real tests, this is only doc demonstration.
    Thread.sleep(3000L);
  }

  private boolean checkZeroMQInstallation() {
    try {
      ZeroMQVersion v = ZeroMQExtension.get(system).version();
      return (v.major() >= 3 || (v.major() >= 2 && v.minor() >= 1));
    } catch (LinkageError e) {
      return false;
    }
  }

  static
  //#listener-actor
  public class ListenerActor extends UntypedActor {
    public void onReceive(Object message) throws Exception {
      //...
    }
  }
  //#listener-actor

  static
  //#health
  public final Object TICK = "TICK";

  //#health
  static
  //#health
  public class Heap implements Serializable {
    private static final long serialVersionUID = 1L;
    public final long timestamp;
    public final long used;
    public final long max;

    public Heap(long timestamp, long used, long max) {
      this.timestamp = timestamp;
      this.used = used;
      this.max = max;
    }
  }

  //#health
  static
  //#health
  public class Load implements Serializable {
    private static final long serialVersionUID = 1L;
    public final long timestamp;
    public final double loadAverage;

    public Load(long timestamp, double loadAverage) {
      this.timestamp = timestamp;
      this.loadAverage = loadAverage;
    }
  }

  //#health
  static
  //#health
  public class HealthProbe extends UntypedActor {

    ActorRef pubSocket = ZeroMQExtension.get(getContext().system()).newPubSocket(
      new Bind("tcp://127.0.0.1:1237"));
    MemoryMXBean memory = ManagementFactory.getMemoryMXBean();
    OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
    Serialization ser = SerializationExtension.get(getContext().system());

    @Override
    public void preStart() {
      getContext().system().scheduler()
          .schedule(Duration.create(1, "second"), Duration.create(1, "second"),
            getSelf(), TICK, getContext().dispatcher(), ActorRef.noSender());
    }

    @Override
    public void postRestart(Throwable reason) {
      // don't call preStart, only schedule once
    }

    @Override
    public void onReceive(Object message) {
      if (message.equals(TICK)) {
        MemoryUsage currentHeap = memory.getHeapMemoryUsage();
        long timestamp = System.currentTimeMillis();

        // use akka SerializationExtension to convert to bytes
        ByteString heapTopic = ByteString.fromString("health.heap", "UTF-8");
        ByteString heapPayload = ByteString.fromArray(
                ser.serialize(
                    new Heap(timestamp,
                          currentHeap.getUsed(),
                          currentHeap.getMax())
                ).get());
        // the first frame is the topic, second is the message
        pubSocket.tell(ZMQMessage.withFrames(heapTopic, heapPayload), getSelf());

        // use akka SerializationExtension to convert to bytes
        ByteString loadTopic = ByteString.fromString("health.load", "UTF-8");
        ByteString loadPayload = ByteString.fromArray(
                ser.serialize(new Load(timestamp, os.getSystemLoadAverage())).get()
        );
        // the first frame is the topic, second is the message
        pubSocket.tell(ZMQMessage.withFrames(loadTopic, loadPayload), getSelf());
      } else {
        unhandled(message);
      }
    }

  }
  //#health

  static
  //#logger
  public class Logger extends UntypedActor {

    ActorRef subSocket = ZeroMQExtension.get(getContext().system()).newSubSocket(
      new Connect("tcp://127.0.0.1:1237"),
        new Listener(getSelf()), new Subscribe("health"));
    Serialization ser = SerializationExtension.get(getContext().system());
    SimpleDateFormat timestampFormat = new SimpleDateFormat("HH:mm:ss.SSS");
    LoggingAdapter log = Logging.getLogger(getContext().system(), this);

    @Override
    public void onReceive(Object message) {
      if (message instanceof ZMQMessage) {
        ZMQMessage m = (ZMQMessage) message;
        String topic = m.frame(0).utf8String();
        // the first frame is the topic, second is the message
        if ("health.heap".equals(topic)) {
          Heap heap = ser.deserialize(m.frame(1).toArray(), Heap.class).get();
          log.info("Used heap {} bytes, at {}", heap.used,
            timestampFormat.format(new Date(heap.timestamp)));
        } else if ("health.load".equals(topic)) {
          Load load = ser.deserialize(m.frame(1).toArray(), Load.class).get();
          log.info("Load average {}, at {}", load.loadAverage,
            timestampFormat.format(new Date(load.timestamp)));
        }
      } else {
        unhandled(message);
      }
    }

  }

  //#logger

  static
  //#alerter
  public class HeapAlerter extends UntypedActor {

    ActorRef subSocket = ZeroMQExtension.get(getContext().system()).newSubSocket(
      new Connect("tcp://127.0.0.1:1237"),
      new Listener(getSelf()), new Subscribe("health.heap"));
    Serialization ser = SerializationExtension.get(getContext().system());
    LoggingAdapter log = Logging.getLogger(getContext().system(), this);
    int count = 0;

    @Override
    public void onReceive(Object message) {
      if (message instanceof ZMQMessage) {
        ZMQMessage m = (ZMQMessage) message;
        String topic = m.frame(0).utf8String();
        // the first frame is the topic, second is the message
        if ("health.heap".equals(topic)) {
          Heap heap = ser.<Heap>deserialize(m.frame(1).toArray(), Heap.class).get();
          if (((double) heap.used / heap.max) > 0.9) {
            count += 1;
          } else {
            count = 0;
          }
          if (count > 10) {
            log.warning("Need more memory, using {} %",
              (100.0 * heap.used / heap.max));
          }
        }
      } else {
        unhandled(message);
      }
    }

  }
  //#alerter

}

Other Akka source code examples

Here is a short list of links related to this Akka ZeromqDocTest.java source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.