EJB3 Message Driven Bean con JMS: Request/Reply Pattern con JBoss – Parte I

In un abbiamo introdotto un po’ di teoria che sta dietro ai Message Driven Bean (MDB): è importante conoscere l’architettura su cui si basano questi strumenti perché è molto diversa dai componenti sincroni con cui siamo soliti lavorare (leggi Session Beans). La teoria è importante perché ci permette di capire cosa accade dietro le quinte (e soprattutto perché si scrive un certo codice), ma se non vediamo come si mette in pratica serve a ben poco!

Le implementazioni del Request/Reply Pattern

di questo pattern molto interessante perché ci permette di stabilire una comunicazione bidirezionale asincrona con il server. Vale la pena approfondire come andrebbe implementato. In rete ci sono infatti innumerevoli esempi che mostrano come inviare un messaggio ad un MDB in ascolto su una coda. Già più difficile è trovare un esempio completo (e soprattutto funzionante) del Request/Reply Pattern, ovvero come ricevere un messaggio di risposta da un MDB. Ognuno propone la sua soluzione senza spiegare a quale sotto-pattern sta facendo riferimento e infatti nei forum si trovano le cose più strane e confuse sull’uso del message id e del correlation id. Chi ha letto il post precedente non ha però più dubbi a riguardo!!

Per i più pigri riportiamo la tabella riassuntiva mostrata nell’altro post:

JMS Pattern Response Queue CorrelationID
Correlation ID Pattern Tutte le risposte vanno sulla stessa coda prefissata Il server copia il Correlation ID della richiesta nel Correlation ID della risposta
Message ID Pattern La risposta viene indirizzata dinamicamente alla coda specificata nel messaggio del mittente Il server copia il Message ID della richiesta nel Correlation ID della risposta

Vediamo quindi come implementare questi sotto-pattern con JBoss AS 6. La prima cosa da fare, a differenza delle versioni precedenti, è configurare le code sul server. Come menzionato sul wiki di JBoss, dalla versione 6 il JMS provider predefinito è HornetQ. E’ sufficiente creare un file *-hornetq-jms.xml e posizionarlo a livello di server o di applicazione:

  • a livello di server: nella cartella %JBOSS_HOME%/server//deploy;
  • a livello di applicazione: nella root dell’EAR o nella cartella META-INF.

Per il nostro esempio, abbiamo deciso di seguire la seconda strada in modo da avere un EAR “autosufficiente” al momento del deploy. Creiamo quindi un file chiamato queues-hornetq-jms.xml nella cartella META-INF del nostro EAR:



    
      
    
   
   
      
   


dove dichiariamo 2 code: una per le richieste e una per le risposte. Metteremo quindi in ascolto sulla coda delle richieste un MDB e interrogheremo periodicamente la coda delle risposte per sapere se ci sono messaggi correlati con quello inviato. La classe che ci permette di mandare e ricevere messaggi avrà una interfaccia di questo tipo:

@Local
public interface CorrelatedMessageHandlerEJBLocal {

     void sendMessage(String text);
        
     String receiveMessage() throws JMSException;
        
     void dispose();
}

Corretation ID Pattern

Abbiamo detto che per lavorare con questo pattern occorre che:

  • tutte le risposte vadano su una stessa coda preconcordata con il mittente;
  • il mittente generi e tenga memoria del Correlation ID. Il messaggio di risposta deve avere lo stesso Correlation ID di quello inviato.
Il mittente

Dal momento che è necessario generare e tenere memoria del correlation id per recuperare i messaggi di risposta, abbiamo deciso di affidare la logica della gestione delle richieste e risposte ad un Session Bean Stateful come segue:

@Stateful
public class CorrelatedMessageHandlerEJB 
              implements CorrelatedMessageHandlerEJBLocal {

     private String correlationId;
     private static final Logger logger = 
              Logger.getLogger(CorrelatedMessageHandlerEJB.class);
     
     @Resource(mappedName = "ConnectionFactory")
     private QueueConnectionFactory queueFactory;

     @Resource(mappedName = "jms/RequestQueue")
     private Queue requestQueue;
     
     @Resource(mappedName = "jms/ResponseQueue")
     private Queue replyQueue;
     
     private QueueConnection connection;
     
     @PostConstruct
     @PostActivate
     public void init() {
          logger.info("Init EJB Statefull");
          try {
               connection = queueFactory.createQueueConnection();
          } catch (JMSException e) {
               e.printStackTrace();
          }
     }
     
     @Override
     public void sendMessage(String text) {
          QueueSession session = null;
          QueueSender sender = null;
          try {
               session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
               TextMessage message = session.createTextMessage();
               message.setText(text);
               correlationId = new Date().getTime().toString();
               message.setJMSCorrelationID(correlationId);
               sender = session.createSender(requestQueue);
               sender.send(message);
          } catch (JMSException e) {
               e.printStackTrace();
          } finally {
               try {
                    if (sender != null) sender.close();
                    if (session != null) session.close();
               } catch (JMSException e) {
                    e.printStackTrace();
               }
          }
     }

     @Override
     public String receiveMessage() throws JMSException {
          QueueSession session = null;
          QueueReceiver receiver = null;
          TextMessage message = null;
          try {
               connection.start();
               session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
               String selector = "JMSCorrelationID='" + correlationId + "'";
               receiver = session.createReceiver(replyQueue, selector);
               logger.info("Trying to receive message with selector " + selector);
               message = (TextMessage) receiver.receive(1 * 1000);
               
               if (message != null) {
                    logger.info("Message received! " + message.getText());
                    return message.getText();
               }
               else { 
                    logger.info("No message received from queue");
                    return null;
               }
          } catch (JMSException e) {
               e.printStackTrace();
               return null;
          } finally {
               try {
                    if (receiver != null) receiver.close();
                    if (session != null) session.close();
               } catch (JMSException e) {
                    e.printStackTrace();
               }
          }
     }
     
     @PreDestroy
     @PrePassivate
     public void preDispose() {
          try {
               if (connection != null) connection.close();
          } catch (JMSException e) {
               e.printStackTrace();
          }
     }
     
     @Remove
     @Override
     public void dispose() {
          logger.info("Chiusura EJB statefull");
     }
}

Cerchiamo di capire cosa fa questo EJB Stateful:

  • Per poter inviare e ricevere messaggi la classe gestisce:
    • una connection factory QueueConnectionFactory la cui istanza è iniettata dal server tramite l’annotazione @Resource. In JBoss, il nome JNDI di default della factory in questione è ConnectionFactory;
    • due code Queue, una per la richieste, una per le risposte, iniettate anch’esse dal server. I nomi sono quelli definiti nel nostro file queues-hornetq-jms.xml;
    • una connessione QueueConnection al broker JMS creata dalla factory al momento dell’inizializzazione dell’EJB e chiusa prima della sua distruzione (vedi annotazioni @PostConstruct, @PostActivate, @PreDestroy e @PrePassivate). Se si hanno problemi di timeout della connessione conviene gestirla in modo diverso, anche se aprire troppe connessioni verso il broker ha un suo costo.
  • Il metodo sendMessage(String text) invia il testo ricevuto in ingresso come TextMessage:
    • viene creata una sessione di comunicazione dalla connessione come non transactional e con auto ack. Cosa significa?
      • transactional: in JMS una transazione è un gruppo di messaggi che sono consegnati o ricevuti o tutti insieme o nessuno;
      • ack mode: modo con cui il broker JMS riceve i messaggi. I valori possibili sono:
        • CLIENT_ACKNOWLEDGE: il JMS server non può inviare messaggi in sequenza finché non ha ricevuto un segnale di accettazione (ack, accettazione appunto) tra un messaggio e l’altro.
        • AUTO_ACKNOWLEDGE: il messaggio viene consegnato una ed una sola volta. Questa policy genera un certo overhead sul server ma è la più usata.
        • DUPS_ACKNOWLEDGE: il messaggio può essere inviato più volte, riducendo l’overhead sul server ma aumentando quello di rete.
    • viene creato un TextMessage che conterrà il testo ricevuto in ingresso.
    • viene generato il correlation id e impostato nel messaggio. Per non creare confusione, è necessario che questo valore sia univoco sulla coda. Nel nostro caso abbiamo usato il timestamp in millisecondi (anche se si potrebbe fare di meglio!).
    • dalla sessione viene creato un QueueSender sulla coda delle richieste e viene inviato il messagio.
  • il metodo receiveMessage() viene chiamato per controllare che ci siano messaggi sulla coda di risposta.
    • per prima cosa viene avviata la connessione connection.start() con il broker JMS. Questo passo è fondamentale altrimenti non riceveremo nessun messaggio nonostante il codice sia corretto;
    • si apre poi una sessione e si costruisce un selector basato sul correlation id del messaggio inviato precedentemente;
    • si costruisce un receiver che filtra i messaggi caratterizzati dal nostro selector e infine sta in ascolto per 1 secondo (receiver.receive(1 * 1000)).
  • per ricevere messaggi possiamo creare un meccanismo di polling che invoca perioridamente questo metodo, oppure possiamo usare una più moderna Web Socket per una notifica push dal server al client non appena il messaggio è stato ricevuto. Se scegliamo questa strada, dovremo chiamare receiver.receive() senza argomenti: finché il messaggio non verrà ricevuto, il metodo receiveMessage() non avrà termine. Vedremo in un’altra occasione come implementare questa soluzione.
Il destinatario

Fin’ora abbiamo visto come si fa a inviare e ricevere un messaggio di risposta. Ma chi riceve il messaggio e invia la risposta? Stiamo parlando di un listener su una certa coda, ovvero di un Message Driven Bean. Vediamone il codice:

@MessageDriven(
   activationConfig = { 
   @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
   @ActivationConfigProperty(propertyName = "destination", propertyValue = "jms/RequestQueue")
 })
public class CorrelatedMessageHandlerMDB implements MessageListener {

     private static final Logger logger = Logger.getLogger(CorrelatedMessageHandlerMDB.class);

     @Resource(mappedName = "ConnectionFactory")
     private QueueConnectionFactory queueFactory;
     
     @Resource(mappedName = "jms/ResponseQueue")
     private Queue replyQueue;
     
     private QueueConnection connection;
     
    @PostConstruct
    public void init() {
         logger.info("Init request MDB listener...");
          try {
               connection = queueFactory.createQueueConnection();
          } catch (JMSException e) {
               e.printStackTrace();
          }
    }
    
     /**
     * @see MessageListener#onMessage(Message)
     */
    public void onMessage(Message message) {
         QueueSession session = null;
         QueueSender sender = null;
          try {
               logger.info(((TextMessage) message).getText());
               
               session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
               TextMessage responseMessage = session.createTextMessage();
               responseMessage.setText("Echo: " + ((TextMessage) message).getText());
               responseMessage.setJMSCorrelationID(message.getJMSCorrelationID());
               
               sender = session.createSender(replyQueue);
               sender.send(responseMessage);
          } catch (JMSException e) {
               e.printStackTrace();
          } finally {
               try {
                    if (sender != null) sender.close();
                    if (session != null) session.close();
               } catch (JMSException e) {
                    e.printStackTrace();
               }
          }
    }
    
     @PreDestroy
     public void preDispose() {
          logger.info("Closing MDB listener...");
          try {
               if (connection != null) connection.close();
          } catch (JMSException e) {
               e.printStackTrace();
          }
     }
}
  • ogni MDB che utilizza JMS come sistema di messaggi deve implementare l’interfaccia javax.jms.MessageListener;
  • tra i suoi parametri di attivazione (@ActivationConfigProperty) deve essere indicata la destinazione e il tipo, cioè dove ascoltare. In questo caso abbiamo configurato il nostro MDB per essere un listener su una coda JMS javax.jms.Queue di nome jms/RequestQueue;
  • tramite injection sull’attributo replyQueue, l’MDB sa che deve sempre rispondere sulla coda jms/ResponseQueue;
  • la connessione al broker JMS viene aperta e chiusa dopo la creazione dell’MDB (@PostConstruct) e prima della sua distruzione (@PreDestroy). Se questo dovesse creare problemi di timeout della connessione, è opportuno inserire apertura e chiusura direttamente nel metodo onMessage(...);
  • quando viene ricevuto un messaggio, viene invocato il metodo onMessage(...). Per rispondere al messaggio vengono eseguite le stesse operazioni viste precedentemente: si crea una sessione e un sender sulla coda di risposta. Da notare che in questo caso il correlation id del messaggio ricevuto viene copiato in quello che verrà inviato in risposta, come richiede il pattern.

Conclusioni

In un post precedente avevamo illustrato un po’ di teoria che sta dietro al mondo JMS. In questo post invece abbiamo visto nel concreto come si fa ad applicare il pattern Correlation ID in JBoss applicandolo alla lettera. In questo caso è possibile modificare il pattern variando la coda di risposta: invece di avere una coda predefinita, è possibile inserire nel messaggio di invio il riferimento alla coda su cui rispondere, come richiesto per esempio dal pattern Message ID che vedremo .

Andrea Como

Sono un software engineer focalizzato nella progettazione e sviluppo di applicazioni web in Java. Presso OmniaGroup ricopro il ruolo di Tech Leader sulle tecnologie legate alla piattaforma Java EE 5 (come WebSphere 7.0, EJB3, JPA 1 (EclipseLink), JSF 1.2 (Mojarra) e RichFaces 3) e Java EE 6 con JBoss AS 7, in particolare di CDI, JAX-RS, nonché di EJB 3.1, JPA2, JSF2 e RichFaces 4. Al momento mi occupo di ECM, in particolar modo sulla customizzazione di Alfresco 4 e sulla sua installazione con tecnologie da devops come Vagrant e Chef. In passato ho lavorato con la piattaforma alternativa alla enterprise per lo sviluppo web: Java SE 6, Tomcat 6, Hibernate 3 e Spring 2.5. Nei ritagli di tempo sviluppo siti web in PHP e ASP. Per maggiori informazioni consulta il mio . - -

  • Dilietol

    Mi sembra che ci sia un errore nell’esempio del destinatario. Nel metodo onMessage viene inviato sulla coda delle risposte il messaggio in input e non quello di risposta.

    • Grazie per la segnalazione e soprattutto per l’attenzione!! In questo caso il mittente riceve esattamente lo stesso messaggio senza l'”Echo”… Lo correggo subito!

  • Paolo

     Buonissima rappresentazione e articolo ben fatto.

    Tuttavia, in una logica distribuita dove il web incalza sempre più, per implementare una “conversazione asincrona bidirezionale” vedo i web services (in tal caso la tecnologia JAX-WS asynchronous) una ottima alternativa, perchè basata su HTTP.
    Il sistema permette lato client di inviare una richiesta asincrona in modo che il programma non si blocchi e la possibilità di ottenere la risposta invocandola esplicitamente (response.get() – polling) o implementando un apposito Response Handler (callback). Anche se il sistema si appoggia a protocolli di alto livello, le prestazioni sono ottimali.
    In questo caso non è più necessario conservare lo stato creando client statefull in quanto il il web service crea una correlazione implicita tra client e server.

    Sarebbe interessante creare un nuovo articolo su tale argomento.

    Ciao

    • Ciao, ti ringrazio per la segnalazione! Ho avuto modo di sperimentare le callback asincrone con .NET su applicazioni client per Windows, ma mai in Java. Purtroppo con le interfacce web è difficile liberarsi del polling, almeno finché le websocket non saranno largamente supportate. JAX-WS asincrono mi sembra proprio un bell’argomento da approfondire… grazie e continua a seguirci!

  • Fabio

    Articolo ben fatto soprattutto per le spiegazioni architetturali che inserisci a corredo! Mi sto avvicinando ora agli EJB ho programmato sempre con pattern MVC sfruttando Struts, Hibernate e quartz come framework e senza l’uso delle annotations e tantomeno dell’IoC e soprattutto su AS WebSphere.. Quindi seguendo tutto l’esempio mi manca come poter eseguire le chiamate a questi ogetti.. ho bisogno di creare un client quindi da altra JVM ? oppure devo creare una JSP per invocare gli oggetti? Cosa mi consigli per testare il tutto? Grazie mille e ti annuncio che sei entrato a far parte dei bookmark più preziosi!

    • Ciao, se passi dal WAS a JBoss (magari l’ultima versione, Wildfly) vedrai come la vita di programmatore EE migliora sensibilmente 😉 Apparte gli scherzi, per fare semplici prove puoi creare una servlet (o meglio un servizio REST con JAX-RS) in cui iniettare il CorrelatedMessageHandlerEJB e chiamare il metodo sendMessage. Eventualmente la jsp potrebbe servirti per chiamare il servizio REST, ma a questo punto è meglio un test di integrazione con JUnit (ne avevamo parlato qua: ). Spero di averti aiutato e… buone prove!

      • Grazie per le info! Solo un appunto, il link che hai postato sulla risposta credo sia errato.. errore 404 😉

        • Ops! È rimasta la parentesi di chiusura della frase attaccata all’indirizzo, basta togliere quella 🙂