RxJava – gestire i thread
In questo ultimo post dedicato ai concetti fondamentali di RxJava vedremo come gestire i thread. Faremo prima degli esempi minimali in modo da comprendere i meccanismi sottostanti e poi andremo ad utilizzare le conoscenze acquisite nell’esempio della .
Oramai siamo affezionati alla nostra lista di città che esponiamo come Observable, quindi la useremo anche negli esempi di base:
ObservableallCities = Observable.just("Zurich", "London", "Paris", "Berlin", "Roma", "Madrid", "Wien");
in più introduciamo un semplicissimo log sulla console che ci servirà per sapere qual è il thread corrente che sta usando l’Observable:
public static void logStringWithThread(String string) { System.out.printf("%-26.26s: %-10.10s%n", Thread.currentThread().getName(), string); } public static Action1 super String> debugString = string -> logStringWithThread(string);
Ci sono gli ingredienti necessari per partire per cui.. azione! 🙂
Chi ha detto asincrono?
Nel su RxJava abbiamo parlato dell’operatore doOnNext
, utile per eseguire un’azione sull’Observable ogni volta che emette un elemento. Noi lo sfrutteremo per stampare la stringa corrente e conoscere il thread in esecuzione attraverso la funzione debugString
, in più eseguiremo due banali trasformazioni sulla lista di città:
allCities .doOnNext(debugString) .map(s -> s + "-") .doOnNext(debugString) .map(s -> "-" + s) .subscribe(debugString);
Il risultato è riportato qui sotto.
main : Zurich main : Zurich- main : -Zurich- main : London main : London- main : -London- main : Paris main : Paris- ....
I log ci indicano che, a partire dalla generazione dell’elemento fino ad arrivare all’Observer, siamo sempre nel main thread. E’ vero che RxJava si esprime al suo meglio quando lavoriamo in modalità asincrona, ma nulla indica che questo sia il comportamento di default. Abbiamo visto proprio in questo esempio che l’Observable inizia ad emettere elementi usando il thread che lo genera, in questo caso il principale.
Non dobbiamo però neanche fare l’assunzione opposta, cioè che un’esecuzione sincrona resti sempre tale. Guardiamo questo esempio in cui chiediamo all’Observable di ritardare di mezzo secondo l’emissione di un elemento tramite l’operatore delay
:
allCities .doOnNext(debugString) .delay(500, TimeUnit.MILLISECONDS) .map(s -> s + "-") .doOnNext(debugString) .map(s -> "-" + s) .subscribe(debugString);
il risultato dell’esecuzione è il seguente.
main : Zurich main : London main : Paris main : Berlin main : Roma main : Madrid main : Wien RxComputationThreadPool-1 : Zurich- RxComputationThreadPool-1 : -Zurich- RxComputationThreadPool-1 : London- RxComputationThreadPool-1 : -London- ...
Come si vede, l’operatore delay fa cambiare thread usandone uno preso da un thread pool definito da RxJava. Non solo l’Observable diventa asincrono ma anche il suo Observer. Dato che questo codice viene eseguito in una semplice applicazione da riga di comando, se non mettiamo uno sleep come ultima riga di codice, rischiamo di non vedere il risultato perché l’applicazione termina prima.
Quindi, come complemento dell’indicazione di prima, possiamo dire questo: è sempre bene gestire l’Observable in modalità asincrona, anche se non lo stiamo forzando esplicitamente.
Cambiamo thread!
Il primo metodo che possiamo usare per cambiare il thread dell’Observable è il metodo subscribeOn
: ci consente di passare in input uno Scheduler
, cioè, secondo la documentazione, un oggetto capace di eseguire un’unità di lavoro. Non vogliamo entrare nel dettaglio dello Scheduler, ci basta sapere che usandolo andiamo a “pescare” un thread da uno specifico pool e su questo eseguiamo l’Observable. RxJava mette a disposizione alcuni Scheduler predefiniti. Per il nostro primo esempio useremo Schedulers.computation
che è pensato per svolgere lavori di calcolo. Se vogliamo gestire attività input/output useremo lo Scheduler Schedulers.io
.
Cambiamo, quindi, il codice in questo modo:
allCities .subscribeOn(Schedulers.computation()) .doOnNext(debugString) .map(s -> s+ "-") .doOnNext(debugString) .map(s -> "-" + s) .subscribe(debugString);
e osserviamo il seguente risultato:
RxComputationThreadPool-1 : Zurich RxComputationThreadPool-1 : Zurich- RxComputationThreadPool-1 : -Zurich- RxComputationThreadPool-1 : London RxComputationThreadPool-1 : London- RxComputationThreadPool-1 : -London- RxComputationThreadPool-1 : Paris ...
L’Observable viene gestito in maniera asincrona dall’inizio alla fine utilizzando un unico thread preso dal thread pool di Schedulers.computation
. Due cose importanti da tenere presenti su questo metodo. La prima è che se lo si usa più volte cambiando Scheduler, le chiamate successive non hanno effetto. Quindi se, dopo il primo map
, scrivessimo subscribeOn(Schedulers.io())
questa istruzione verrebbe ignorata. Seconda cosa: subscribeOn
può essere invocato in qualsiasi punto della catena, il suo effetto non cambia.
Complementare a subscribeOn
è il metodo observeOn
che modifica il thread su cui l’Observable sta lavorando nel punto in cui viene inserito e ci permette di cambiare Scheduler, anche più volte. Vediamolo in azione:
allCities .doOnNext(debugString) .observeOn(Schedulers.computation()) .map(s -> "-" + s) .doOnNext(debugString) .map(s -> s + "-") .subscribe(debugString);
Dopo aver emesso l’elemento e averlo stampato la prima volta in console, cambiamo thread spostandolo sullo Schedulers.computation
e continuiamo il flusso dati ottenendo questo output:
main : Zurich main : London main : Paris main : Berlin main : Roma RxComputationThreadPool-3 : -Zurich main : Madrid RxComputationThreadPool-3 : -Zurich- main : Wien RxComputationThreadPool-3 : -London ...
Prima ancora che il thread principale termini, l’esecuzione asincrona ha inizio e quindi i due thread stanno lavorando in concorrenza.
Possiamo usare subscribeOn
e observeOn
assieme? Certamente, facciamo un esempio in cui usiamo tre thread diversi usando observeOn
due volte.
allCities .subscribeOn(Schedulers.computation()) .doOnNext(debugString) .map(s -> "-" + s) .observeOn(Schedulers.io()) .doOnNext(debugString) .map(s -> s + "-") .observeOn(Schedulers.computation()) .subscribe(debugString);
RxComputationThreadPool-4 : Zurich RxComputationThreadPool-4 : London RxComputationThreadPool-4 : Paris RxComputationThreadPool-4 : Berlin RxComputationThreadPool-4 : Roma RxCachedThreadScheduler-1 : -Zurich RxComputationThreadPool-4 : Madrid RxComputationThreadPool-3 : -Zurich- RxCachedThreadScheduler-1 : -London RxComputationThreadPool-4 : Wien RxComputationThreadPool-3 : -London- ...
Senza dare alcuna parametrizzazione aggiuntiva, l’effetto che otteniamo è quello di avere tre thread che lavorano in pipeline: ogni città passa in ordine attraverso i thread gestiti dagli Scheduler. Notiamo che da Scheduler.computation
vengono estratti due thread.
Combinare più chiamate REST
E’ il momento di riprendere l’esempio del post precedente, l’interrogazione del servizio per il meteo. Oltre a conoscere le condizioni attuali, è possibile ottenere ad un altro indirizzo la previsione per le giornate successive usando un url in questa forma http://api.openweathermap.org/data/2.5/forecast/daily?q=Roma&mode=json&units=metric&cnt=2.
Non riportiamo qui i dettagli del codice che si possono consultare su , ma abbiamo introdotto il metodo
public static Observable
il cui compito è appunto quello di invocare il servizio e restituire la previsione per domani.
Abbiamo ora due servizi disponibili, ciascuno incapsulato in un Observable. Usando il metodo zip
introdotto nel , sappiamo come concatenare le risposte che arrivano dal server. Adoperando, invece, la stessa tecnica vista nel secondo post, siamo in grado di aggiungere il nome della città e trasformare il tutto in un Observable di stringhe:
ObservablerestCalls = allCities.flatMap( cityName -> Observable.zip( getCurrentWeather(cityName), getTomorrowWeather(cityName), (today, tomorrow) -> "today: " + today + ", tomorrow: " + tomorrow) .map(weather -> cityName + "->" + weather) ); restCalls.subscribe(System.out::println);
ottenendo qualcosa di simile a questo:
Zurich->today: overcast clouds, tomorrow: light snow London->today: Sky is Clear, tomorrow: sky is clear Paris->today: mist, tomorrow: light rain Berlin->today: light intensity drizzle, tomorrow: light rain Roma->today: proximity thunderstorm, tomorrow: heavy intensity rain Madrid->today: few clouds, tomorrow: sky is clear Wien->today: very heavy rain, tomorrow: light rain
Il codice appena scritto viene eseguito sequenzialmente e sul main thread, per rendere la computazione asincrona, ci basterà chiamare il metodo subscribeOn
su entrambi gli Observable.
ObservablerestCalls = allCities.doOnNext(debugString).flatMap( cityName -> Observable.zip( getCurrentWeather(cityName) .subscribeOn(Schedulers.io()) .doOnNext(debugString), getTomorrowWeather(cityName) .subscribeOn(Schedulers.io()) .doOnNext(debugForecast), (today, tomorrow) -> "today: " + today + ", tomorrow: " + tomorrow).map(weather -> cityName + "->" + weather) ); restCalls.subscribe(debugString);
Queste piccole modifiche fanno si che ogni elemento, dopo essere stato emesso sul main thread, generi due nuovi Observable, uno per chiamata, su due altri thread. I thread si coordinano tra di loro e quando sono disponibili entrambe le risposte i due elementi vengono zippati e mostrati su console. In questo esempio usiamo la lambda debugForecast
che permette di distinguere la chiamata per la previsione rispetto quella relativa al meteo corrente:
main : Zurich main : London main : Paris main : Berlin main : Roma main : Madrid main : Wien RxCachedThreadScheduler-5 : light rain RxCachedThreadScheduler-3 : Sky is Clear RxCachedThreadScheduler-8 : light rain (forecast) RxCachedThreadScheduler-7 : broken clouds RxCachedThreadScheduler-11: few clouds RxCachedThreadScheduler-4 : sky is clear (forecast) RxCachedThreadScheduler-6 : light rain (forecast) RxCachedThreadScheduler-12: sky is clear (forecast) RxCachedThreadScheduler-7 : Berlin->today: broken clouds, tomorrow: light rain RxCachedThreadScheduler-9 : light rain RxCachedThreadScheduler-1 : overcast clouds RxCachedThreadScheduler-13: very heavy rain RxCachedThreadScheduler-14: light rain (forecast) RxCachedThreadScheduler-7 : London->today: Sky is Clear, tomorrow: sky is clear RxCachedThreadScheduler-10: heavy intensity rain (forecast) RxCachedThreadScheduler-7 : Paris->today: light rain, tomorrow: light rain RxCachedThreadScheduler-7 : Roma->today: light rain, tomorrow: heavy intensity rain RxCachedThreadScheduler-7 : Madrid->today: few clouds, tomorrow: sky is clear RxCachedThreadScheduler-7 : Wien->today: very heavy rain, tomorrow: light rain RxCachedThreadScheduler-2 : light snow (forecast) RxCachedThreadScheduler-2 : Zurich->today: overcast clouds, tomorrow: light snow
E’ interessante esaminare il risultato perché si può notare innanzitutto che vengono usati tutti i thread a disposizione dello Scheduler. Inoltre, l’ordine di arrivo alla fine della catena non coincide più con quello di partenza: man mano che sono disponibili dei risultati vengono stampati dall’Observer.
Ma forse la cosa più importante da sottolineare è che, un passo alla volta, o meglio, un post alla volta, siamo arrivati a gestire due flussi di chiamate asincrone verso due servizi diversi per poi ricombinarle in un unico stream senza mai gestire una callback. Non solo, abbiamo usato davvero pochissime righe.
Il punto di forza di RxJava è fondamentalmente questo: possiamo effettuare composizioni di esecuzioni asincrone senza entrare in un groviglio di callback, soprattutto quando componiamo più livelli asincroni assieme. Ad esempio, se volessimo gestire tutto subito in maniera asincrona, ci basterebbe modificare solo la prima riga aggiungendo un subscribeOn
>:
ObservablerestCalls = allCities.subscribeOn(Schedulers.newThread()).doOnNext(debugString).flatMap( cityName -> Observable.zip( getCurrentWeather(cityName).subscribeOn(Schedulers.io()).doOnNext(debugString), getTomorrowWeather(cityName).subscribeOn(Schedulers.io()).doOnNext(debugForecast), (today, tomorrow) -> "today: " + today + ", tomorrow: " + tomorrow) .map(weather -> cityName + "->" + weather) ); restCalls.subscribe(debugString);
RxJava? Tutto ancora da scoprire!
In questa serie di post abbiamo giusto scalfito RxJava esponendone alcuni dei concetti principali ma ci sono ancora molte cose da vedere, oltre, ovviamente, alla lunghissima lista di operatori disponibili.
Per chi lavora in Android, inoltre, esiste RxAndroid che integra RxJava con le API di Android. In particolare, rende disponibile lo Scheduler AndroidSchedulers.mainThread()
che permette di riportare l’Observer sul thread principale, e quindi aggiornare l’interfaccia utente, dopo aver lavorato in modalità asincrona.
RxJava ha un’ottima documentazione, ma richiede un po’ di tempo per essere compreso a fondo. Di contro, una volta digerito, permette di costruire flussi complessi di dati con una semplicità che difficilmente si trova in altre librerie.