I recently watched the three Hackers at Cambridge Introduction to RxJava videos, and coded along with them. If you’re interested in some example RxJava code, here’s what I typed in.
The first thing you do is create a new Gradle/Java project with these commands:
mkdir MyProject
cd MyProject
gradle init --type java-application
With that project created you can begin creating some Java/RxJava code.
App.java
First up is the App.java class source code:
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import java.util.*;
import java.util.concurrent.TimeUnit;

/**
 * This code is based on the Hackers at Cambridge
 * “Introduction to RxJava” videos, starting with
 * this one: https://www.youtube.com/watch?v=ZhqdnC43jMs
 */
public class App {

    // the original method using `just`
    static Observable<Integer> fakeUserInput1() {
        Random r = new Random();
        return Observable
            .just(1,2,3,4,5)
            .map(i -> r.nextInt(20));
    }

    // the 2nd approach using `intervalRange`
    static Observable<Integer> fakeUserInput2() {
        Random r = new Random();
        return Observable
            .intervalRange(0, 5, 500, 500, TimeUnit.MILLISECONDS)
            .map(i -> r.nextInt(20));
    }

    // "observables of observables"
    // `concatMap` is like `flatMap`, except that it subscribes to the
    // inner observables one at a time, so results keep the source order;
    // `flatMap` subscribes to them all and lets their results interleave
    static Observable<Integer> fakeUserInput3() {
        Random r = new Random();
        return Observable
            // generate ten numbers, emitting them 500ms apart
            .intervalRange(
                0, 10, 500, 500, TimeUnit.MILLISECONDS
            )
            // create another observable, give it a random time delay
            .concatMap(i ->
                Observable.just(r.nextInt(20))
                    .delay(r.nextInt(1000), TimeUnit.MILLISECONDS)
            );
    }

    static Observable<Integer> fakeUserInput4() {
        Random r = new Random();
        return Observable
            // generate ten numbers, emitting them 500ms apart
            .intervalRange(
                0, 10, 500, 500, TimeUnit.MILLISECONDS
            )
            // create another observable, give it a random time delay
            .concatMap(i ->
                Observable.just(r.nextInt(20))
                    .delay(r.nextInt(1000), TimeUnit.MILLISECONDS)
            );
    }

    public static void main(String[] args) {

        // Example 1
        // ---------
        // fakeUserInput1()
        //     .subscribe(line -> System.out.println(line));

        // Example 2
        // ---------
        // need `blockingSubscribe` when using `intervalRange` because it
        // emits on a separate thread, and `main` would otherwise return
        // before anything is printed
        // fakeUserInput2()
        //     .blockingSubscribe(line -> System.out.println(line));

        // Example 3
        // ---------
        // fakeUserInput3()
        //     .blockingSubscribe(line -> System.out.println(line));

        // Example 4
        // ---------
        // initially tried to use concatMap here;
        // “flatMap allows the interleaving of observables”
        // (the next one can start before the current one finishes)
        // fakeUserInput4()
        //     .flatMapMaybe(i -> RxFib.fibs().elementAt(i))
        //     .blockingSubscribe(line -> System.out.println(line));

        // Example 5 - read input from the command line
        // --------------------------------------------
        System.out.println("MAIN THREAD: " + Thread.currentThread().getName());
        RxReader.linesFromInput()
            .map(s -> Integer.parseInt(s))
            // `observeOn` picks the scheduler for the operators below it.
            // `Schedulers.trampoline()` does not start a new thread; it queues
            // the work on whichever thread is already emitting (here, the
            // `Schedulers.io()` thread that RxReader subscribes on).
            // there are other schedulers this could run on.
            .observeOn(Schedulers.trampoline())
            .flatMapMaybe(i -> {
                System.out.println("Trampoline THREAD: " + Thread.currentThread().getName());
                return RxFib.fibs().elementAt(i);
            })
            // `blockingSubscribe` runs this lambda on the calling (main)
            // thread, whichever thread the upstream emits on
            .blockingSubscribe(line -> System.out.println(line));

        // his notes: “Make your threading predictable using
        // .observeOn and Schedulers.trampoline.”
    }
}
There were basically five demos in the three videos, so that’s why I have the five example sections in the main method. Comment/uncomment those as desired to run the different examples.
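The difference between concatMap and flatMap that comes up in Examples 3 and 4 is easier to see with fixed delays instead of random ones. The following is only a minimal sketch, assuming RxJava 2 is on the classpath; the class and method names (MapOrderingSketch, delayed, withConcatMap, withFlatMap) are hypothetical and do not come from the videos.

```java
import io.reactivex.Observable;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class, not part of the original project.
class MapOrderingSketch {

    // Each input i becomes an inner Observable that emits i after i * 100 ms,
    // so larger values finish later.
    static Observable<Integer> delayed(int i) {
        return Observable.just(i).delay(i * 100L, TimeUnit.MILLISECONDS);
    }

    // concatMap subscribes to one inner Observable at a time, in source order.
    static List<Integer> withConcatMap() {
        return Observable.just(3, 1, 2)
            .concatMap(i -> delayed(i))
            .toList()
            .blockingGet();
    }

    // flatMap subscribes to all inner Observables right away and merges
    // their results as they arrive.
    static List<Integer> withFlatMap() {
        return Observable.just(3, 1, 2)
            .flatMap(i -> delayed(i))
            .toList()
            .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println("concatMap: " + withConcatMap());
        System.out.println("flatMap:   " + withFlatMap());
    }
}
```

With these delays, concatMap should keep the source order (3, 1, 2), while flatMap should report the values in the order their delays expire (1, 2, 3).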
RxFib.java
Next up, here’s my example of the RxJava Fibonacci sequence code (RxFib.java). My variable names don’t match the video 100%, but hopefully they’re close enough:
import io.reactivex.*;

public class RxFib {

    static Observable<Integer> fibs() {
        return Observable.create(subscriber -> {
            // do some work ...
            int prev = 0;
            int curr = 1;
            subscriber.onNext(0);
            subscriber.onNext(1);

            // while subscriber is asking for stuff,
            // send it stuff
            while (!subscriber.isDisposed()) {
                int oldPrev = prev;
                prev = curr;
                curr += oldPrev;
                // call onNext to put an item in the stream
                subscriber.onNext(curr);
            }
        });
    }
}
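To check which value `RxFib.fibs().elementAt(i)` should produce for a given index, the same loop can be written without RxJava. This is a rough sketch using plain Java only; FibSketch and fibAt are hypothetical names, and it assumes the same int arithmetic as the code above.

```java
// Hypothetical helper class, not part of the original project.
class FibSketch {

    // Returns the value that the RxFib loop emits at the given zero-based
    // index: 0, 1, 1, 2, 3, 5, ...
    static int fibAt(int index) {
        if (index == 0) {
            return 0;
        }
        int prev = 0;
        int curr = 1;
        // Same update step as the while loop in RxFib.fibs().
        for (int i = 1; i < index; i++) {
            int oldPrev = prev;
            prev = curr;
            curr += oldPrev;
        }
        return curr;
    }

    public static void main(String[] args) {
        System.out.println(fibAt(10)); // 55
        System.out.println(fibAt(46)); // 1836311903, the last value that fits in an int
        System.out.println(fibAt(47)); // negative, because the int addition overflows
    }
}
```

One consequence of using int is that indexes above 46 wrap around, so typing a large number into Example 5 gives a meaningless result rather than an error.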
RxReader.java
Next, here’s the RxReader.java source code:
import io.reactivex.*;
import io.reactivex.schedulers.Schedulers;
import java.io.*;

class RxReader {

    static Observable<String> lines(BufferedReader reader) {
        return Observable.<String>create(subscriber -> {
            String line;
            while ((line = reader.readLine()) != null) {
                subscriber.onNext(line);
                if (subscriber.isDisposed()) {
                    break;
                }
            }
            //System.out.println("LINES THREAD: " + Thread.currentThread().getName());
            subscriber.onComplete();
        }).subscribeOn(Schedulers.io());
        //that last line runs the code on a different thread.
        //his notes: “Make your reader operate on a different thread
        //using .subscribeOn and Schedulers.io.”
    }

    static Observable<String> linesFromInput() {
        return lines(
            new BufferedReader(new InputStreamReader(System.in))
        );
    }
}
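The threading in RxReader and in Example 5 can be checked by recording thread names at each stage. The sketch below is an illustration only, assuming RxJava 2; ThreadSketch and threadNames are hypothetical names. It uses `subscribeOn(Schedulers.io())` the same way RxReader does and then consumes the result with `blockingSubscribe`.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

// Hypothetical helper class, not part of the original project.
class ThreadSketch {

    // Returns { thread that ran the upstream map, thread that ran the consumer }.
    static String[] threadNames() {
        String[] names = new String[2];
        Observable.fromCallable(() -> "line")
            // Same idea as RxReader: produce the values on an io() thread.
            .subscribeOn(Schedulers.io())
            .map(s -> {
                names[0] = Thread.currentThread().getName();
                return s;
            })
            // blockingSubscribe hands each item to the calling thread.
            .blockingSubscribe(s -> names[1] = Thread.currentThread().getName());
        return names;
    }

    public static void main(String[] args) {
        String[] names = threadNames();
        System.out.println("upstream ran on: " + names[0]);
        System.out.println("consumer ran on: " + names[1]);
    }
}
```

The upstream work should report a scheduler thread, while the consumer should report the thread that called `blockingSubscribe`. That is why the final println in Example 5 runs on the main thread even though the reader works elsewhere.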
Key points
A few key points about that RxJava code:
- Observable feels like a “wrapper” type; you have some data that you’re interested in working with, and you wrap it in an Observable.
- You create Observable values with various factory methods, including just, intervalRange, and create.
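As a small illustration of those three factory methods side by side, here is a sketch that builds an Observable each way and collects the results into a list. It assumes RxJava 2, and FactorySketch and its method names are hypothetical.

```java
import io.reactivex.Observable;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class, not part of the original project.
class FactorySketch {

    // just: wrap a fixed set of values.
    static List<Integer> fromJust() {
        return Observable.just(1, 2, 3).toList().blockingGet();
    }

    // intervalRange: emit `count` Long values starting at `start`,
    // one every `period`, after an initial delay.
    static List<Long> fromIntervalRange() {
        return Observable
            .intervalRange(0, 3, 0, 10, TimeUnit.MILLISECONDS)
            .toList()
            .blockingGet();
    }

    // create: push values to the emitter by hand, then signal completion.
    static List<Integer> fromCreate() {
        return Observable.<Integer>create(emitter -> {
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onNext(3);
            emitter.onComplete();
        }).toList().blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(fromJust());
        System.out.println(fromIntervalRange());
        System.out.println(fromCreate());
    }
}
```

Note that intervalRange emits Long values, which is why the fakeUserInput methods in App.java map each one to an Integer.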
build.gradle
Finally, here’s the build.gradle file, most of which was generated by the Gradle init task:
plugins {
    // support for Java
    id 'java'
    // apply the application plugin to add support for building an application
    id 'application'
}

repositories {
    // use jcenter for resolving your dependencies
    jcenter()
}

run {
    // ADDED
    standardInput = System.in
}

dependencies {
    // ADDED
    // note: Gradle 7 and later removed `compile`; use `implementation` there
    compile 'io.reactivex.rxjava2:rxjava:2.2.0'
    testImplementation 'junit:junit:4.12'
}

// the main class for the application
mainClassName = 'cambridge.App'
The two things that were added to that file during the video are marked with ADDED comments.
Summary
In summary, if you wanted to see some RxJava source code, I hope these examples from the Hackers at Cambridge Introduction to RxJava videos are helpful. Please see those videos for more details. Also, reactivex.io has some excellent ReactiveX documentation.