Si parla tanto di RxJava ultimamente (), e a ragion veduta direi. Nell’ambito Java, ha spopolato soprattuto nel mondo Android perché permette di semplificare notevolmente la gestione degli eventi asincroni e scrivere un codice decisamente più moderno. Mi sono sempre chiesto se RxJava fosse altrettanto utile negli applicativi web, dove la “reattività” tra client e server (o viceversa) è più difficile da realizzare, perché è necessario avere una comunicazione push dal server a client, e soprattutto perché è necessario rendere asincrone le richieste provenienti dal client. Fortunatamente con Java EE 7 adesso abbiamo pieno controllo di entrambi questi aspetti, grazie alla WebSocket API (JSR 386) e le Concurrency Utilities per Java EE (JSR 236).
Java EE e WebSocket: reprise
Come promesso, , questa volta in chiave RxJava. Grazie a questo framework infatti, possiamo semplificare notevolmente il codice del’endpoint WebSocket perché, nonostante le chiamate dal client siano asincrone, si riesce a recuperare la sessione univoca a cui rispondere! Ma andiamo per ordine: per rendere le chiamate asincrone, per prima cosa dobbiamo conoscere le EE Concurrency Utilities.
Java EE 7 Concurrency Utilities in pillole
Quando uscì Java SE 5, tra la miriade di novità fu introdotta una nuova gestione più efficiente dei thread all’interno della specifica chiamata “Concurrency Utilities” (JSR 166): grazie ad oggetti come ExecutorService
e ScheduledExecutorService
era finalmente possibile gestire code di task, anche schedulati, basati sul sistema dei thread pool, in modo piuttosto semplice e trasparente allo sviluppatore. Tutte queste belle cose però sono altamente sconsigliabili all’interno di un Application Server, perché questi hanno già a sua volta una serie di thread pool che gestiscono varie funzionalità del sistema e usare thread “sconosciuti” all’Application Server porta a situazioni inattese (thread uccisi senza preavviso, oppure che vengono abbassati di priorità da non venire mai terminati…)
Tutto questo è acqua passata con la versione 7 della specifica Enterprise, grazie all’introduzione delle Concurrency Utilities per Java EE. Da notare la sovrapposizione non casuale del nome con quella di Java 5: sono infatti strettamente correlate, come mostra il diagramma delle classi principali:
In pratica, il package javax.enterprise.concurrent contiene la versione “managed” delle corrispettive classi del package java.util.concurrent, nel senso che sono gestite dall’Application Server. Quindi non useremo la classe statica Executors
per accedere ad un’istanza di questi servizi, ma saranno risorse JNDI iniettabili dal server:
class MyClass { /** * Default JNDI name: * java:comp/DefaultManagedExecutorService */ @Resource private ManagedExecutorService defaultExecutorService /** * Default JNDI name: * java:comp/DefaultManagedScheduledExecutorService */ @Resource private ManagedScheduledExecutorService defaultScheduledService }
e sempre pronte all’uso, proprio come siamo abituati a fare da Java SE 5. L’unica differenza è che, essendo gestite dall’Application Server, non è consentito chiamare i metodi che ne regolano il ciclo di vita (come per esempio
shutdown
).L’uso di queste classi garantisce la propagazione del contesto di provenienza della chiamata che include ClassLoader, JNDI e Security Context (proprio come fanno gli EJB Asincroni). Questo concetto è fondamentale da tenere a mente altrimenti non ci spiegheremo a breve come faccia RxJava a recuperare la WebSocket giusta a cui rispondere, nonostante le operazioni vengano eseguite in un nuovo thread.
Queste sono le informazioni base per poter usare le nuove Concurrency Utilities. La specifica definisce anche altri due servizi (uno dei quali nuovo):
-
ContextService: utile quando si ha del codice scritto con la Concurrency API di Java SE e la si vuol portare su EE. Permette infatti di creare dei proxy contestuali, sui task che dobbiamo eseguire, ovvero associa al task (Runnable o Callable che sia) il contesto del chiamante, semplicemente così:
Runnable myTask = ...; Runnable contextualizedTask = contextService.createContextualProxy(myTask, Runnable.class);
-
ManagedThreadFactory: permette di creare thread manualmente, ma comunque a conoscenza del contesto di provenienza e gestiti dall’Application Server. Gli oggetti
Thread
prodotti infatti implementano l’interfacciaManageableThread
. Questa factory può essere usata anche per creare degliExecutorService
in stile Java SE:
@Resource private ManagedThreadFactory factory; public ExecutorService getThreadPool(int poolSize) { return Executors.newFixedThreadPool(poolSize, factory); }
Maggiori approfondimenti possono essere trovati, per esempio per WildFly, sul sito di JBoss.
RxJava e WebSocket
Adesso che abbiamo le basi, possiamo riprendere in mano il progetto di esempio dei , il cui sorgente è disponibile su GitHub.
A livello funzionale non cambia niente rispetto al caso precedente. Riportiamo il sequence diagram:
Questa volta quello che cambia è semplicemente la modalità di gestione dell'”asincronicità” dell’operazione time consuming e della “notifica” del suo termine: al posto di EJB Asincroni ed eventi CDI abbiamo un ExecutorService e RxJava.
Vediamo come cambia il metodo onMessage
della WebSocket (la versione completa è disponibile su ).
@Inject private ObservableExecutorAdapter executorAdapter; ... @OnMessage public Message onMessage(Message msg, Session session) throws InterruptedException { msg.setSessionId(session.getId()); executorAdapter.executeAsync(() -> { try { return service.echoSync(msg); } catch (InterruptedException e) { LOGGER.log(Level.SEVERE, e.getMessage(), e); throw new RuntimeException(e); } }).subscribe(notifyEcho(session), notifyError(session)); String response = logWithDetails("Message '" + msg.getMessage() + "' received"); LOGGER.info(response); return new Message(response, session.getId()); }
In questo caso, la chiamata al servizio time consuming è definita in modo sincrono (riga 12), ma è resa asincrona grazie all’adapter (riga 10) che utilizza le Concurrency Utilities per eseguire la lambda expression che viene passata (come ci insegna ). Questo adapter restituisce un
Observable
di RxJava, su cui si può eseguire un subscribe
(riga 17) degli elementi emessi.
Ma andiamo per ordine: mentre il servizio time consuming può essere adesso un normale POJO, l’adapter è un managed bean nel quale viene iniettato un ManagedExecutorService
per eseguire la lambda expression di tipo Supplier
in un altro thread:
@ApplicationScoped public class ObservableExecutorAdapter { @Resource private ManagedExecutorService executorService; publicObservable executeAsync(Supplier function) { return (Observable ) Observable.create(subscriber -> { try { subscriber.onNext(function.get()); subscriber.onCompleted(); } catch (Exception e) { subscriber.onError(e); } }).subscribeOn(Schedulers.from(executorService)); } }
Abbiamo il significato di onNext
, onCompleted
e onError
, concentriamoci quindi sulle righe evidenziate. L’oggetto Observable
viene creato tramite un subscriber
(riga 9), dove emette un solo valore prodotto dalla chiamata all funzione function.get
(riga 11). Il processo è eseguito su un nuovo thread grazie alla classe Schedulers
che crea il thread dall’executorService
passato come parametro.
Con questa semplice classe quindi si abilita la “magia” di rendere asincrona l’esecuzione della funzione function
e di gestire in modo semplice il caso di successo dal caso di errore direttamente nell’endpoint della WebSocket, dove abbiamo il riferimento alla sessione del chiamante. Il metodo subscribe
chiamato sull’Observable
delega infatti la gestione di questi casi a due metodi notifyEcho
e notifyError
. Vediamo il primo:
private Action1notifyEcho(Session session) { return echo -> { if (session.isOpen()) { try { session.getBasicRemote().sendObject(echo); } catch (IOException | EncodeException e) { LOGGER.log(Level.SEVERE, e.getMessage(), e); throw new RuntimeException(e); } } else { LOGGER.severe("Session " + session.getId() + " already closed!"); } }; }
Avendo a disposizione l’oggetto
Session
ottenuto alla ricezione del messaggio, è possibile rispondere direttamente al client associato senza particolari gestioni delle sessioni aperte.
Conclusioni
Confrontando la soluzione con CDI e EJB Asincroni vista nei due post precendeti, con RxJava il codice risulta di gran lunga più compatto e più chiaro: il legame che rimane tra il subscribe
dell’Observable
e la sessione WebSocket del client rende superfluo tutto il meccanismo messo in piedi nell’altra soluzione per salvarsi tutte le sessioni aperte e portarsi dietro l’id della sessione a cui rispondere.
Il cuore della soluzione è la class ObservableExecutorAdapter
che permette di rendere asincrona qualsiasi funzione restituendo un Observable
sul tipo di ritorno, a cui poi poter “sottoscrivere” una funzione per osservare i risultati e rimandarli al client.
Su OpenShift è presente un progetto di demo che mostra a video lo scambio di messaggi con il server sia con la soluzione CDI + EJB Asincroni che con RxJava: dal punto di vista utente l’effetto è il medesimo, ma dietro le quinte abbiamo visto che con RxJava si ottiene una soluzione architetturalmente più semplice ed efficace. Il sorgente dell’applicativo è disponibile su GitHub.