RxJava – gestire i thread

In questo ultimo post dedicato ai concetti fondamentali di RxJava vedremo come gestire i thread. Faremo prima degli esempi minimali in modo da comprendere i meccanismi sottostanti e poi andremo ad utilizzare le conoscenze acquisite nell’esempio della volta precedente.

Oramai siamo affezionati alla nostra lista di città che esponiamo come Observable, quindi la useremo anche negli esempi di base:
[code lang=”java”]
Observable<String> allCities = Observable.just("Zurich", "London", "Paris", "Berlin", "Roma", "Madrid", "Wien");
[/code]
in più introduciamo un semplicissimo log sulla console che ci servirà per sapere qual è il thread corrente che sta usando l’Observable:
[code lang=”java”]
public static void logStringWithThread(String string) {
System.out.printf("%-26.26s: %-10.10s%n", Thread.currentThread().getName(), string);
}
public static Action1<? super String> debugString = string -> logStringWithThread(string);
[/code]
Ci sono gli ingredienti necessari per partire per cui.. azione! 🙂

Chi ha detto asincrono?

Nel primo post su RxJava abbiamo parlato dell’operatore doOnNext, utile per eseguire un’azione sull’Observable ogni volta che emette un elemento. Noi lo sfrutteremo per stampare la stringa corrente e conoscere il thread in esecuzione attraverso la funzione debugString, in più eseguiremo due banali trasformazioni sulla lista di città:
[code lang=”java”]
allCities
.doOnNext(debugString)
.map(s -> s + "-")
.doOnNext(debugString)
.map(s -> "-" + s)
.subscribe(debugString);
[/code]
Il risultato è riportato qui sotto.
[code]
main : Zurich
main : Zurich-
main : -Zurich-
main : London
main : London-
main : -London-
main : Paris
main : Paris-
….
[/code]
I log ci indicano che, a partire dalla generazione dell’elemento fino ad arrivare all’Observer, siamo sempre nel main thread. E’ vero che RxJava si esprime al suo meglio quando lavoriamo in modalità asincrona, ma nulla indica che questo sia il comportamento di default. Abbiamo visto proprio in questo esempio che l’Observable inizia ad emettere elementi usando il thread che lo genera, in questo caso il principale.

Non dobbiamo però neanche fare l’assunzione opposta, cioè che un’esecuzione sincrona resti sempre tale. Guardiamo questo esempio in cui chiediamo all’Observable di ritardare di mezzo secondo l’emissione di un elemento tramite l’operatore delay:
[code lang=”java”]
allCities
.doOnNext(debugString)
.delay(500, TimeUnit.MILLISECONDS)
.map(s -> s + "-")
.doOnNext(debugString)
.map(s -> "-" + s)
.subscribe(debugString);
[/code]
il risultato dell’esecuzione è il seguente.
[code]
main : Zurich
main : London
main : Paris
main : Berlin
main : Roma
main : Madrid
main : Wien
RxComputationThreadPool-1 : Zurich-
RxComputationThreadPool-1 : -Zurich-
RxComputationThreadPool-1 : London-
RxComputationThreadPool-1 : -London-

[/code]
Come si vede, l’operatore delay fa cambiare thread usandone uno preso da un thread pool definito da RxJava. Non solo l’Observable diventa asincrono ma anche il suo Observer. Dato che questo codice viene eseguito in una semplice applicazione da riga di comando, se non mettiamo uno sleep come ultima riga di codice, rischiamo di non vedere il risultato perché l’applicazione termina prima.

Quindi, come complemento dell’indicazione di prima, possiamo dire questo: è sempre bene gestire l’Observable in modalità asincrona, anche se non lo stiamo forzando esplicitamente.

Cambiamo thread!

Il primo metodo che possiamo usare per cambiare il thread dell’Observable è il metodo subscribeOn: ci consente di passare in input uno Scheduler, cioè, secondo la documentazione, un oggetto capace di eseguire un’unità di lavoro. Non vogliamo entrare nel dettaglio dello Scheduler, ci basta sapere che usandolo andiamo a “pescare” un thread da uno specifico pool e su questo eseguiamo l’Observable. RxJava mette a disposizione alcuni Scheduler predefiniti. Per il nostro primo esempio useremo Schedulers.computation che è pensato per svolgere lavori di calcolo. Se vogliamo gestire attività input/output useremo lo Scheduler Schedulers.io.

Cambiamo, quindi, il codice in questo modo:
[code lang=”java”]
allCities
.subscribeOn(Schedulers.computation())
.doOnNext(debugString)
.map(s -> s+ "-")
.doOnNext(debugString)
.map(s -> "-" + s)
.subscribe(debugString);
[/code]
e osserviamo il seguente risultato:
[code]
RxComputationThreadPool-1 : Zurich
RxComputationThreadPool-1 : Zurich-
RxComputationThreadPool-1 : -Zurich-
RxComputationThreadPool-1 : London
RxComputationThreadPool-1 : London-
RxComputationThreadPool-1 : -London-
RxComputationThreadPool-1 : Paris

[/code]

L’Observable viene gestito in maniera asincrona dall’inizio alla fine utilizzando un unico thread preso dal thread pool di Schedulers.computation. Due cose importanti da tenere presenti su questo metodo. La prima è che se lo si usa più volte cambiando Scheduler, le chiamate successive non hanno effetto. Quindi se, dopo il primo map, scrivessimo subscribeOn(Schedulers.io()) questa istruzione verrebbe ignorata. Seconda cosa: subscribeOn può essere invocato in qualsiasi punto della catena, il suo effetto non cambia.

Complementare a subscribeOn è il metodo observeOn che modifica il thread su cui l’Observable sta lavorando nel punto in cui viene inserito e ci permette di cambiare Scheduler, anche più volte. Vediamolo in azione:
[code lang=”java”]
allCities
.doOnNext(debugString)
.observeOn(Schedulers.computation())
.map(s -> "-" + s)
.doOnNext(debugString)
.map(s -> s + "-")
.subscribe(debugString);
[/code]

Dopo aver emesso l’elemento e averlo stampato la prima volta in console, cambiamo thread spostandolo sullo Schedulers.computation e continuiamo il flusso dati ottenendo questo output:
[code]
main : Zurich
main : London
main : Paris
main : Berlin
main : Roma
RxComputationThreadPool-3 : -Zurich
main : Madrid
RxComputationThreadPool-3 : -Zurich-
main : Wien
RxComputationThreadPool-3 : -London

[/code]

Prima ancora che il thread principale termini, l’esecuzione asincrona ha inizio e quindi i due thread stanno lavorando in concorrenza.

Possiamo usare subscribeOn e observeOn assieme? Certamente, facciamo un esempio in cui usiamo tre thread diversi usando observeOn due volte.
[code lang=”java”]
allCities
.subscribeOn(Schedulers.computation())
.doOnNext(debugString)
.map(s -> "-" + s)
.observeOn(Schedulers.io())
.doOnNext(debugString)
.map(s -> s + "-")
.observeOn(Schedulers.computation())
.subscribe(debugString);
[/code]
[code]
RxComputationThreadPool-4 : Zurich
RxComputationThreadPool-4 : London
RxComputationThreadPool-4 : Paris
RxComputationThreadPool-4 : Berlin
RxComputationThreadPool-4 : Roma
RxCachedThreadScheduler-1 : -Zurich
RxComputationThreadPool-4 : Madrid
RxComputationThreadPool-3 : -Zurich-
RxCachedThreadScheduler-1 : -London
RxComputationThreadPool-4 : Wien
RxComputationThreadPool-3 : -London-

[/code]

Senza dare alcuna parametrizzazione aggiuntiva, l’effetto che otteniamo è quello di avere tre thread che lavorano in pipeline: ogni città passa in ordine attraverso i thread gestiti dagli Scheduler. Notiamo che da Scheduler.computation vengono estratti due thread.

Combinare più chiamate REST

E’ il momento di riprendere l’esempio del post precedente, l’interrogazione del servizio per il meteo. Oltre a conoscere le condizioni attuali, è possibile ottenere ad un altro indirizzo la previsione per le giornate successive usando un url in questa forma http://api.openweathermap.org/data/2.5/forecast/daily?q=Roma&mode=json&units=metric&cnt=2.

Non riportiamo qui i dettagli del codice che si possono consultare su GitHub, ma abbiamo introdotto il metodo
public static Observable<String> getTomorrowWeather(String cityName) il cui compito è appunto quello di invocare il servizio e restituire la previsione per domani.

Abbiamo ora due servizi disponibili, ciascuno incapsulato in un Observable. Usando il metodo zip introdotto nel primo post, sappiamo come concatenare le risposte che arrivano dal server. Adoperando, invece, la stessa tecnica vista nel secondo post, siamo in grado di aggiungere il nome della città e trasformare il tutto in un Observable di stringhe:
[code lang=”java”]
Observable<String> restCalls = allCities.flatMap(
cityName -> Observable.zip(
getCurrentWeather(cityName),
getTomorrowWeather(cityName),
(today, tomorrow) -> "today: " + today + ", tomorrow: " + tomorrow)
.map(weather -> cityName + "->" + weather)
);
restCalls.subscribe(System.out::println);
[/code]
ottenendo qualcosa di simile a questo:
[code]
Zurich->today: overcast clouds, tomorrow: light snow
London->today: Sky is Clear, tomorrow: sky is clear
Paris->today: mist, tomorrow: light rain
Berlin->today: light intensity drizzle, tomorrow: light rain
Roma->today: proximity thunderstorm, tomorrow: heavy intensity rain
Madrid->today: few clouds, tomorrow: sky is clear
Wien->today: very heavy rain, tomorrow: light rain
[/code]

Il codice appena scritto viene eseguito sequenzialmente e sul main thread, per rendere la computazione asincrona, ci basterà chiamare il metodo subscribeOn su entrambi gli Observable.
[code lang=”java”]
Observable<String> restCalls =
allCities.doOnNext(debugString).flatMap(
cityName -> Observable.zip(
getCurrentWeather(cityName)
.subscribeOn(Schedulers.io())
.doOnNext(debugString),
getTomorrowWeather(cityName)
.subscribeOn(Schedulers.io())
.doOnNext(debugForecast),
(today, tomorrow) -> "today: " + today + ", tomorrow: " + tomorrow).map(weather -> cityName + "->" + weather)
);
restCalls.subscribe(debugString);
[/code]
Queste piccole modifiche fanno si che ogni elemento, dopo essere stato emesso sul main thread, generi due nuovi Observable, uno per chiamata, su due altri thread. I thread si coordinano tra di loro e quando sono disponibili entrambe le risposte i due elementi vengono zippati e mostrati su console. In questo esempio usiamo la lambda debugForecast che permette di distinguere la chiamata per la previsione rispetto quella relativa al meteo corrente:
[code]
main : Zurich
main : London
main : Paris
main : Berlin
main : Roma
main : Madrid
main : Wien
RxCachedThreadScheduler-5 : light rain
RxCachedThreadScheduler-3 : Sky is Clear
RxCachedThreadScheduler-8 : light rain (forecast)
RxCachedThreadScheduler-7 : broken clouds
RxCachedThreadScheduler-11: few clouds
RxCachedThreadScheduler-4 : sky is clear (forecast)
RxCachedThreadScheduler-6 : light rain (forecast)
RxCachedThreadScheduler-12: sky is clear (forecast)
RxCachedThreadScheduler-7 : Berlin->today: broken clouds, tomorrow: light rain
RxCachedThreadScheduler-9 : light rain
RxCachedThreadScheduler-1 : overcast clouds
RxCachedThreadScheduler-13: very heavy rain
RxCachedThreadScheduler-14: light rain (forecast)
RxCachedThreadScheduler-7 : London->today: Sky is Clear, tomorrow: sky is clear
RxCachedThreadScheduler-10: heavy intensity rain (forecast)
RxCachedThreadScheduler-7 : Paris->today: light rain, tomorrow: light rain
RxCachedThreadScheduler-7 : Roma->today: light rain, tomorrow: heavy intensity rain
RxCachedThreadScheduler-7 : Madrid->today: few clouds, tomorrow: sky is clear
RxCachedThreadScheduler-7 : Wien->today: very heavy rain, tomorrow: light rain
RxCachedThreadScheduler-2 : light snow (forecast)
RxCachedThreadScheduler-2 : Zurich->today: overcast clouds, tomorrow: light snow
[/code]

E’ interessante esaminare il risultato perché si può notare innanzitutto che vengono usati tutti i thread a disposizione dello Scheduler. Inoltre, l’ordine di arrivo alla fine della catena non coincide più con quello di partenza: man mano che sono disponibili dei risultati vengono stampati dall’Observer.

Ma forse la cosa più importante da sottolineare è che, un passo alla volta, o meglio, un post alla volta, siamo arrivati a gestire due flussi di chiamate asincrone verso due servizi diversi per poi ricombinarle in un unico stream senza mai gestire una callback. Non solo, abbiamo usato davvero pochissime righe.

Il punto di forza di RxJava è fondamentalmente questo: possiamo effettuare composizioni di esecuzioni asincrone senza entrare in un groviglio di callback, soprattutto quando componiamo più livelli asincroni assieme. Ad esempio, se volessimo gestire tutto subito in maniera asincrona, ci basterebbe modificare solo la prima riga aggiungendo un subscribeOn>:
[code lang=”java”]
Observable<String> restCalls = allCities.subscribeOn(Schedulers.newThread()).doOnNext(debugString).flatMap(
cityName -> Observable.zip(
getCurrentWeather(cityName).subscribeOn(Schedulers.io()).doOnNext(debugString),
getTomorrowWeather(cityName).subscribeOn(Schedulers.io()).doOnNext(debugForecast),
(today, tomorrow) -> "today: " + today + ", tomorrow: " + tomorrow)
.map(weather -> cityName + "->" + weather)
);
restCalls.subscribe(debugString);
[/code]

RxJava? Tutto ancora da scoprire!

In questa serie di post abbiamo giusto scalfito RxJava esponendone alcuni dei concetti principali ma ci sono ancora molte cose da vedere, oltre, ovviamente, alla lunghissima lista di operatori disponibili.

Per chi lavora in Android, inoltre, esiste RxAndroid che integra RxJava con le API di Android. In particolare, rende disponibile lo Scheduler AndroidSchedulers.mainThread() che permette di riportare l’Observer sul thread principale, e quindi aggiornare l’interfaccia utente, dopo aver lavorato in modalità asincrona.

RxJava ha un’ottima documentazione, ma richiede un po’ di tempo per essere compreso a fondo. Di contro, una volta digerito, permette di costruire flussi complessi di dati con una semplicità che difficilmente si trova in altre librerie.