I recently watched the three Hackers at Cambridge Introduction to RxJava videos, and coded along with them. If you’re interested in some example RxJava code, here’s what I typed in.
The first thing you do is create a new Gradle/Java project with these commands:
mkdir MyProject
cd MyProject
gradle init --type java-application
With that project created you can begin creating some Java/RxJava code.
App.java
First up is the App.java class source code:
import io.reactivex.Observable;
import java.util.*;
import java.util.concurrent.TimeUnit;
import io.reactivex.schedulers.Schedulers;
/**
 * This code is based on the Hackers at Cambridge
 * “Introduction to RxJava” videos, starting with
 * this one: https://www.youtube.com/watch?v=ZhqdnC43jMs
 */
public class App {

    // the original method using `just`
    static Observable<Integer> fakeUserInput1() {
        Random r = new Random();
        return Observable
            .just(1, 2, 3, 4, 5)
            .map(i -> r.nextInt(20));
    }

    // the 2nd approach using `intervalRange`
    static Observable<Integer> fakeUserInput2() {
        Random r = new Random();
        return Observable
            .intervalRange(0, 5, 500, 500, TimeUnit.MILLISECONDS)
            .map(i -> r.nextInt(20));
    }

    // "observables of observables"
    // `concatMap` is like `flatMap`, except that it subscribes to the inner
    // observables one at a time, so results come out in source order;
    // `flatMap` merges the inner observables, so their results can interleave
    static Observable<Integer> fakeUserInput3() {
        Random r = new Random();
        return Observable
            // generate ten numbers, emitting them 500ms apart
            .intervalRange(
                0, 10, 500, 500, TimeUnit.MILLISECONDS
            )
            // create another observable, give it a random time delay
            .concatMap(i ->
                Observable.just(r.nextInt(20))
                    .delay(r.nextInt(1000), TimeUnit.MILLISECONDS)
            );
    }

    static Observable<Integer> fakeUserInput4() {
        Random r = new Random();
        return Observable
            // generate ten numbers, emitting them 500ms apart
            .intervalRange(
                0, 10, 500, 500, TimeUnit.MILLISECONDS
            )
            // create another observable, give it a random time delay
            .concatMap(i ->
                Observable.just(r.nextInt(20))
                    .delay(r.nextInt(1000), TimeUnit.MILLISECONDS)
            );
    }

    public static void main(String[] args) {

        // Example 1
        // ---------
        // fakeUserInput1()
        //     .subscribe(line -> System.out.println(line));

        // Example 2
        // ---------
        // need `blockingSubscribe` when using `intervalRange` because it
        // emits on a separate thread (otherwise `main` can return before
        // anything is printed)
        // fakeUserInput2()
        //     .blockingSubscribe(line -> System.out.println(line));

        // Example 3
        // ---------
        // fakeUserInput3()
        //     .blockingSubscribe(line -> System.out.println(line));

        // Example 4
        // ---------
        // initially tried to use concatMap here;
        // “flatMap allows the interleaving of observables”
        // (the next one can start before the current one finishes)
        // fakeUserInput4()
        //     .flatMapMaybe(i -> RxFib.fibs().elementAt(i))
        //     .blockingSubscribe(line -> System.out.println(line));

        // Example 5 - read input from the command line
        // --------------------------------------------
        System.out.println("MAIN THREAD: " + Thread.currentThread().getName());
        RxReader.linesFromInput()
            .map(s -> Integer.parseInt(s))
            // `observeOn` picks the scheduler for the operators below it.
            // `Schedulers.trampoline()` does not start a new thread; the work
            // runs on whichever thread delivers the item (here, the
            // `Schedulers.io()` thread that RxReader subscribes on).
            // there are other schedulers this could run on.
            .observeOn(Schedulers.trampoline())
            .flatMapMaybe(i -> {
                System.out.println("Trampoline THREAD: " + Thread.currentThread().getName());
                return RxFib.fibs().elementAt(i);
            })
            // blockingSubscribe runs its callback on the calling thread, so
            // this println happens on the main thread no matter which thread
            // the operators above ran on
            .blockingSubscribe(line -> System.out.println(line));
        // his notes: “Make your threading predictable using
        // .observeOn and Schedulers.trampoline.”
    }
}
There were basically five demos in the three videos, so that’s why I have the five example sections in the main method. Comment/uncomment those as desired to run the different examples.
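Since the comments in that code go back and forth between `concatMap` and `flatMap`, here’s a small sketch of how the two differ. It assumes the same RxJava 2 dependency used in this project; the class name `ConcatVsFlatMap` and the `delayed` helper are my own hypothetical names, not something from the videos. Each inner observable waits a shorter time the larger its input is, so the inner results become ready in reverse order:

```java
import io.reactivex.Observable;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class (not from the videos).
class ConcatVsFlatMap {

    // Emits `i` after a delay that shrinks as `i` grows:
    // 1 -> 600ms, 2 -> 400ms, 3 -> 200ms
    static Observable<Integer> delayed(int i) {
        return Observable.just(i)
            .delay((4 - i) * 200L, TimeUnit.MILLISECONDS);
    }

    // concatMap waits for each inner observable to complete before
    // subscribing to the next one, so source order is preserved.
    static List<Integer> withConcatMap() {
        return Observable.just(1, 2, 3)
            .concatMap(ConcatVsFlatMap::delayed)
            .toList()
            .blockingGet();
    }

    // flatMap subscribes to all inner observables right away and merges
    // their output, so whichever finishes first is emitted first.
    static List<Integer> withFlatMap() {
        return Observable.just(1, 2, 3)
            .flatMap(ConcatVsFlatMap::delayed)
            .toList()
            .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println("concatMap: " + withConcatMap());
        System.out.println("flatMap:   " + withFlatMap());
    }
}
```

With `concatMap` the list comes back in source order. With `flatMap` the order depends on timing; with delays this far apart I’d expect the shortest delay to come out first.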
RxFib.java
Next up, here’s my example of the RxJava Fibonacci sequence code (RxFib.java). My variable names don’t match the video 100%, but hopefully they’re close enough:
import io.reactivex.*;

public class RxFib {

    static Observable<Integer> fibs() {
        return Observable.create(subscriber -> {
            // do some work ...
            int prev = 0;
            int curr = 1;
            subscriber.onNext(0);
            subscriber.onNext(1);

            // while subscriber is asking for stuff,
            // send it stuff
            while (!subscriber.isDisposed()) {
                int oldPrev = prev;
                prev = curr;
                curr += oldPrev;
                // call onNext to put an item in the stream
                subscriber.onNext(curr);
            }
        });
    }
}
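If you want to check which value `elementAt(i)` should pick out of that stream, the same loop can be run in plain Java with no RxJava at all. This is only a sketch: `FibCheck` and `fibAt` are hypothetical names I made up for it. It also shows a limit of using `int` here, because the values stop fitting once the index goes past 46:

```java
// Hypothetical helper (not from the videos): mirrors the RxFib loop
// without RxJava, so the expected values are easy to check.
class FibCheck {

    // Returns the value at zero-based position `index` in the sequence
    // 0, 1, 1, 2, 3, 5, ... (the item `elementAt(index)` would select).
    static int fibAt(int index) {
        if (index == 0) return 0;
        if (index == 1) return 1;
        int prev = 0;
        int curr = 1;
        for (int n = 2; n <= index; n++) {
            int oldPrev = prev;
            prev = curr;
            curr += oldPrev;
        }
        return curr;
    }

    public static void main(String[] args) {
        // prints: 0 1 1 2 3 5 8 13 21 34
        for (int i = 0; i < 10; i++) {
            System.out.print(fibAt(i) + " ");
        }
        System.out.println();

        // 1836311903 is the largest Fibonacci number that fits in an int
        System.out.println(fibAt(46));
        // index 47 overflows int and wraps around to a negative number
        System.out.println(fibAt(47));
    }
}
```

So if you type a number larger than 46 into Example 5, the `Integer` that comes back has overflowed; switching to `long` or `BigInteger` would be one way around that.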
RxReader.java
Next, here’s the RxReader.java source code:
import io.reactivex.*;
import io.reactivex.schedulers.Schedulers;
import java.io.*;

class RxReader {

    static Observable<String> lines(BufferedReader reader) {
        return Observable.<String>create(subscriber -> {
            String line;
            while ((line = reader.readLine()) != null) {
                subscriber.onNext(line);
                if (subscriber.isDisposed()) {
                    break;
                }
            }
            //System.out.println("LINES THREAD: " + Thread.currentThread().getName());
            subscriber.onComplete();
        }).subscribeOn(Schedulers.io());
        // that last line runs the code on a different thread.
        // his notes: “Make your reader operate on a different thread
        // using .subscribeOn and Schedulers.io.”
    }

    static Observable<String> linesFromInput() {
        return lines(
            new BufferedReader(new InputStreamReader(System.in))
        );
    }
}
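The threading in Example 5 puzzled me at first, so here’s a small sketch that records which thread each stage runs on. It uses the same `subscribeOn(Schedulers.io())`, `observeOn(Schedulers.trampoline())`, and `blockingSubscribe` calls as the code above, with RxJava 2 assumed; the `ThreadDemo` class and its `threadNames` method are hypothetical names of my own:

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical demo class (not from the videos).
class ThreadDemo {

    // Returns three thread names, in this order:
    //   [0] the thread that ran the `create` body
    //   [1] the thread that ran `map` after `observeOn(trampoline())`
    //   [2] the thread that ran the `blockingSubscribe` callback
    static List<String> threadNames() {
        List<String> names = Collections.synchronizedList(new ArrayList<>());

        Observable.<Integer>create(emitter -> {
                names.add(Thread.currentThread().getName());
                emitter.onNext(1);
                emitter.onComplete();
            })
            // run the `create` body on an io thread
            .subscribeOn(Schedulers.io())
            // trampoline does not switch threads; work stays on the emitting thread
            .observeOn(Schedulers.trampoline())
            .map(i -> {
                names.add(Thread.currentThread().getName());
                return i;
            })
            // the callback runs on the thread that called blockingSubscribe
            .blockingSubscribe(i -> names.add(Thread.currentThread().getName()));

        return names;
    }

    public static void main(String[] args) {
        System.out.println(threadNames());
    }
}
```

The first two names should match each other (an io scheduler thread), and the third should be the thread that called `threadNames`, which is `main` when run as shown. That lines up with the output in Example 5: `blockingSubscribe` hands each item back to the caller’s thread.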
Key points
A few key points about that RxJava code:
- Observable feels like a “wrapper” type; you have some data that you’re interested in working with, and you wrap it in an Observable.
- You create Observable values with various factory methods, including just, intervalRange, and create.
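To make those factory methods a little more concrete, here’s a minimal sketch that builds the same three values with each one. It assumes RxJava 2, as in the rest of this post; `FactoryDemo` and its method names are hypothetical. One thing it shows is that `intervalRange` emits `Long` values, which is why the `fakeUserInput` methods above use `map` to turn them into `Integer`s:

```java
import io.reactivex.Observable;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class (not from the videos).
class FactoryDemo {

    // `just` wraps values you already have
    static List<Integer> fromJust() {
        return Observable.just(1, 2, 3)
            .toList()
            .blockingGet();
    }

    // `intervalRange(start, count, initialDelay, period, unit)` emits
    // `count` consecutive Long values on a timer, beginning at `start`
    static List<Long> fromIntervalRange() {
        return Observable.intervalRange(1, 3, 0, 10, TimeUnit.MILLISECONDS)
            .toList()
            .blockingGet();
    }

    // `create` lets you push items yourself with onNext/onComplete
    static List<Integer> fromCreate() {
        return Observable.<Integer>create(emitter -> {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            })
            .toList()
            .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(fromJust());
        System.out.println(fromIntervalRange());
        System.out.println(fromCreate());
    }
}
```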
build.gradle
Finally, here’s the build.gradle file, most of which was generated by the Gradle init task:
plugins {
    // support for Java
    id 'java'

    // apply the application plugin to add support for building an application
    id 'application'
}

repositories {
    // use jcenter for resolving your dependencies
    jcenter()
}

run {
    // ADDED
    standardInput = System.in;
}

dependencies {
    // ADDED
    implementation 'io.reactivex.rxjava2:rxjava:2.2.0'

    testImplementation 'junit:junit:4.12'
}

// the main class for the application
mainClassName = 'cambridge.App'
The two things that were added to that file during the video are marked with ADDED comments.
Summary
In summary, if you wanted to see some RxJava source code, I hope these examples from the Hackers at Cambridge Introduction to RxJava videos are helpful. Please see those videos for more details. Also, reactivex.io has some excellent ReactiveX documentation.

