RxJava – Observable e Observer

Se c’è una tecnologia che periodicamente incrocia la mia strada questa è sicuramente RxJava dove Rx sta per Reactive Extensions. Forse non avrete sentito parlare direttamente di RxJava ma di RxJS, RxScala, RxGroovy e così via. Tutti porting derivanti dalla libreria originale Rx in .NET che a sua volta possiamo inserire nel grande contenitore del Functional Reactive Programming. In questo post introdurremo RxJava mettendo in evidenza i meccanismi di base. La documentazione di RxJava è già abbastanza esaustiva di per se (Rx è conosciuta per i marble diagrams), quindi ci concentreremo su qualche esempio che metta assieme spunti differenti.

Ma cos’è RxJava?

Per rispondere bene dovremmo capire il significato di reactive e forse anche dopo aver letto la voce di Wikipedia sul reactive programming probabilmente non avremmo le idee chiare su RxJava. Quindi in prima approssimazione lasciamo perdere questo aggettivo e diciamo cosa fa RxJava per noi. Questa libreria ci offre un modello di programmazione dove possiamo lavorare con gli eventi derivanti dalla UI o da chiamate asincrone alla stessa maniera con cui operiamo sulle collezioni e sugli stream di Java 8. Possiamo trasformare gli eventi usando funzioni come map, flatmap, filter, ecc. Però al posto di costruire del codice che risponde agli eventi (leggi callback), trattiamo questi eventi come un flusso che viene modificato e gestito man mano che si genera ad opera della classe Observable. Cerchiamo di inquadrarla meglio. Quando ci serve avere un oggetto da un metodo, definiamo il nostro metodo in questo modo:

T getObject()

mentre se ci serve una collezione scriveremo

Iterable<T> getCollection() 

Ora se ci spostiamo nel campo dell’esecuzione asincrona e vogliamo gestire un oggetto che sarà disponibile successivamente, andiamo a realizzare un metodo con questo stile:

Future<T> getObject() 

Ci manca quindi l’ultima combinazione: una collezione i cui elementi saranno disponibili in futuro ma non necessariamente tutti nello stesso istante. La soluzione di RxJava per questo caso è proprio la classe Observable:

Observable<T> getCollection() 
La classe Observable

Un Observable è un oggetto che emette zero o più elementi per poi terminare con successo oppure durante il flusso degli elementi si interrompe a causa di un errore. Dall’altra parte abbiamo gli osservatori di tipo Observer che consumano l’elemento e gestiscono la terminazione dell’Observable (oppure il suo errore). Per gestire correttamente un Observable, un Observer implementa i metodi:

  • onNext
  • onComplete
  • onError

Questo pattern è molto simile all’observer “classico” in quanto lo estende ma ci sono delle differenze importanti. In generale l’Observable non emette elementi se non c’è alcun Observer (ma è possibile realizzare anche hot Observable dove questa regola non vale). Inoltre, l’Observer viene notificato quanto l’Observable smette di produrre elementi perché termina o perché fallisce. Questo implica anche che non è necessario rimuovere l’Observer dall’Observable quanto quest’ultimo è stato consumato perché avviene automaticamente.

Talk is cheap, show me the code

Ora che abbiamo chiarito i concetti fondamentali siamo pronti a sperimentare. In questa prima parte useremo sempre il thread principale, nonostante la “predisposizione” degli Observable alla gestione asincrona, nulla vieta di usare gli Observable solo in modalità sincrona.

Definire un Observable è molto semplice: qualsiasi collezione può essere trasformata in Observable:

List<String> cityList = new ArrayList<>();
cityList.add("Berlin");
cityList.add("Roma");
cityList.add("Madrid");
cityList.add("Wien");
		
Observable<String> cities = Observable.from(cityList);

RxJava mette a disposizione anche dei metodi di utilità per semplificarci la vita, come il metodo just che costruisce un Observable partendo da una lista di oggetti. Un modo alternativo di costruire un Observable è questo:

Observable<String> moreCities = Observable.just("Zurich", "London", "Paris");

Possiamo anche generare Observable combinandone di esistenti. Ad esempio, posso fondere i due Observable precedenti con l’operatore concatWith:

Observable<String> allCities = cities.concatWith(moreCities);

Cosa fa esattamente questo metodo? Come dicevamo prima, la documentazione è fatta molto bene, quindi riporto qui sotto il marble diagram, cioè il diagramma a biglie, per il metodo concatWith.

metodo-concat-rxjava

L’immagine spiega molto meglio di come possiamo fare noi a parole: la freccia a destra indica l’avanzamento del tempo, abbiamo due Observable quindi due frecce. Il risultato del metodo è un nuovo Observable composto dagli elementi del primo seguiti dagli elementi del secondo mantenendo l’ordine. Gli Observable indicati terminano nell’istante indicato dalla stanghetta verticale.

Tutta la documentazione di RxJava è realizzata in questa maniera, la difficoltà nell’uso di questa libreria non starà quindi nel capire come funziona un operatore, ma piuttosto quale tra le decine di operatori presenti è quello giusto per quello che dobbiamo fare 🙂 .

L’Observable è pronto, implementiamo l’interfaccia Observer per fare qualcosa con la lista delle città: ci servono tre metodi.

Observer<String> traveller = new Observer<String>() {

  @Override
  public void onCompleted() {
    System.out.println("My trip is finished");
  }

  @Override
    public void onError(Throwable e) {
    System.out.println("I won't complete my trip!");
  }

  @Override
  public void onNext(String t) {
    System.out.println("I've just visited " + t);
  }

};

L’unica cosa che resta ora da fare è associare l’Observable al suo Observer tramite il metodo subscribe

allCities.subscribe(traveller);

Il risultato prodotto è quello riportato qui sotto

I've just visited Zurich
I've just visited London
I've just visited Paris
I've just visited Berlin
I've just visited Roma
I've just visited Madrid
I've just visited Wien
My trip is finished

Se non vogliamo costruire un Observer, possiamo anche limitarci a passare il comportamento che vogliamo applicare solo nel caso onNext: basta implementare l’interfaccia Action1

Action1<String> weather = new Action1<String>(){
  @Override
  public void call(String city) {
    System.out.println("The weather is sunny in " + city);
  }
			
};

allCities.subscribe(weather);

per ottenere questo risultato:

The weather is sunny in Zurich
The weather is sunny in London
The weather is sunny in Paris
The weather is sunny in Berlin
The weather is sunny in Roma
The weather is sunny in Madrid
The weather is sunny in Wien

Per fortuna con le lambda di Java 8, possiamo scrivere tutto in maniera più compatta e risparmiare un sacco di codice.

allCities.subscribe(city -> System.out.println("The weather is sunny in " + city));

Un po’ di trasformazioni

Abbiamo visto le due operazioni indispensabili: la creazione di un Observable e la sottoscrizione di un Observer. Prima che gli elementi arrivino all’Observer possiamo applicare gli operatori che vogliamo in un modo del tutto simile a quelli che applichiamo su di uno stream di Java 8. Facciamo un piccolo esempio:

allCities.map(city -> city.toUpperCase())
         .filter(city -> city.length() > 5)
         .take(3)
         .subscribe(city -> System.out.println("I LOVE " + city));

Guardiamo in dettaglio un operatore alla volta, il metodo map trasforma in maiuscolo gli elementi emessi dall’Observable. Dell’Observable risultante non prendiamo tutti gli elementi ma solo quelli che soddisfano la condizione passata a filter, cioè solo le città con un nome più lungo di cinque caratteri. Infine, non usiamo tutte le stringhe a disposizione ma solo le prime tre e finalmente raggiungono l’Observer che li stampa su console ottenendo questo risultato:

I LOVE BERLIN
I LOVE MADRID
I LOVE WIEN

Facciamo un esempio diverso in modo da introdurre altri operatori:

cities.zipWith(moreCities, (String s1, String s2) -> "From " + s1 + " to " + s2)
      .doOnNext(s -> System.out.println("#Debug: "+ s))
      .map(s -> s.toLowerCase())
      .subscribe(System.out::println);

Anche il metodo zipWith unisce due Observable, ma a differenza di concat, prende gli elementi emessi da due Observable e li combina a due a due secondo una funzione passata come parametro. Qui usiamo zipWith per generare una nuova stringa che comprende due città. Si noti che siccome cities ha quattro elementi mentre moreCities ne ha tre, il risultato sarà lungo quanto il più corto dei due Observable.
Il metodo doOnNext consente di applicare un’azione intermedia sull’Observable, nel nostro caso lo usiamo per debuggare su console il risultato del metodo zip. Eseguendo questo esempio otteniamo quindi:

#Debug: From Zurich to Berlin
from zurich to berlin
#Debug: From London to Roma
from london to roma
#Debug: From Paris to Madrid
from paris to madrid

Non finisce qui

Siamo giunti al termine di questa prima parte su RxJava. Abbiamo introdotto la classe Observable e probabilmente non avrete incontrato grosse difficoltà a seguire gli esempi fatti fino qui se usate già da un po’ gli stream di Java 8 (o quelli di Scala). In effetti, adoperando RxJava solo in modalità sincrona, non ho trovato molte differenze concettuali con gli stream di Java, tanto da farmi sembrare RxJava solo una versione molto più ricca di quanto avevo già a disposizione. La vera forza di RxJava sta però nel modo in cui tratta Observable e Observer che si trovano su thread diversi, rendendo molto più semplice la programmazione asincrona. Le cose si faranno più interessanti nei prossimi post, intanto potete provare il codice di questo articolo su GitHub.

Giampaolo Trapasso

Sono laureato in Informatica e attualmente lavoro come Software Engineer in Radicalbit. Mi diverto a programmare usando Java e Scala, Akka, RxJava e Cassandra. Qui mio modesto contributo su StackOverflow e il mio account su GitHub

  • Che bomba! Grazie mille per questo tutorial! 🙂

    • Giampaolo Trapasso

      ti ringrazio! di RxJava si parla anche in altri due tutorial e nello screencast di Fabio. Facci sapere se ti piacciono 🙂

  • PlutoUberAlles

    Fantastico!
    Grazie mille! Spero facciate anche la seconda parte!

    • Giampaolo Trapasso

      Ciao! Si ci sono altri due post di approfondimento specifici su Rx e anche uno screencast. Li trovi cercando “RX” nella barra di ricerca. Facci sapere cosa ne pensi.
      ps. Un collega mi ha segnalato che il terzo post ha un refuso sull’output in console. Devo correggerlo.

  • Mi ci voleva proprio un’introduzione a RxJava fatta come si deve! Grazie per la condivisione 😉