RxJava – gestire i thread

RxJava – gestire i thread

In questo ultimo post dedicato ai concetti fondamentali di RxJava vedremo come gestire i thread. Faremo prima degli esempi minimali in modo da comprendere i meccanismi sottostanti e poi andremo ad utilizzare le conoscenze acquisite nell’esempio della volta precedente.

Oramai siamo affezionati alla nostra lista di città che esponiamo come Observable, quindi la useremo anche negli esempi di base:

Observable<String> allCities = Observable.just("Zurich", "London", "Paris", "Berlin", "Roma", "Madrid", "Wien");

in più introduciamo un semplicissimo log sulla console che ci servirà per sapere qual è il thread corrente che sta usando l’Observable:

  public static void logStringWithThread(String string) {
    System.out.printf("%-26.26s: %-10.10s%n", Thread.currentThread().getName(), string);
  }

  public static Action1<? super String> debugString = string -> logStringWithThread(string);

Ci sono gli ingredienti necessari per partire per cui.. azione! 🙂

Chi ha detto asincrono?

Nel primo post su RxJava abbiamo parlato dell’operatore doOnNext, utile per eseguire un’azione sull’Observable ogni volta che emette un elemento. Noi lo sfrutteremo per stampare la stringa corrente e conoscere il thread in esecuzione attraverso la funzione debugString, in più eseguiremo due banali trasformazioni sulla lista di città:

allCities
    .doOnNext(debugString)
    .map(s -> s + "-")
    .doOnNext(debugString)
    .map(s -> "-" + s)
    .subscribe(debugString);

Il risultato è riportato qui sotto.

main                      : Zurich    
main                      : Zurich-   
main                      : -Zurich-  
main                      : London    
main                      : London-   
main                      : -London-  
main                      : Paris     
main                      : Paris-  
....

I log ci indicano che, a partire dalla generazione dell’elemento fino ad arrivare all’Observer, siamo sempre nel main thread. E’ vero che RxJava si esprime al suo meglio quando lavoriamo in modalità asincrona, ma nulla indica che questo sia il comportamento di default. Abbiamo visto proprio in questo esempio che l’Observable inizia ad emettere elementi usando il thread che lo genera, in questo caso il principale.

Non dobbiamo però neanche fare l’assunzione opposta, cioè che un’esecuzione sincrona resti sempre tale. Guardiamo questo esempio in cui chiediamo all’Observable di ritardare di mezzo secondo l’emissione di un elemento tramite l’operatore delay:

allCities
    .doOnNext(debugString)
    .delay(500, TimeUnit.MILLISECONDS)
    .map(s -> s + "-")
    .doOnNext(debugString)
    .map(s -> "-" + s)
    .subscribe(debugString);

il risultato dell’esecuzione è il seguente.

main                      : Zurich    
main                      : London    
main                      : Paris     
main                      : Berlin    
main                      : Roma      
main                      : Madrid    
main                      : Wien      
RxComputationThreadPool-1 : Zurich-   
RxComputationThreadPool-1 : -Zurich-  
RxComputationThreadPool-1 : London-   
RxComputationThreadPool-1 : -London-  
...

Come si vede, l’operatore delay fa cambiare thread usandone uno preso da un thread pool definito da RxJava. Non solo l’Observable diventa asincrono ma anche il suo Observer. Dato che questo codice viene eseguito in una semplice applicazione da riga di comando, se non mettiamo uno sleep come ultima riga di codice, rischiamo di non vedere il risultato perché l’applicazione termina prima.

Quindi, come complemento dell’indicazione di prima, possiamo dire questo: è sempre bene gestire l’Observable in modalità asincrona, anche se non lo stiamo forzando esplicitamente.

Cambiamo thread!

Il primo metodo che possiamo usare per cambiare il thread dell’Observable è il metodo subscribeOn: ci consente di passare in input uno Scheduler, cioè, secondo la documentazione, un oggetto capace di eseguire un’unità di lavoro. Non vogliamo entrare nel dettaglio dello Scheduler, ci basta sapere che usandolo andiamo a “pescare” un thread da uno specifico pool e su questo eseguiamo l’Observable. RxJava mette a disposizione alcuni Scheduler predefiniti. Per il nostro primo esempio useremo Schedulers.computation che è pensato per svolgere lavori di calcolo. Se vogliamo gestire attività input/output useremo lo Scheduler Schedulers.io.

Cambiamo, quindi, il codice in questo modo:

allCities
    .subscribeOn(Schedulers.computation()) 
    .doOnNext(debugString)
    .map(s -> s+ "-") 
    .doOnNext(debugString)
    .map(s -> "-" + s)
    .subscribe(debugString);

e osserviamo il seguente risultato:

RxComputationThreadPool-1 : Zurich    
RxComputationThreadPool-1 : Zurich-   
RxComputationThreadPool-1 : -Zurich-  
RxComputationThreadPool-1 : London    
RxComputationThreadPool-1 : London-   
RxComputationThreadPool-1 : -London-  
RxComputationThreadPool-1 : Paris     
...

L’Observable viene gestito in maniera asincrona dall’inizio alla fine utilizzando un unico thread preso dal thread pool di Schedulers.computation. Due cose importanti da tenere presenti su questo metodo. La prima è che se lo si usa più volte cambiando Scheduler, le chiamate successive non hanno effetto. Quindi se, dopo il primo map, scrivessimo subscribeOn(Schedulers.io()) questa istruzione verrebbe ignorata. Seconda cosa: subscribeOn può essere invocato in qualsiasi punto della catena, il suo effetto non cambia.

Complementare a subscribeOn è il metodo observeOn che modifica il thread su cui l’Observable sta lavorando nel punto in cui viene inserito e ci permette di cambiare Scheduler, anche più volte. Vediamolo in azione:

allCities
    .doOnNext(debugString)
    .observeOn(Schedulers.computation())
    .map(s -> "-" + s)
    .doOnNext(debugString)
    .map(s -> s + "-")
    .subscribe(debugString);

Dopo aver emesso l’elemento e averlo stampato la prima volta in console, cambiamo thread spostandolo sullo Schedulers.computation e continuiamo il flusso dati ottenendo questo output:

main                      : Zurich    
main                      : London    
main                      : Paris     
main                      : Berlin    
main                      : Roma      
RxComputationThreadPool-3 : -Zurich   
main                      : Madrid    
RxComputationThreadPool-3 : -Zurich-  
main                      : Wien      
RxComputationThreadPool-3 : -London   
...

Prima ancora che il thread principale termini, l’esecuzione asincrona ha inizio e quindi i due thread stanno lavorando in concorrenza.

Possiamo usare subscribeOn e observeOn assieme? Certamente, facciamo un esempio in cui usiamo tre thread diversi usando observeOn due volte.

allCities
   .subscribeOn(Schedulers.computation())
   .doOnNext(debugString)
   .map(s -> "-" + s)
   .observeOn(Schedulers.io())
   .doOnNext(debugString)
   .map(s -> s + "-")
   .observeOn(Schedulers.computation())
   .subscribe(debugString);
RxComputationThreadPool-4 : Zurich    
RxComputationThreadPool-4 : London    
RxComputationThreadPool-4 : Paris     
RxComputationThreadPool-4 : Berlin    
RxComputationThreadPool-4 : Roma      
RxCachedThreadScheduler-1 : -Zurich   
RxComputationThreadPool-4 : Madrid    
RxComputationThreadPool-3 : -Zurich-  
RxCachedThreadScheduler-1 : -London   
RxComputationThreadPool-4 : Wien      
RxComputationThreadPool-3 : -London-  
...

Senza dare alcuna parametrizzazione aggiuntiva, l’effetto che otteniamo è quello di avere tre thread che lavorano in pipeline: ogni città passa in ordine attraverso i thread gestiti dagli Scheduler. Notiamo che da Scheduler.computation vengono estratti due thread.

Combinare più chiamate REST

E’ il momento di riprendere l’esempio del post precedente, l’interrogazione del servizio per il meteo. Oltre a conoscere le condizioni attuali, è possibile ottenere ad un altro indirizzo la previsione per le giornate successive usando un url in questa forma http://api.openweathermap.org/data/2.5/forecast/daily?q=Roma&mode=json&units=metric&cnt=2.

Non riportiamo qui i dettagli del codice che si possono consultare su GitHub, ma abbiamo introdotto il metodo
public static Observable<String> getTomorrowWeather(String cityName) il cui compito è appunto quello di invocare il servizio e restituire la previsione per domani.

Abbiamo ora due servizi disponibili, ciascuno incapsulato in un Observable. Usando il metodo zip introdotto nel primo post, sappiamo come concatenare le risposte che arrivano dal server. Adoperando, invece, la stessa tecnica vista nel secondo post, siamo in grado di aggiungere il nome della città e trasformare il tutto in un Observable di stringhe:

Observable<String> restCalls = allCities.flatMap(
    cityName -> Observable.zip(
        getCurrentWeather(cityName),
        getTomorrowWeather(cityName),
        (today, tomorrow) -> "today: " + today + ", tomorrow: " + tomorrow)
           .map(weather -> cityName + "->" + weather)
    );
   
restCalls.subscribe(System.out::println);

ottenendo qualcosa di simile a questo:

Zurich->today: overcast clouds, tomorrow: light snow
London->today: Sky is Clear, tomorrow: sky is clear
Paris->today: mist, tomorrow: light rain
Berlin->today: light intensity drizzle, tomorrow: light rain
Roma->today: proximity thunderstorm, tomorrow: heavy intensity rain
Madrid->today: few clouds, tomorrow: sky is clear
Wien->today: very heavy rain, tomorrow: light rain

Il codice appena scritto viene eseguito sequenzialmente e sul main thread, per rendere la computazione asincrona, ci basterà chiamare il metodo subscribeOn su entrambi gli Observable.

Observable<String> restCalls = 
allCities.doOnNext(debugString).flatMap(
    cityName -> Observable.zip(
        getCurrentWeather(cityName)
               .subscribeOn(Schedulers.io())
               .doOnNext(debugString),
        getTomorrowWeather(cityName)
               .subscribeOn(Schedulers.io())
               .doOnNext(debugForecast),
        (today, tomorrow) -> "today: " + today + ", tomorrow: " + tomorrow).map(weather -> cityName + "->" + weather)
    );

restCalls.subscribe(debugString);

Queste piccole modifiche fanno si che ogni elemento, dopo essere stato emesso sul main thread, generi due nuovi Observable, uno per chiamata, su due altri thread. I thread si coordinano tra di loro e quando sono disponibili entrambe le risposte i due elementi vengono zippati e mostrati su console. In questo esempio usiamo la lambda debugForecast che permette di distinguere la chiamata per la previsione rispetto quella relativa al meteo corrente:

main                      : Zurich
main                      : London
main                      : Paris
main                      : Berlin
main                      : Roma
main                      : Madrid
main                      : Wien
RxCachedThreadScheduler-5 : light rain      
RxCachedThreadScheduler-3 : Sky is Clear                           
RxCachedThreadScheduler-8 : light rain (forecast)           
RxCachedThreadScheduler-7 : broken clouds                   
RxCachedThreadScheduler-11: few clouds                               
RxCachedThreadScheduler-4 : sky is clear (forecast)                    
RxCachedThreadScheduler-6 : light rain (forecast)                 
RxCachedThreadScheduler-12: sky is clear (forecast)                 
RxCachedThreadScheduler-7 : Berlin->today: broken clouds, tomorrow: light rain
RxCachedThreadScheduler-9 : light rain
RxCachedThreadScheduler-1 : overcast clouds
RxCachedThreadScheduler-13: very heavy rain
RxCachedThreadScheduler-14: light rain (forecast)
RxCachedThreadScheduler-7 : London->today: Sky is Clear, tomorrow: sky is clear
RxCachedThreadScheduler-10: heavy intensity rain (forecast)
RxCachedThreadScheduler-7 : Paris->today: light rain, tomorrow: light rain
RxCachedThreadScheduler-7 : Roma->today: light rain, tomorrow: heavy intensity rain
RxCachedThreadScheduler-7 : Madrid->today: few clouds, tomorrow: sky is clear
RxCachedThreadScheduler-7 : Wien->today: very heavy rain, tomorrow: light rain
RxCachedThreadScheduler-2 : light snow (forecast)
RxCachedThreadScheduler-2 : Zurich->today: overcast clouds, tomorrow: light snow

E’ interessante esaminare il risultato perché si può notare innanzitutto che vengono usati tutti i thread a disposizione dello Scheduler. Inoltre, l’ordine di arrivo alla fine della catena non coincide più con quello di partenza: man mano che sono disponibili dei risultati vengono stampati dall’Observer.

Ma forse la cosa più importante da sottolineare è che, un passo alla volta, o meglio, un post alla volta, siamo arrivati a gestire due flussi di chiamate asincrone verso due servizi diversi per poi ricombinarle in un unico stream senza mai gestire una callback. Non solo, abbiamo usato davvero pochissime righe.

Il punto di forza di RxJava è fondamentalmente questo: possiamo effettuare composizioni di esecuzioni asincrone senza entrare in un groviglio di callback, soprattutto quando componiamo più livelli asincroni assieme. Ad esempio, se volessimo gestire tutto subito in maniera asincrona, ci basterebbe modificare solo la prima riga aggiungendo un subscribeOn>:

Observable<String> restCalls = allCities.subscribeOn(Schedulers.newThread()).doOnNext(debugString).flatMap(
        cityName -> Observable.zip(
            getCurrentWeather(cityName).subscribeOn(Schedulers.io()).doOnNext(debugString),
            getTomorrowWeather(cityName).subscribeOn(Schedulers.io()).doOnNext(debugForecast),
            (today, tomorrow) -> "today: " + today + ", tomorrow: " + tomorrow)
               .map(weather -> cityName + "->" + weather)
        );

   
    restCalls.subscribe(debugString);

RxJava? Tutto ancora da scoprire!

In questa serie di post abbiamo giusto scalfito RxJava esponendone alcuni dei concetti principali ma ci sono ancora molte cose da vedere, oltre, ovviamente, alla lunghissima lista di operatori disponibili.

Per chi lavora in Android, inoltre, esiste RxAndroid che integra RxJava con le API di Android. In particolare, rende disponibile lo Scheduler AndroidSchedulers.mainThread() che permette di riportare l’Observer sul thread principale, e quindi aggiornare l’interfaccia utente, dopo aver lavorato in modalità asincrona.

RxJava ha un’ottima documentazione, ma richiede un po’ di tempo per essere compreso a fondo. Di contro, una volta digerito, permette di costruire flussi complessi di dati con una semplicità che difficilmente si trova in altre librerie.

Giampaolo Trapasso

Sono laureato in Informatica e attualmente lavoro come Software Engineer in Radicalbit. Mi diverto a programmare usando Java e Scala, Akka, RxJava e Cassandra. Qui mio modesto contributo su StackOverflow e il mio account su GitHub