RxJavaEE: RxJava, WebSocket e Concurrency in Java EE 7

Si parla tanto di RxJava ultimamente (anche noi su CNJ), e a ragion veduta direi. Nell’ambito Java, ha spopolato soprattuto nel mondo Android perché permette di semplificare notevolmente la gestione degli eventi asincroni e scrivere un codice decisamente più moderno. Mi sono sempre chiesto se RxJava fosse altrettanto utile negli applicativi web, dove la “reattività” tra client e server (o viceversa) è più difficile da realizzare, perché è necessario avere una comunicazione push dal server a client, e soprattutto perché è necessario rendere asincrone le richieste provenienti dal client. Fortunatamente con Java EE 7 adesso abbiamo pieno controllo di entrambi questi aspetti, grazie alla WebSocket API (JSR 386) e le Concurrency Utilities per Java EE (JSR 236).

Java EE e WebSocket: reprise

Come promesso, riprendiamo il discorso sulle WebSocket, questa volta in chiave RxJava. Grazie a questo framework infatti, possiamo semplificare notevolmente il codice del’endpoint WebSocket perché, nonostante le chiamate dal client siano asincrone, si riesce a recuperare la sessione univoca a cui rispondere! Ma andiamo per ordine: per rendere le chiamate asincrone, per prima cosa dobbiamo conoscere le EE Concurrency Utilities.

Java EE 7 Concurrency Utilities in pillole

Quando uscì Java SE 5, tra la miriade di novità fu introdotta una nuova gestione più efficiente dei thread all’interno della specifica chiamata “Concurrency Utilities” (JSR 166): grazie ad oggetti come ExecutorService e ScheduledExecutorService era finalmente possibile gestire code di task, anche schedulati, basati sul sistema dei thread pool, in modo piuttosto semplice e trasparente allo sviluppatore. Tutte queste belle cose però sono altamente sconsigliabili all’interno di un Application Server, perché questi hanno già a sua volta una serie di thread pool che gestiscono varie funzionalità del sistema e usare thread “sconosciuti” all’Application Server porta a situazioni inattese (thread uccisi senza preavviso, oppure che vengono abbassati di priorità da non venire mai terminati…)

Tutto questo è acqua passata con la versione 7 della specifica Enterprise, grazie all’introduzione delle Concurrency Utilities per Java EE. Da notare la sovrapposizione non casuale del nome con quella di Java 5: sono infatti strettamente correlate, come mostra il diagramma delle classi principali:

Executors Class Diagram

In pratica, il package javax.enterprise.concurrent contiene la versione “managed” delle corrispettive classi del package java.util.concurrent, nel senso che sono gestite dall’Application Server. Quindi non useremo la classe statica Executors per accedere ad un’istanza di questi servizi, ma saranno risorse JNDI iniettabili dal server:

class MyClass {

   /**
    * Default JNDI name: 
    * java:comp/DefaultManagedExecutorService
    */
   @Resource
   private ManagedExecutorService defaultExecutorService 

   /**
    * Default JNDI name: 
    * java:comp/DefaultManagedScheduledExecutorService
    */
   @Resource
   private ManagedScheduledExecutorService defaultScheduledService 
}

e sempre pronte all’uso, proprio come siamo abituati a fare da Java SE 5. L’unica differenza è che, essendo gestite dall’Application Server, non è consentito chiamare i metodi che ne regolano il ciclo di vita (come per esempio shutdown).
L’uso di queste classi garantisce la propagazione del contesto di provenienza della chiamata che include ClassLoader, JNDI e Security Context (proprio come fanno gli EJB Asincroni). Questo concetto è fondamentale da tenere a mente altrimenti non ci spiegheremo a breve come faccia RxJava a recuperare la WebSocket giusta a cui rispondere, nonostante le operazioni vengano eseguite in un nuovo thread.

Queste sono le informazioni base per poter usare le nuove Concurrency Utilities. La specifica definisce anche altri due servizi (uno dei quali nuovo):

  • ContextService: utile quando si ha del codice scritto con la Concurrency API di Java SE e la si vuol portare su EE. Permette infatti di creare dei proxy contestuali, sui task che dobbiamo eseguire, ovvero associa al task (Runnable o Callable che sia) il contesto del chiamante, semplicemente così:
    Runnable myTask = ...;
    Runnable contextualizedTask = contextService.createContextualProxy(myTask, Runnable.class);
    
  • ManagedThreadFactory: permette di creare thread manualmente, ma comunque a conoscenza del contesto di provenienza e gestiti dall’Application Server. Gli oggetti Thread prodotti infatti implementano l’interfaccia ManageableThread. Questa factory può essere usata anche per creare degli ExecutorService in stile Java SE:
    
    @Resource
    private ManagedThreadFactory factory;
    
    public ExecutorService getThreadPool(int poolSize) {
       return Executors.newFixedThreadPool(poolSize, factory);
    }
    
    

Maggiori approfondimenti possono essere trovati, per esempio per WildFly, sul sito di JBoss.

RxJava e WebSocket

Adesso che abbiamo le basi, possiamo riprendere in mano il progetto di esempio dei post precedenti, il cui sorgente è disponibile su GitHub.

A livello funzionale non cambia niente rispetto al caso precedente. Riportiamo il sequence diagram:

Time consuming operation with websocket

Questa volta quello che cambia è semplicemente la modalità di gestione dell'”asincronicità” dell’operazione time consuming e della “notifica” del suo termine: al posto di EJB Asincroni ed eventi CDI abbiamo un ExecutorService e RxJava.

Vediamo come cambia il metodo onMessage della WebSocket (la versione completa è disponibile su GitHub).

    @Inject
    private ObservableExecutorAdapter executorAdapter;

    ...

    @OnMessage
    public Message onMessage(Message msg, Session session) throws InterruptedException {
        msg.setSessionId(session.getId());

        executorAdapter.executeAsync(() -> {
            try {
                return service.echoSync(msg);
            } catch (InterruptedException e) {
                LOGGER.log(Level.SEVERE, e.getMessage(), e);
                throw new RuntimeException(e);
            }
        }).subscribe(notifyEcho(session), notifyError(session));

        String response = logWithDetails("Message '" + msg.getMessage() + "' received");
        LOGGER.info(response);
        return new Message(response, session.getId());
    }

In questo caso, la chiamata al servizio time consuming è definita in modo sincrono (riga 12), ma è resa asincrona grazie all’adapter (riga 10) che utilizza le Concurrency Utilities per eseguire la lambda expression che viene passata (come ci insegna Giampaolo in 7 minuti). Questo adapter restituisce un Observable di RxJava, su cui si può eseguire un subscribe (riga 17) degli elementi emessi.

Ma andiamo per ordine: mentre il servizio time consuming può essere adesso un normale POJO, l’adapter è un managed bean nel quale viene iniettato un ManagedExecutorService per eseguire la lambda expression di tipo Supplier in un altro thread:

@ApplicationScoped
public class ObservableExecutorAdapter {

    @Resource
    private ManagedExecutorService executorService;

    public <O> Observable<O> executeAsync(Supplier<O> function) {

        return (Observable<O>) Observable.create(subscriber -> {
            try {
                subscriber.onNext(function.get());
                subscriber.onCompleted();
            } catch (Exception e) {
                subscriber.onError(e);
            }
        }).subscribeOn(Schedulers.from(executorService));
    }
}

Abbiamo già affrontato il significato di onNext, onCompleted e onError, concentriamoci quindi sulle righe evidenziate. L’oggetto Observable viene creato tramite un subscriber (riga 9), dove emette un solo valore prodotto dalla chiamata all funzione function.get (riga 11). Il processo è eseguito su un nuovo thread grazie alla classe Schedulers che crea il thread dall’executorService passato come parametro.

Con questa semplice classe quindi si abilita la “magia” di rendere asincrona l’esecuzione della funzione function e di gestire in modo semplice il caso di successo dal caso di errore direttamente nell’endpoint della WebSocket, dove abbiamo il riferimento alla sessione del chiamante. Il metodo subscribe chiamato sull’Observable delega infatti la gestione di questi casi a due metodi notifyEcho e notifyError. Vediamo il primo:

private Action1<Message> notifyEcho(Session session) {
        return echo -> {
            if (session.isOpen()) {
                try {
                    session.getBasicRemote().sendObject(echo);
                } catch (IOException | EncodeException e) {
                    LOGGER.log(Level.SEVERE, e.getMessage(), e);
                    throw new RuntimeException(e);
                }
            } else {
                LOGGER.severe("Session " + session.getId() + " already closed!");
            }
        };
    }

Avendo a disposizione l’oggetto Session ottenuto alla ricezione del messaggio, è possibile rispondere direttamente al client associato senza particolari gestioni delle sessioni aperte.

Conclusioni

Confrontando la soluzione con CDI e EJB Asincroni vista nei due post precendeti, con RxJava il codice risulta di gran lunga più compatto e più chiaro: il legame che rimane tra il subscribe dell’Observable e la sessione WebSocket del client rende superfluo tutto il meccanismo messo in piedi nell’altra soluzione per salvarsi tutte le sessioni aperte e portarsi dietro l’id della sessione a cui rispondere.

Il cuore della soluzione è la class ObservableExecutorAdapter che permette di rendere asincrona qualsiasi funzione restituendo un Observable sul tipo di ritorno, a cui poi poter “sottoscrivere” una funzione per osservare i risultati e rimandarli al client.

Su OpenShift è presente un progetto di demo che mostra a video lo scambio di messaggi con il server sia con la soluzione CDI + EJB Asincroni che con RxJava: dal punto di vista utente l’effetto è il medesimo, ma dietro le quinte abbiamo visto che con RxJava si ottiene una soluzione architetturalmente più semplice ed efficace. Il sorgente dell’applicativo è disponibile su GitHub.

Andrea Como

Sono un software engineer focalizzato nella progettazione e sviluppo di applicazioni web in Java. Presso OmniaGroup ricopro il ruolo di Tech Leader sulle tecnologie legate alla piattaforma Java EE 5 (come WebSphere 7.0, EJB3, JPA 1 (EclipseLink), JSF 1.2 (Mojarra) e RichFaces 3) e Java EE 6 con JBoss AS 7, in particolare di CDI, JAX-RS, nonché di EJB 3.1, JPA2, JSF2 e RichFaces 4. Al momento mi occupo di ECM, in particolar modo sulla customizzazione di Alfresco 4 e sulla sua installazione con tecnologie da devops come Vagrant e Chef. In passato ho lavorato con la piattaforma alternativa alla enterprise per lo sviluppo web: Java SE 6, Tomcat 6, Hibernate 3 e Spring 2.5. Nei ritagli di tempo sviluppo siti web in PHP e ASP. Per maggiori informazioni consulta il mio curriculum pubblico. Follow me on Twitter - LinkedIn profile - Google+