Some RxJava example source code (Hackers at Cambridge tutorials)

A Machine Learning Bestseller
Hands-On
Machine Learning
with Scikit-Learn

I recently watched the three Hackers at Cambridge Introduction to RxJava videos, and coded along with them. If you’re interested in some example RxJava code, here’s what I typed in.

The first thing you do is create a new Gradle/Java project with these commands:

mkdir MyProject
cd MyProject
gradle init --type java-application

With that project created you can begin creating some Java/RxJava code.

App.java

First up is the App.java class source code:

import io.reactivex.Observable;
import java.util.*;
import java.util.concurrent.TimeUnit;
import io.reactivex.schedulers.Schedulers;

/**
 * This code is based on this Hackers at Cambridge 
 * “Introduction to RxJava” videos, starting with:
 * this one: https://www.youtube.com/watch?v=ZhqdnC43jMs
 */
public class App {

    // the original method using `just`
    static Observable<Integer> fakeUserInput1() {
        Random r = new Random();
        return Observable
            .just(1,2,3,4,5)
            .map(i -> r.nextInt(20));
    }

    // the 2nd approach using `intervalRange`
    static Observable<Integer> fakeUserInput2() {
        Random r = new Random();
        return Observable
            .intervalRange(0, 5, 500, 500, TimeUnit.MILLISECONDS)
            .map(i -> r.nextInt(20));
    }

    // "observables of observables"
    // `concatMap` is like `flatMap`; i'm not sure what the differences are
    static Observable<Integer> fakeUserInput3() {
        Random r = new Random();
        return Observable
            // generate ten numbers, emitting them 500ms apart
            .intervalRange(
                0, 10, 500, 500, TimeUnit.MILLISECONDS
            )
            // create another observable, give it a random time delay
            .concatMap(i ->
                Observable.just(r.nextInt(20))
                .delay(r.nextInt(1000), TimeUnit.MILLISECONDS)
            );
    }

    static Observable<Integer> fakeUserInput4() {
        Random r = new Random();
        return Observable
            // generate ten numbers, emitting them 500ms apart
            .intervalRange(
                0, 10, 500, 500, TimeUnit.MILLISECONDS
            )
            // create another observable, give it a random time delay
            .concatMap(i ->
                Observable.just(r.nextInt(20))
                .delay(r.nextInt(1000), TimeUnit.MILLISECONDS)
            );
    }

    public static void main(String[] args) {

        // Example 1
        // ---------
        // fakeUserInput1()
        //     .subscribe(line -> System.out.println(line));

        // Example 2
        // ---------
        // need this when using `intervaleRange` because it runs
        // on a separate thread
        // fakeUserInput2()
        //     .blockingSubscribe(line -> System.out.println(line));

        // Example 3
        // ---------
        // fakeUserInput3()
        //     .blockingSubscribe(line -> System.out.println(line));

        // Example 4
        // ---------
        // initially tried to use concatMap here;
        // “flatMap allows the interleaving of observables”
        // (the next one can start before the current one finishes)
        // fakeUserInput4()
        //     .flatMapMaybe(i -> RxFib.fibs().elementAt(i))
        //     .blockingSubscribe(line -> System.out.println(line));

        // Example 5 - read input from the command line
        // --------------------------------------------
        System.out.println("MAIN THREAD: " + Thread.currentThread().getName());
        RxReader.linesFromInput()
            .map(s -> Integer.parseInt(s))
            // this first line runs this code on another thread.
            // there are other schedulers that can/could be run on.
            .observeOn(Schedulers.trampoline())
            .flatMapMaybe(i -> {
                System.out.println("Trampoline THREAD: " + Thread.currentThread().getName());
                return RxFib.fibs().elementAt(i);
            })
            // somehow blockingSubscribe is still called on the main thread
            // despite observeOn running on a different thread
            .blockingSubscribe(line -> System.out.println(line));
            //his notes: “Make your threading predictable using
            //.observeOn and Schedulers.trampoline.”

    }

}

There were basically five demos in the three videos, so that’s why I have the five example sections in the main method. Comment/uncomment those as desired to run the different examples.

RxFib.java

Next up, here’s my example of the RxJava Fibonacci sequence code (RxFib.java). My variable names don’t match the video 100%, but hopefully they’re close enough:

import io.reactivex.*;

public class RxFib {

    static Observable<Integer> fibs() {
        return Observable.create(subscriber -> {
            // do some work ...
            int prev = 0;
            int curr = 1;

            subscriber.onNext(0);
            subscriber.onNext(1);

            // while subscriber is asking for stuff,
            // send it stuff
            while (!subscriber.isDisposed()) {
                int oldPrev = prev;
                prev = curr;
                curr += oldPrev;

                // call onNext to put an item in the stream
                subscriber.onNext(curr);
            }
        });
    }

}

RxReader.java

Next, here’s the RxReader.java source code:

import io.reactivex.*;
import io.reactivex.schedulers.Schedulers;
import java.io.*;

class RxReader {

    static Observable<String> lines(BufferedReader reader) {
        return Observable.<String>create(subscriber -> {
            String line;
            while ((line = reader.readLine()) != null) {
                subscriber.onNext(line);
                if (subscriber.isDisposed()) {
                    break;
                }
            }
            //System.out.println("LINES THREAD: " + Thread.currentThread().getName());
            subscriber.onComplete();
        }).subscribeOn(Schedulers.io());
        //that last line runs the code on a different thread.
        //his notes: “Make your reader operate on a different thread
        //using .subscribeOn and Schedulers.io.”
    }

    static Observable<String> linesFromInput() {
        return lines(
            new BufferedReader(new InputStreamReader(System.in))
        );
    }
}

Key points

A few key points about that RxJava code:

  • Observable feels like a “wrapper” type; you have some data that you’re interested in working with, and you wrap it in an Observable.
  • You create Observable values with various factory methods, including just, intervalRange, and create.

build.gradle

Finally, here’s the build.gradle file, most of which was generated by the Gradle init task:

plugins {
    // support for Java
    id 'java'

    // apply the application plugin to add support for building an application
    id 'application'
}

repositories {
    // use jcenter for resolving your dependencies
    jcenter()
}

run {
    // ADDED
    standardInput = System.in;
}

dependencies {
    // ADDED
    compile 'io.reactivex.rxjava2:rxjava:2.2.0'
    testImplementation 'junit:junit:4.12'
}

// the main class for the application
mainClassName = 'cambridge.App'

The two things that were added to that file during the video are noted in comments in that file.

A Programming Classic!
The
Pragmatic
Programmer

Summary

In summary, if you wanted to see some RxJava source code, I hope these examples from the Hackers at Cambridge Introduction to RxJava videos are helpful. Please see those videos for more details. Also, the reactivex.io has some excellent ReactiveX documentation.