In un abbiamo introdotto un po’ di teoria che sta dietro ai Message Driven Bean (MDB): è importante conoscere l’architettura su cui si basano questi strumenti perché è molto diversa dai componenti sincroni con cui siamo soliti lavorare (leggi Session Beans). La teoria è importante perché ci permette di capire cosa accade dietro le quinte (e soprattutto perché si scrive un certo codice), ma se non vediamo come si mette in pratica serve a ben poco!
Le implementazioni del Request/Reply Pattern
di questo pattern molto interessante perché ci permette di stabilire una comunicazione bidirezionale asincrona con il server. Vale la pena approfondire come andrebbe implementato. In rete ci sono infatti innumerevoli esempi che mostrano come inviare un messaggio ad un MDB in ascolto su una coda. Già più difficile è trovare un esempio completo (e soprattutto funzionante) del Request/Reply Pattern, ovvero come ricevere un messaggio di risposta da un MDB. Ognuno propone la sua soluzione senza spiegare a quale sotto-pattern sta facendo riferimento e infatti nei forum si trovano le cose più strane e confuse sull’uso del message id e del correlation id. Chi ha letto il post precedente non ha però più dubbi a riguardo!!
Per i più pigri riportiamo la tabella riassuntiva mostrata nell’altro post:
JMS Pattern | Response Queue | CorrelationID |
---|---|---|
Correlation ID Pattern | Tutte le risposte vanno sulla stessa coda prefissata | Il server copia il Correlation ID della richiesta nel Correlation ID della risposta |
Message ID Pattern | La risposta viene indirizzata dinamicamente alla coda specificata nel messaggio del mittente | Il server copia il Message ID della richiesta nel Correlation ID della risposta |
Vediamo quindi come implementare questi sotto-pattern con JBoss AS 6. La prima cosa da fare, a differenza delle versioni precedenti, è configurare le code sul server. Come menzionato sul wiki di JBoss, dalla versione 6 il JMS provider predefinito è HornetQ. E’ sufficiente creare un file *-hornetq-jms.xml e posizionarlo a livello di server o di applicazione:
- a livello di server: nella cartella %JBOSS_HOME%/server/
/deploy ; - a livello di applicazione: nella root dell’EAR o nella cartella META-INF.
Per il nostro esempio, abbiamo deciso di seguire la seconda strada in modo da avere un EAR “autosufficiente” al momento del deploy. Creiamo quindi un file chiamato queues-hornetq-jms.xml nella cartella META-INF del nostro EAR:
dove dichiariamo 2 code: una per le richieste e una per le risposte. Metteremo quindi in ascolto sulla coda delle richieste un MDB e interrogheremo periodicamente la coda delle risposte per sapere se ci sono messaggi correlati con quello inviato. La classe che ci permette di mandare e ricevere messaggi avrà una interfaccia di questo tipo:
@Local
public interface CorrelatedMessageHandlerEJBLocal {
void sendMessage(String text);
String receiveMessage() throws JMSException;
void dispose();
}
Corretation ID Pattern
Abbiamo detto che per lavorare con questo pattern occorre che:
- tutte le risposte vadano su una stessa coda preconcordata con il mittente;
- il mittente generi e tenga memoria del Correlation ID. Il messaggio di risposta deve avere lo stesso Correlation ID di quello inviato.
Il mittente
Dal momento che è necessario generare e tenere memoria del correlation id per recuperare i messaggi di risposta, abbiamo deciso di affidare la logica della gestione delle richieste e risposte ad un Session Bean Stateful come segue:
@Stateful
public class CorrelatedMessageHandlerEJB
implements CorrelatedMessageHandlerEJBLocal {
private String correlationId;
private static final Logger logger =
Logger.getLogger(CorrelatedMessageHandlerEJB.class);
@Resource(mappedName = "ConnectionFactory")
private QueueConnectionFactory queueFactory;
@Resource(mappedName = "jms/RequestQueue")
private Queue requestQueue;
@Resource(mappedName = "jms/ResponseQueue")
private Queue replyQueue;
private QueueConnection connection;
@PostConstruct
@PostActivate
public void init() {
logger.info("Init EJB Statefull");
try {
connection = queueFactory.createQueueConnection();
} catch (JMSException e) {
e.printStackTrace();
}
}
@Override
public void sendMessage(String text) {
QueueSession session = null;
QueueSender sender = null;
try {
session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
TextMessage message = session.createTextMessage();
message.setText(text);
correlationId = new Date().getTime().toString();
message.setJMSCorrelationID(correlationId);
sender = session.createSender(requestQueue);
sender.send(message);
} catch (JMSException e) {
e.printStackTrace();
} finally {
try {
if (sender != null) sender.close();
if (session != null) session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
@Override
public String receiveMessage() throws JMSException {
QueueSession session = null;
QueueReceiver receiver = null;
TextMessage message = null;
try {
connection.start();
session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
String selector = "JMSCorrelationID='" + correlationId + "'";
receiver = session.createReceiver(replyQueue, selector);
logger.info("Trying to receive message with selector " + selector);
message = (TextMessage) receiver.receive(1 * 1000);
if (message != null) {
logger.info("Message received! " + message.getText());
return message.getText();
}
else {
logger.info("No message received from queue");
return null;
}
} catch (JMSException e) {
e.printStackTrace();
return null;
} finally {
try {
if (receiver != null) receiver.close();
if (session != null) session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
@PreDestroy
@PrePassivate
public void preDispose() {
try {
if (connection != null) connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
@Remove
@Override
public void dispose() {
logger.info("Chiusura EJB statefull");
}
}
Cerchiamo di capire cosa fa questo EJB Stateful:
- Per poter inviare e ricevere messaggi la classe gestisce:
-
una connection factory
QueueConnectionFactory
la cui istanza è iniettata dal server tramite l’annotazione@Resource
. In JBoss, il nome JNDI di default della factory in questione è ConnectionFactory; -
due code
Queue
, una per la richieste, una per le risposte, iniettate anch’esse dal server. I nomi sono quelli definiti nel nostro file queues-hornetq-jms.xml; -
una connessione
QueueConnection
al broker JMS creata dalla factory al momento dell’inizializzazione dell’EJB e chiusa prima della sua distruzione (vedi annotazioni@PostConstruct
,@PostActivate
,@PreDestroy
e@PrePassivate
). Se si hanno problemi di timeout della connessione conviene gestirla in modo diverso, anche se aprire troppe connessioni verso il broker ha un suo costo.
-
una connection factory
- Il metodo
sendMessage(String text)
invia il testo ricevuto in ingresso comeTextMessage
:- viene creata una sessione di comunicazione dalla connessione come non transactional e con auto ack. Cosa significa?
- transactional: in JMS una transazione è un gruppo di messaggi che sono consegnati o ricevuti o tutti insieme o nessuno;
-
ack mode: modo con cui il broker JMS riceve i messaggi. I valori possibili sono:
- CLIENT_ACKNOWLEDGE: il JMS server non può inviare messaggi in sequenza finché non ha ricevuto un segnale di accettazione (ack, accettazione appunto) tra un messaggio e l’altro.
- AUTO_ACKNOWLEDGE: il messaggio viene consegnato una ed una sola volta. Questa policy genera un certo overhead sul server ma è la più usata.
- DUPS_ACKNOWLEDGE: il messaggio può essere inviato più volte, riducendo l’overhead sul server ma aumentando quello di rete.
- viene creato un
TextMessage
che conterrà il testo ricevuto in ingresso. - viene generato il correlation id e impostato nel messaggio. Per non creare confusione, è necessario che questo valore sia univoco sulla coda. Nel nostro caso abbiamo usato il timestamp in millisecondi (anche se si potrebbe fare di meglio!).
- dalla sessione viene creato un
QueueSender
sulla coda delle richieste e viene inviato il messagio.
- viene creata una sessione di comunicazione dalla connessione come non transactional e con auto ack. Cosa significa?
- il metodo
receiveMessage()
viene chiamato per controllare che ci siano messaggi sulla coda di risposta.- per prima cosa viene avviata la connessione
connection.start()
con il broker JMS. Questo passo è fondamentale altrimenti non riceveremo nessun messaggio nonostante il codice sia corretto; - si apre poi una sessione e si costruisce un selector basato sul correlation id del messaggio inviato precedentemente;
- si costruisce un receiver che filtra i messaggi caratterizzati dal nostro selector e infine sta in ascolto per 1 secondo (
receiver.receive(1 * 1000)
).
- per prima cosa viene avviata la connessione
- per ricevere messaggi possiamo creare un meccanismo di polling che invoca perioridamente questo metodo, oppure possiamo usare una più moderna Web Socket per una notifica push dal server al client non appena il messaggio è stato ricevuto. Se scegliamo questa strada, dovremo chiamare
receiver.receive()
senza argomenti: finché il messaggio non verrà ricevuto, il metodoreceiveMessage()
non avrà termine. Vedremo in un’altra occasione come implementare questa soluzione.
Il destinatario
Fin’ora abbiamo visto come si fa a inviare e ricevere un messaggio di risposta. Ma chi riceve il messaggio e invia la risposta? Stiamo parlando di un listener su una certa coda, ovvero di un Message Driven Bean. Vediamone il codice:
@MessageDriven(
activationConfig = {
@ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
@ActivationConfigProperty(propertyName = "destination", propertyValue = "jms/RequestQueue")
})
public class CorrelatedMessageHandlerMDB implements MessageListener {
private static final Logger logger = Logger.getLogger(CorrelatedMessageHandlerMDB.class);
@Resource(mappedName = "ConnectionFactory")
private QueueConnectionFactory queueFactory;
@Resource(mappedName = "jms/ResponseQueue")
private Queue replyQueue;
private QueueConnection connection;
@PostConstruct
public void init() {
logger.info("Init request MDB listener...");
try {
connection = queueFactory.createQueueConnection();
} catch (JMSException e) {
e.printStackTrace();
}
}
/**
* @see MessageListener#onMessage(Message)
*/
public void onMessage(Message message) {
QueueSession session = null;
QueueSender sender = null;
try {
logger.info(((TextMessage) message).getText());
session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
TextMessage responseMessage = session.createTextMessage();
responseMessage.setText("Echo: " + ((TextMessage) message).getText());
responseMessage.setJMSCorrelationID(message.getJMSCorrelationID());
sender = session.createSender(replyQueue);
sender.send(responseMessage);
} catch (JMSException e) {
e.printStackTrace();
} finally {
try {
if (sender != null) sender.close();
if (session != null) session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
@PreDestroy
public void preDispose() {
logger.info("Closing MDB listener...");
try {
if (connection != null) connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
- ogni MDB che utilizza JMS come sistema di messaggi deve implementare l’interfaccia
javax.jms.MessageListener
; - tra i suoi parametri di attivazione (
@ActivationConfigProperty
) deve essere indicata la destinazione e il tipo, cioè dove ascoltare. In questo caso abbiamo configurato il nostro MDB per essere un listener su una coda JMSjavax.jms.Queue
di nomejms/RequestQueue
; - tramite injection sull’attributo
replyQueue
, l’MDB sa che deve sempre rispondere sulla codajms/ResponseQueue
; - la connessione al broker JMS viene aperta e chiusa dopo la creazione dell’MDB (
@PostConstruct
) e prima della sua distruzione (@PreDestroy
). Se questo dovesse creare problemi di timeout della connessione, è opportuno inserire apertura e chiusura direttamente nel metodoonMessage(...)
; - quando viene ricevuto un messaggio, viene invocato il metodo
onMessage(...)
. Per rispondere al messaggio vengono eseguite le stesse operazioni viste precedentemente: si crea una sessione e un sender sulla coda di risposta. Da notare che in questo caso il correlation id del messaggio ricevuto viene copiato in quello che verrà inviato in risposta, come richiede il pattern.
Conclusioni
In un post precedente avevamo illustrato un po’ di teoria che sta dietro al mondo JMS. In questo post invece abbiamo visto nel concreto come si fa ad applicare il pattern Correlation ID in JBoss applicandolo alla lettera. In questo caso è possibile modificare il pattern variando la coda di risposta: invece di avere una coda predefinita, è possibile inserire nel messaggio di invio il riferimento alla coda su cui rispondere, come richiesto per esempio dal pattern Message ID che vedremo .