alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (IODocTest.java)

This example Akka source code file (IODocTest.java) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

actor, actorref, akka, bytestring, commandfailed, connected, exception, inetsocketaddress, io, override, procedure, received, test, testing, testkit, untypedactor

The IODocTest.java Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */

package docs.io.japi;


import akka.testkit.AkkaJUnitActorSystemResource;
import org.junit.ClassRule;
import org.junit.Test;

//#imports
import java.net.InetSocketAddress;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.io.Tcp;
import akka.io.Tcp.Bound;
import akka.io.Tcp.CommandFailed;
import akka.io.Tcp.Connected;
import akka.io.Tcp.ConnectionClosed;
import akka.io.Tcp.Received;
import akka.io.TcpMessage;
import akka.japi.Procedure;
import akka.util.ByteString;
//#imports

import akka.testkit.JavaTestKit;
import akka.testkit.AkkaSpec;

public class IODocTest {

  /**
   * Minimal TCP server actor. On start it asks the TCP manager to bind a
   * listening socket, reports the outcome to {@code manager}, and registers a
   * fresh {@link SimplisticHandler} for every accepted connection.
   */
  static
  //#server
  public class Server extends UntypedActor {

    /** Receives the {@code Bound} confirmation and every {@code Connected} event. */
    final ActorRef manager;

    /**
     * @param manager actor to be told about successful binds and new connections
     */
    public Server(ActorRef manager) {
      this.manager = manager;
    }

    /**
     * Sends the bind command to the TCP manager, naming this actor as the
     * recipient of the resulting bind and connection events.
     */
    @Override
    public void preStart() throws Exception {
      final ActorRef tcp = Tcp.get(getContext().system()).manager();
      // Port 0 lets the system choose the port; the test reads the actual
      // address back from the Bound reply. 100 is the third argument of
      // TcpMessage.bind (presumably the accept backlog -- confirm against the API).
      tcp.tell(TcpMessage.bind(getSelf(),
          new InetSocketAddress("localhost", 0), 100), getSelf());
    }

    /**
     * Handles the replies to the bind command and incoming connections.
     *
     * @param msg {@code Bound}, {@code CommandFailed} or {@code Connected};
     *            anything else is passed to {@code unhandled}
     */
    @Override
    public void onReceive(Object msg) throws Exception {
      if (msg instanceof Bound) {
        manager.tell(msg, getSelf());

      } else if (msg instanceof CommandFailed) {
        // The bind was rejected; nothing left for this actor to do.
        getContext().stop(getSelf());

      } else if (msg instanceof Connected) {
        final Connected conn = (Connected) msg;
        manager.tell(conn, getSelf());
        final ActorRef handler = getContext().actorOf(
            Props.create(SimplisticHandler.class));
        // The sender of Connected is the connection actor; tell it which
        // actor should receive the data for this connection.
        getSender().tell(TcpMessage.register(handler), getSelf());

      } else {
        // Do not drop unexpected messages silently; let Akka report them.
        unhandled(msg);
      }
    }

  }
  //#server

  /**
   * Per-connection handler that prints every received chunk and writes it
   * straight back to the sender (an echo), stopping itself once the
   * connection is closed.
   */
  static
  //#simplistic-handler
  public class SimplisticHandler extends UntypedActor {
    /**
     * @param msg {@code Received} data or a {@code ConnectionClosed} event;
     *            anything else is passed to {@code unhandled}
     */
    @Override
    public void onReceive(Object msg) throws Exception {
      if (msg instanceof Received) {
        final ByteString data = ((Received) msg).data();
        System.out.println(data);
        // Echo the bytes back to whoever delivered them.
        getSender().tell(TcpMessage.write(data), getSelf());
      } else if (msg instanceof ConnectionClosed) {
        getContext().stop(getSelf());
      } else {
        // Do not drop unexpected messages silently; let Akka report them.
        unhandled(msg);
      }
    }
  }
  //#simplistic-handler
  
  /**
   * TCP client actor. It asks the TCP manager to connect to {@code remote}
   * and, once connected, relays traffic between the connection and
   * {@code listener}.
   */
  static
  //#client
  public class Client extends UntypedActor {

    /** Address to connect to. */
    final InetSocketAddress remote;
    /** Told about connection success/failure and about every received chunk. */
    final ActorRef listener;

    /**
     * @param remote   address of the server to connect to
     * @param listener actor notified of connection events and received data
     */
    public Client(InetSocketAddress remote, ActorRef listener) {
      this.remote = remote;
      this.listener = listener;
    }

    /**
     * Sends the connect command to the TCP manager. Done here rather than in
     * the constructor so construction stays free of side effects, matching
     * how {@code Server} issues its bind command.
     */
    @Override
    public void preStart() throws Exception {
      final ActorRef tcp = Tcp.get(getContext().system()).manager();
      tcp.tell(TcpMessage.connect(remote), getSelf());
    }

    /**
     * Initial behavior: waits for the outcome of the connect command.
     *
     * @param msg {@code CommandFailed} or {@code Connected}; anything else is
     *            passed to {@code unhandled}
     */
    @Override
    public void onReceive(Object msg) throws Exception {
      if (msg instanceof CommandFailed) {
        listener.tell("failed", getSelf());
        getContext().stop(getSelf());

      } else if (msg instanceof Connected) {
        listener.tell(msg, getSelf());
        // The sender of Connected is the connection actor: register this
        // actor as its handler and switch to the connected behavior.
        final ActorRef connection = getSender();
        connection.tell(TcpMessage.register(getSelf()), getSelf());
        getContext().become(connected(connection));

      } else {
        // Do not drop unexpected messages silently; let Akka report them.
        unhandled(msg);
      }
    }

    /**
     * Builds the behavior used after the connection has been established.
     *
     * @param connection the connection actor that writes and the close command are sent to
     * @return the message handler installed via {@code become}
     */
    private Procedure<Object> connected(final ActorRef connection) {
      return new Procedure<Object>() {
        @Override
        public void apply(Object msg) throws Exception {

          if (msg instanceof ByteString) {
            // Outbound data: forward it to the connection as a write.
            connection.tell(TcpMessage.write((ByteString) msg), getSelf());

          } else if (msg instanceof CommandFailed) {
            // OS kernel socket buffer was full

          } else if (msg instanceof Received) {
            // Inbound data: hand the raw bytes to the listener.
            listener.tell(((Received) msg).data(), getSelf());

          } else if ("close".equals(msg)) {
            connection.tell(TcpMessage.close(), getSelf());

          } else if (msg instanceof ConnectionClosed) {
            getContext().stop(getSelf());

          } else {
            // Do not drop unexpected messages silently; let Akka report them.
            unhandled(msg);
          }
        }
      };
    }

  }
  //#client

  // JUnit class rule supplying the actor system for this test class; the
  // system is named "IODocTest" and configured with AkkaSpec.testConf().
  // NOTE(review): start-up/shutdown is presumably managed by the rule itself -- confirm.
  @ClassRule
  public static AkkaJUnitActorSystemResource actorSystemResource =
    new AkkaJUnitActorSystemResource("IODocTest", AkkaSpec.testConf());

  // Actor system obtained from the class rule above; used by the test to
  // create the actors and the test kit.
  private final ActorSystem system = actorSystemResource.getSystem();

  /**
   * End-to-end check: starts a {@code Server} and a {@code Client}, verifies
   * that both report a connection with mirrored addresses, sends "hello"
   * and expects it echoed back, then closes the client and waits for it to
   * terminate.
   *
   * <p>Verification uses explicit {@link AssertionError}s instead of the
   * {@code assert} keyword, which is a no-op unless the JVM runs with
   * {@code -ea}.
   */
  @Test
  public void testConnection() {
    new JavaTestKit(system) {
      {
        // The test kit's own actor (getRef()) plays both the server's
        // manager and the client's listener.
        @SuppressWarnings("unused")
        final ActorRef server = system.actorOf(Props.create(Server.class, getRef()), "server1");
        final InetSocketAddress listen = expectMsgClass(Bound.class).localAddress();
        final ActorRef client = system.actorOf(Props.create(Client.class, listen, getRef()), "client1");

        // One Connected comes from each side; the checks are symmetric, so
        // their arrival order does not matter.
        final Connected c1 = expectMsgClass(Connected.class);
        final Connected c2 = expectMsgClass(Connected.class);
        if (!c1.localAddress().equals(c2.remoteAddress())) {
          throw new AssertionError("address mismatch: local " + c1.localAddress()
              + " vs remote " + c2.remoteAddress());
        }
        if (!c2.localAddress().equals(c1.remoteAddress())) {
          throw new AssertionError("address mismatch: local " + c2.localAddress()
              + " vs remote " + c1.remoteAddress());
        }

        client.tell(ByteString.fromString("hello"), getRef());
        final ByteString reply = expectMsgClass(ByteString.class);
        if (!"hello".equals(reply.utf8String())) {
          throw new AssertionError("expected echo \"hello\" but got \""
              + reply.utf8String() + "\"");
        }

        watch(client);
        client.tell("close", getRef());
        expectTerminated(client);
      }
    };
  }

}

Other Akka source code examples

Here is a short list of links related to this Akka IODocTest.java source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.