RxJava: trasformare una chiamata REST in un Observable

Nel primo post su RxJava abbiamo esaminato i due componenti essenziali di questo framework: Observable e Observer. In tutti gli esempi l’Observable era generato usando una lista di oggetti. Ora vedremo, invece, come sia possibile costruire un Observable che incapsula una chiamata REST e integrarlo nella nostra catena di operatori, uno scenario molto comune soprattutto scrivendo un’applicazione Android.

Nell’articolo precedente utilizzavamo un elenco di città e ne stampavamo in console il nome e la finta situazione meteo. Adesso otterremo i dati attuali utilizzando http://openweathermap.org/api, un sito che offre delle semplici API REST per interrogare le condizioni atmosferiche attuali e future. Per costruire il client ci affideremo Jersey e deserializzeremo la risposta JSON utilizzando Gson, una libreria di cui abbiamo già parlato in passato. Un’alternativa che sostituisce entrambi, e che sicuramente vale la pena di approfondire in un prossimo post, è Retrofit

Dalla documentazione delle API si ricava che per conoscere le condizioni meteo correnti di una città come ad esempio Londra, l’url da invocare è il seguente http://api.openweathermap.org/data/2.5/weather?q=London e restituisce questa risposta.

{  
  "coord":{  
    "lon":-0.13,
    "lat":51.51
  },
  "sys":{  
    "type":1,
    "id":5091,
    "message":0.1016,
    "country":"GB",
    "sunrise":1417592825,
    "sunset":1417622031
  },
  "weather":[  
    {  
      "id":801,
      "main":"Clouds",
      "description":"few clouds",
      "icon":"02d"
    }
  ],
  "base":"cmc stations",
  "main":{  
    "temp":279.66,
    "pressure":1024,
    "humidity":65,
    "temp_min":278.15,
    "temp_max":281.15
  },
  "wind":{  
    "speed":5.7,
    "deg":10
  },
  "clouds":{  
    "all":20
  },
  "dt":1417605604,
  "id":2643743,
  "name":"London",
  "cod":200
}

Di tutte queste informazioni, a noi basta estrarre il valore della chiave description e concatenarlo al nome della città.

La chiamata nell’Observer

Utilizzeremo l’Observable della puntata precedente, quello generato dalla lista di città. La prima implementazione che possiamo pensare di realizzare è quella in cui scriviamo tutta la logica della chiamata REST nel metodo subscribe e quindi nell’Observer.

Observable<String> allCities = Observable.just("Zurich", "London", "Paris", "Berlin", "Roma", "Madrid", "Wien");

allCities.subscribe(cityName -> {
  String url = BASE_URL + cityName;

  Client client = Client.create();
  WebResource webResource = client.resource(url);

  ClientResponse response = 
       webResource.accept("application/json").get(ClientResponse.class);

  if (response.getStatus() != Status.OK.getStatusCode()) {
 	throw new RuntimeException("Failed: HTTP error code : " + response.getStatus());
  }

  String output = response.getEntity(String.class);

  JsonElement json = new JsonParser().parse(output);

  String weather = json.getAsJsonObject().get("weather")
                       .getAsJsonArray().get(0)
                       .getAsJsonObject().get("description")
                       .getAsString();
			
   System.out.println(cityName + "-->" + weather);
});

Senza voler entrare troppo nel merito di Jersey, anche se quanto scritto è abbastanza semplice, ci basta sapere che dalla riga 4 alla 15 gestiamo l’invocazione del metodo GET sul protocollo HTTP. Dalla riga 18 usiamo Gson, invece, per fare il parsing della risposta: stiamo attraversando l’albero degli elementi per arrivare ad estrarre la stringa associata alla chiave description. L’ultima riga banalmente stampa in console il risultato. Il tutto è infilato dentro un’espressione lambda di Java 8 che permette di gestire il caso onNext dell’Observable. Per rendere l’esempio più compatto non stiamo definendo un Observer intero ma solo cosa fare quando l’Observable emette un elemento; stiamo tralasciando il caso di terminazione e il caso di errore.

Il risultato dell’esecuzione è il seguente.

Zurich-->overcast clouds
London-->proximity shower rain
Paris-->mist
Berlin-->broken clouds
Roma-->scattered clouds
Madrid-->Sky is Clear
Wien-->very heavy rain

La chiamata nell’Observable

In effetti potremmo anche accontentarci della soluzione che abbiamo se non ci interessasse gestire in maniera più trasparente gli errori, avere una soluzione più modulare, utilizzare al meglio il multithreading o capire un po’ meglio RxJava 🙂 . Quindi dimentichiamo in fretta quanto abbiamo scritto fino qui e costruiamo un Observable che dato un nome di città ci restituisca il suo meteo attuale attraverso il servizio REST. Useremo poi questo nuovo oggetto all’interno dell’Observable allCities per ottenere lo stesso risultato.

In effetti RxJava ci permette di creare un Observable utilizzando praticamente qualsiasi cosa. Oltre ad alcuni metodi come just, otteniamo il massimo della flessibilità usando Observable.create. Questo metodo prende come parametro un oggetto che implementa l’interfaccia Observable.OnSubscribe. Come è fatto questo OnSubscribe? E’ un “alias” per l’interfaccia Action1<Subscriber<? super T>>. Cos’è invece Subscriber? E’ una classe astratta che implementa Observer.

Ci si può perdere un po’ in questo giro, e in effetti chi scrive ha dovuto andare avanti e indietro lungo le definizioni per venirne fuori. Ma senza badare troppo a tutti i tipi visti, l’unica cosa davvero importante da tenere presente è che dobbiamo definire un’istanza di OnSubscribe con una lambda e che nel nostro caso prende in pasto un Subscriber<String> senza restituire niente. A questo Subscriber dobbiamo passare gli elementi che vogliamo emettere in quanto Observable e terminare, oppure indicare l’errore. Definita questa funzione la passiamo al metodo Observable.create, fine del lavoro. Vediamolo scritto in codice.

public static Observable<String> getCurrentWeather(final String cityName) {

  Observable.OnSubscribe<String> onSubscribe = subscriber -> {

    try {
      String url = BASE_URL + cityName;

      Client client = Client.create();
      WebResource webResource = client.resource(url);

      ClientResponse response = webResource.accept("application/json").get(ClientResponse.class);

      if (response.getStatus() != Status.OK.getStatusCode()) {
        throw new RuntimeException("Failed : HTTP error code : " + response.getStatus());
      }

      String output = response.getEntity(String.class);

      JsonElement json = new JsonParser().parse(output);

      String weather = json.getAsJsonObject().get("weather")
                           .getAsJsonArray().get(0)
                           .getAsJsonObject().get("description")
                           .getAsString();

      subscriber.onNext(weather);
      subscriber.onCompleted();
    } catch (Exception e) {
      subscriber.onError(e);
    }
  };

  return Observable.create(onSubscribe);
}

Commentiamo un po’ quanto scritto. Il nostro metodo è composto da due istruzioni: nella prima creiamo la lambda onSubscribe, nella seconda, a riga 33, creiamo l’Observable usando la lambda stessa. Esaminiamo più in dettaglio la costruzione di onSubscribe: quello che si nota a colpo d’occhio è che in pratica abbiamo traslocato il codice scritto in precedenza, introducendo però le tre chiamate canoniche al nostro subscriber. Infatti, dopo aver ottenuto la stringa che ci interessa dal nostro servizio REST, chiamiamo subscriber.onNext seguito subito da subscriber.onCompleted. In caso di qualsiasi eccezione invocheremo subscriber.onError.

Per testare che l’Observable creato funzioni, ci basterà istanziarne uno e sottoscriverlo come in questo esempio

getCurrentWeather("London").subscribe(System.out::println);

Torniamo al problema di partenza, abbiamo la lista di città, abbiamo l’Observable che incapsula la chiamata REST, come mettiamo assieme i pezzi? Semplicemente con questo codice:

Observable<String> allWeathers = allCities.flatMap(city -> getCurrentWeather(city));
allWeathers.subscribe(System.out::println);

L’operatore che usiamo non è map ma flatMap, infatti se usassimo map otterremmo un Observable<Observable<String>>, ma così avremmo un livello di inscatolamento in più che non ci serve. Flatmap effettua lo spacchettamento per noi in modo da ritornare un Observable<String>. Il risultato ottenuto è il seguente:

overcast clouds
proximity shower rain
mist
broken clouds
scattered clouds
Sky is Clear
very heavy rain

Quanto vediamo pare funzionare ma non abbiamo il nome della città. Per sistemare le cose, una soluzione veloce è fondere l’Observable delle città con quello del meteo tramite il metodo zip che abbiamo già incontrato. Come una cerniera, unisce a coppie gli elementi di due Observable usando una funzione che decide come combinarli.

Observable.zip(allCities,
               allWeathers,
               (city, weather) -> city + "->" + weather)
          .subscribe(System.out::println);

Come è successo all’inizio del post, la soluzione più intuitiva non si è rivelata la migliore 🙂 . Possiamo ottenere lo stesso risultato con questo codice:

allCities.flatMap(city -> getCurrentWeather(city).map(weather -> city + "->" + weather))
         .subscribe(System.out::println);

Bisogna guardarlo con attenzione per capire come funziona: abbiamo messo l’operatore map all’interno della chiamata a flatmap, in questo modo abbiamo due lambda, una più esterna che ha come parametro city e una più interna che ha come parametro weather, ma che vede anche la città nel suo scope. In questo modo è possibile combinarle assieme e risolvere il problema.

Cosa viene dopo

In questo post abbiamo visto come costruire un Observable che incapsuli una chiamata REST. L’idea usata si può applicare a qualsiasi tipo di attività, purché l’Observable chiami correttamente i tre metodi dell’Observer. Un esempio interessante è quello di costruire un Observable di tweet incapsulando la classe StatusAdapter di Twitter4J.

Nella prossima parte faremo lavorare gli Observable su thread diversi e vedremo come RxJava ci semplifichi la vita in questo senso. Torneremo anche a vedere la nostra API meteo in azione facendo chiamate su servizi diversi e combinandoli assieme. Come per la volta precedente, trovate il codice utilizzato su GitHub

Giampaolo Trapasso

Sono laureato in Informatica e attualmente lavoro come Software Engineer in Databiz Srl. Mi diverto a programmare usando Java e Scala, Akka, RxJava e Cassandra. Qui mio modesto contributo su StackOverflow e il mio account su GitHub