Se c’è una tecnologia che periodicamente incrocia la mia strada questa è sicuramente RxJava dove Rx sta per Reactive Extensions. Forse non avrete sentito parlare direttamente di RxJava ma di RxJS, RxScala, RxGroovy e così via. Tutti porting derivanti dalla libreria originale Rx in .NET che a sua volta possiamo inserire nel grande contenitore del Functional Reactive Programming. In questo post introdurremo RxJava mettendo in evidenza i meccanismi di base. La documentazione di RxJava è già abbastanza esaustiva di per se (Rx è conosciuta per i marble diagrams), quindi ci concentreremo su qualche esempio che metta assieme spunti differenti.
Ma cos’è RxJava?
Per rispondere bene dovremmo capire il significato di reactive e forse anche dopo aver letto la voce di Wikipedia sul reactive programming probabilmente non avremmo le idee chiare su RxJava. Quindi in prima approssimazione lasciamo perdere questo aggettivo e diciamo cosa fa RxJava per noi. Questa libreria ci offre un modello di programmazione dove possiamo lavorare con gli eventi derivanti dalla UI o da chiamate asincrone alla stessa maniera con cui operiamo sulle collezioni e sugli . Possiamo trasformare gli eventi usando funzioni come map, flatmap, filter, ecc. Però al posto di costruire del codice che risponde agli eventi (leggi callback), trattiamo questi eventi come un flusso che viene modificato e gestito man mano che si genera ad opera della classe Observable
. Cerchiamo di inquadrarla meglio. Quando ci serve avere un oggetto da un metodo, definiamo il nostro metodo in questo modo:
T getObject()
mentre se ci serve una collezione scriveremo
IterablegetCollection()
Ora se ci spostiamo nel campo dell’esecuzione asincrona e vogliamo gestire un oggetto che sarà disponibile successivamente, andiamo a realizzare un metodo con questo stile:
FuturegetObject()
Ci manca quindi l’ultima combinazione: una collezione i cui elementi saranno disponibili in futuro ma non necessariamente tutti nello stesso istante. La soluzione di RxJava per questo caso è proprio la classe Observable:
ObservablegetCollection()
Un Observable è un oggetto che emette zero o più elementi per poi terminare con successo oppure durante il flusso degli elementi si interrompe a causa di un errore. Dall’altra parte abbiamo gli osservatori di tipo Observer
che consumano l’elemento e gestiscono la terminazione dell’Observable (oppure il suo errore). Per gestire correttamente un Observable, un Observer implementa i metodi:
onNext
onComplete
onError
Questo pattern è molto simile all’observer “classico” in quanto lo estende ma ci sono delle differenze importanti. In generale l’Observable non emette elementi se non c’è alcun Observer (ma è possibile realizzare anche hot Observable dove questa regola non vale). Inoltre, l’Observer viene notificato quanto l’Observable smette di produrre elementi perché termina o perché fallisce. Questo implica anche che non è necessario rimuovere l’Observer dall’Observable quanto quest’ultimo è stato consumato perché avviene automaticamente.
Talk is cheap, show me the code
Ora che abbiamo chiarito i concetti fondamentali siamo pronti a sperimentare. In questa prima parte useremo sempre il thread principale, nonostante la “predisposizione” degli Observable alla gestione asincrona, nulla vieta di usare gli Observable solo in modalità sincrona.
Definire un Observable è molto semplice: qualsiasi collezione può essere trasformata in Observable:
ListcityList = new ArrayList<>(); cityList.add("Berlin"); cityList.add("Roma"); cityList.add("Madrid"); cityList.add("Wien"); Observable cities = Observable.from(cityList);
RxJava mette a disposizione anche dei metodi di utilità per semplificarci la vita, come il metodo just
che costruisce un Observable partendo da una lista di oggetti. Un modo alternativo di costruire un Observable è questo:
ObservablemoreCities = Observable.just("Zurich", "London", "Paris");
Possiamo anche generare Observable combinandone di esistenti. Ad esempio, posso fondere i due Observable precedenti con l’operatore concatWith
:
ObservableallCities = cities.concatWith(moreCities);
Cosa fa esattamente questo metodo? Come dicevamo prima, la documentazione è fatta molto bene, quindi riporto qui sotto il marble diagram, cioè il diagramma a biglie, per il metodo concatWith.
L’immagine spiega molto meglio di come possiamo fare noi a parole: la freccia a destra indica l’avanzamento del tempo, abbiamo due Observable quindi due frecce. Il risultato del metodo è un nuovo Observable composto dagli elementi del primo seguiti dagli elementi del secondo mantenendo l’ordine. Gli Observable indicati terminano nell’istante indicato dalla stanghetta verticale.
Tutta la documentazione di RxJava è realizzata in questa maniera, la difficoltà nell’uso di questa libreria non starà quindi nel capire come funziona un operatore, ma piuttosto quale tra le decine di operatori presenti è quello giusto per quello che dobbiamo fare 🙂 .
L’Observable è pronto, implementiamo l’interfaccia Observer per fare qualcosa con la lista delle città: ci servono tre metodi.
Observertraveller = new Observer () { @Override public void onCompleted() { System.out.println("My trip is finished"); } @Override public void onError(Throwable e) { System.out.println("I won't complete my trip!"); } @Override public void onNext(String t) { System.out.println("I've just visited " + t); } };
L’unica cosa che resta ora da fare è associare l’Observable al suo Observer tramite il metodo subscribe
allCities.subscribe(traveller);
Il risultato prodotto è quello riportato qui sotto
I've just visited Zurich I've just visited London I've just visited Paris I've just visited Berlin I've just visited Roma I've just visited Madrid I've just visited Wien My trip is finished
Se non vogliamo costruire un Observer, possiamo anche limitarci a passare il comportamento che vogliamo applicare solo nel caso onNext: basta implementare l’interfaccia Action1
Action1weather = new Action1 (){ @Override public void call(String city) { System.out.println("The weather is sunny in " + city); } }; allCities.subscribe(weather);
per ottenere questo risultato:
The weather is sunny in Zurich The weather is sunny in London The weather is sunny in Paris The weather is sunny in Berlin The weather is sunny in Roma The weather is sunny in Madrid The weather is sunny in Wien
Per fortuna con le di Java 8, possiamo scrivere tutto in maniera più compatta e risparmiare un sacco di codice.
allCities.subscribe(city -> System.out.println("The weather is sunny in " + city));
Un po’ di trasformazioni
Abbiamo visto le due operazioni indispensabili: la creazione di un Observable e la sottoscrizione di un Observer. Prima che gli elementi arrivino all’Observer possiamo applicare gli operatori che vogliamo in un modo del tutto simile a quelli che applichiamo su di uno . Facciamo un piccolo esempio:
allCities.map(city -> city.toUpperCase()) .filter(city -> city.length() > 5) .take(3) .subscribe(city -> System.out.println("I LOVE " + city));
Guardiamo in dettaglio un operatore alla volta, il metodo map
trasforma in maiuscolo gli elementi emessi dall’Observable. Dell’Observable risultante non prendiamo tutti gli elementi ma solo quelli che soddisfano la condizione passata a filter
, cioè solo le città con un nome più lungo di cinque caratteri. Infine, non usiamo tutte le stringhe a disposizione ma solo le prime tre e finalmente raggiungono l’Observer che li stampa su console ottenendo questo risultato:
I LOVE BERLIN I LOVE MADRID I LOVE WIEN
Facciamo un esempio diverso in modo da introdurre altri operatori:
cities.zipWith(moreCities, (String s1, String s2) -> "From " + s1 + " to " + s2) .doOnNext(s -> System.out.println("#Debug: "+ s)) .map(s -> s.toLowerCase()) .subscribe(System.out::println);
Anche il metodo zipWith
unisce due Observable, ma a differenza di concat, prende gli elementi emessi da due Observable e li combina a due a due secondo una funzione passata come parametro. Qui usiamo zipWith per generare una nuova stringa che comprende due città. Si noti che siccome cities
ha quattro elementi mentre moreCities
ne ha tre, il risultato sarà lungo quanto il più corto dei due Observable.
Il metodo doOnNext
consente di applicare un’azione intermedia sull’Observable, nel nostro caso lo usiamo per debuggare su console il risultato del metodo zip. Eseguendo questo esempio otteniamo quindi:
#Debug: From Zurich to Berlin from zurich to berlin #Debug: From London to Roma from london to roma #Debug: From Paris to Madrid from paris to madrid
Non finisce qui
Siamo giunti al termine di questa prima parte su RxJava. Abbiamo introdotto la classe Observable e probabilmente non avrete incontrato grosse difficoltà a seguire gli esempi fatti fino qui se usate già da un po’ (o quelli di ). In effetti, adoperando RxJava solo in modalità sincrona, non ho trovato molte differenze concettuali con gli stream di Java, tanto da farmi sembrare RxJava solo una versione molto più ricca di quanto avevo già a disposizione. La vera forza di RxJava sta però nel modo in cui tratta Observable e Observer che si trovano su thread diversi, rendendo molto più semplice la programmazione asincrona. Le cose si faranno più interessanti nei prossimi post, intanto potete provare il codice di questo articolo su .