I recently started working with (ok, fooling around is a better wording) Reactor, a reactive library that gets used by the new reactive parts of Spring. I’m especially interested in how multithreading works with Reactor. So here is what I learned so far.

Some utilities

Break points are difficult to use with call backs and almost useless in multithreaded scenarios, so when playing around with Reactor I soon created some tiny tools to help me along.

The first tool is a tiny function that appends the name of the current Thread to the string representation of the Fluxs elements.

static Flux<String> addThread(Flux<?> flux) {
	return flux.map(e -> e + " " + Thread.currentThread());
}

It can help a lot when trying to understand what is going on with Threads

The second one requires that operations on a Flux get executed in a certain Thread. It uses JUnit and Hamcrest for this.

static <T> Flux<T> assertThread(Flux<T> flux, String name) {
	return flux.doOnNext(
			e -> assertThat(Thread.currentThread().getName(),
					startsWith(name))
	);
}

Also the tests have a String member threadName, containing the name of the Thread executing the test.

Reactor is single threaded by default.

We can see this with the following simple test.

@Test
public void reactorIsSingleThreadedByDefault() {

	Flux<Integer> flux = Flux.range(0, 1000);

	assertThread(flux, threadName)
			.blockLast(Duration.ofSeconds(1));
}

But there are many operations that use other Threads, especially those that delay elements.

@Test
public void delayingElementsIntroducesThreads() {
	Flux<Integer> flux = Flux.range(0, 1000)
			.delayElements(Duration.ofMillis(1));
	assertThread(flux, "timer")
			.blockLast(Duration.ofSeconds(3));
}

How to execute stuff on other threads

If you search for ways to do multithreading, you probably come across the two methods subscribeOn and publishOn. Both cause the following operation to execute on the specified thread (actually you provide a Scheduler, which in our case encapsulates a thread).

@Test
public void publishOn() {

	Flux<Integer> flux = Flux.range(0, 1000)
			.publishOn(Schedulers.newSingle("newThread"));

	assertThread(flux, "newThread")
			.blockLast(Duration.ofSeconds(1));
}

@Test
public void subscribeOn() {

	Flux<Integer> flux = Flux.range(0, 1000)
			.subscribeOn(Schedulers.newSingle("newThread"));

	assertThread(flux, "newThread")
			.blockLast(Duration.ofSeconds(1));
}

The obvious question is “What is the difference?”

For this to understand we look at what happens when we combine multiple calls to these methods.

@Test
public void publishOnTwice() {
	Flux<Integer> flux = Flux.range(0, 1000);
	Flux<Integer> fluxOnOne = assertThread(flux.publishOn(Schedulers.newSingle("one")), "one");
	Flux<Integer> fluxOnOneOnTwo = assertThread(fluxOnOne.publishOn(Schedulers.newSingle("two")), "two");
	fluxOnOneOnTwo.blockLast(Duration.ofSeconds(1));
}
@Test
public void subscribeOnTwice() {
	Flux<Integer> flux = Flux.range(0, 1000);
	Flux<Integer> fluxOnOne = assertThread(flux.subscribeOn(Schedulers.newSingle("one")), "one");
	Flux<Integer> fluxOnOneOnTwo = assertThread(fluxOnOne.subscribeOn(Schedulers.newSingle("two")), "one");
	fluxOnOneOnTwo.blockLast(Duration.ofSeconds(1));
}

publishOn causes the execution of everything after it to happen on the Scheduler passed in as an argument, until publishOn gets invoked again. Compare that to suscribeOn where basically the first call determines which Scheduler gets used and the calls afterwards get ignored.

Well not ignored, but they don’t have an effect that is easily visible. To understand this we need to think what actually happens inside a reactive pipeline. If we use “operators” on a Flux (A) we create a new Flux (B). When we subscribe to B it in turn subscribes to A. A in turn starts to publish events by calling B, which in turn calls our Subscriber. So we basically go through the stack of Fluxes twice, once on the “subscribe way”, and once (or actually for each event) on the “publish way”. All the asserting we are doing only happens on the publish way, where also all of the normal functionality in a real call.

If we have multiple subscribeOns in the pipeline they affect only the subscribe way but we normally don’t see that. Only the last subscribeOn affects the publish way. This is the only one we see.

Multiple publishOns on the other hand affect the publish way and we can observe every step of it, so we actually do see the effect of each of it.

The code for the test is available as a Gist.

Next up should be an article how publishOn and subscribeOn interact with flatMap. Hope it doesn’t take as long as this article.

Talks

Wan't to meet me in person to tell me how stupid I am? You can find me at the following events: