Some RxJava example source code (Hackers at Cambridge tutorials)

I recently watched the three Hackers at Cambridge Introduction to RxJava videos, and coded along with them. If you’re interested in some example RxJava code, here’s what I typed in.

The first thing you do is create a new Gradle/Java project with these commands:

mkdir MyProject
cd MyProject
gradle init --type java-application

With that project created you can begin creating some Java/RxJava code.

App.java

First up is the App.java class source code:

import io.reactivex.Observable;
import java.util.*;
import java.util.concurrent.TimeUnit;
import io.reactivex.schedulers.Schedulers;

/**
 * This code is based on this Hackers at Cambridge 
 * “Introduction to RxJava” videos, starting with:
 * this one: https://www.youtube.com/watch?v=ZhqdnC43jMs
 */
public class App {

    // the original method using `just`
    static Observable<Integer> fakeUserInput1() {
        Random r = new Random();
        return Observable
            .just(1,2,3,4,5)
            .map(i -> r.nextInt(20));
    }

    // the 2nd approach using `intervalRange`
    static Observable<Integer> fakeUserInput2() {
        Random r = new Random();
        return Observable
            .intervalRange(0, 5, 500, 500, TimeUnit.MILLISECONDS)
            .map(i -> r.nextInt(20));
    }

    // "observables of observables"
    // `concatMap` is like `flatMap`; i'm not sure what the differences are
    static Observable<Integer> fakeUserInput3() {
        Random r = new Random();
        return Observable
            // generate ten numbers, emitting them 500ms apart
            .intervalRange(
                0, 10, 500, 500, TimeUnit.MILLISECONDS
            )
            // create another observable, give it a random time delay
            .concatMap(i ->
                Observable.just(r.nextInt(20))
                .delay(r.nextInt(1000), TimeUnit.MILLISECONDS)
            );
    }

    static Observable<Integer> fakeUserInput4() {
        Random r = new Random();
        return Observable
            // generate ten numbers, emitting them 500ms apart
            .intervalRange(
                0, 10, 500, 500, TimeUnit.MILLISECONDS
            )
            // create another observable, give it a random time delay
            .concatMap(i ->
                Observable.just(r.nextInt(20))
                .delay(r.nextInt(1000), TimeUnit.MILLISECONDS)
            );
    }

    public static void main(String[] args) {

        // Example 1
        // ---------
        // fakeUserInput1()
        //     .subscribe(line -> System.out.println(line));

        // Example 2
        // ---------
        // need this when using `intervaleRange` because it runs
        // on a separate thread
        // fakeUserInput2()
        //     .blockingSubscribe(line -> System.out.println(line));

        // Example 3
        // ---------
        // fakeUserInput3()
        //     .blockingSubscribe(line -> System.out.println(line));

        // Example 4
        // ---------
        // initially tried to use concatMap here;
        // “flatMap allows the interleaving of observables”
        // (the next one can start before the current one finishes)
        // fakeUserInput4()
        //     .flatMapMaybe(i -> RxFib.fibs().elementAt(i))
        //     .blockingSubscribe(line -> System.out.println(line));

        // Example 5 - read input from the command line
        // --------------------------------------------
        System.out.println("MAIN THREAD: " + Thread.currentThread().getName());
        RxReader.linesFromInput()
            .map(s -> Integer.parseInt(s))
            // this first line runs this code on another thread.
            // there are other schedulers that can/could be run on.
            .observeOn(Schedulers.trampoline())
            .flatMapMaybe(i -> {
                System.out.println("Trampoline THREAD: " + Thread.currentThread().getName());
                return RxFib.fibs().elementAt(i);
            })
            // somehow blockingSubscribe is still called on the main thread
            // despite observeOn running on a different thread
            .blockingSubscribe(line -> System.out.println(line));
            //his notes: “Make your threading predictable using
            //.observeOn and Schedulers.trampoline.”

    }

}

There were basically five demos in the three videos, so that’s why I have the five example sections in the main method. Comment/uncomment those as desired to run the different examples.

RxFib.java

Next up, here’s my example of the RxJava Fibonacci sequence code (RxFib.java). My variable names don’t match the video 100%, but hopefully they’re close enough:

import io.reactivex.*;

public class RxFib {

    static Observable<Integer> fibs() {
        return Observable.create(subscriber -> {
            // do some work ...
            int prev = 0;
            int curr = 1;

            subscriber.onNext(0);
            subscriber.onNext(1);

            // while subscriber is asking for stuff,
            // send it stuff
            while (!subscriber.isDisposed()) {
                int oldPrev = prev;
                prev = curr;
                curr += oldPrev;

                // call onNext to put an item in the stream
                subscriber.onNext(curr);
            }
        });
    }

}

RxReader.java

Next, here’s the RxReader.java source code:

import io.reactivex.*;
import io.reactivex.schedulers.Schedulers;
import java.io.*;

class RxReader {

    static Observable<String> lines(BufferedReader reader) {
        return Observable.<String>create(subscriber -> {
            String line;
            while ((line = reader.readLine()) != null) {
                subscriber.onNext(line);
                if (subscriber.isDisposed()) {
                    break;
                }
            }
            //System.out.println("LINES THREAD: " + Thread.currentThread().getName());
            subscriber.onComplete();
        }).subscribeOn(Schedulers.io());
        //that last line runs the code on a different thread.
        //his notes: “Make your reader operate on a different thread
        //using .subscribeOn and Schedulers.io.”
    }

    static Observable<String> linesFromInput() {
        return lines(
            new BufferedReader(new InputStreamReader(System.in))
        );
    }
}

Key points

A few key points about that RxJava code:

  • Observable feels like a “wrapper” type; you have some data that you’re interested in working with, and you wrap it in an Observable.
  • You create Observable values with various factory methods, including just, intervalRange, and create.

build.gradle

Finally, here’s the build.gradle file, most of which was generated by the Gradle init task:

plugins {
    // support for Java
    id 'java'

    // apply the application plugin to add support for building an application
    id 'application'
}

repositories {
    // use jcenter for resolving your dependencies
    jcenter()
}

run {
    // ADDED
    standardInput = System.in;
}

dependencies {
    // ADDED
    compile 'io.reactivex.rxjava2:rxjava:2.2.0'
    testImplementation 'junit:junit:4.12'
}

// the main class for the application
mainClassName = 'cambridge.App'

The two things that were added to that file during the video are noted in comments in that file.

Summary

In summary, if you wanted to see some RxJava source code, I hope these examples from the Hackers at Cambridge Introduction to RxJava videos are helpful. Please see those videos for more details. Also, the reactivex.io has some excellent ReactiveX documentation.