Riprendiamo e concludiamo il filone dei Message Driven Bean (MDB) che avevamo iniziato in due post precedenti: , avevamo descritto la struttra asincrona dei MDB che ricevono e inviano messaggi JMS, con riferimento a qualche pattern architetturale; avevamo invece messo mano al codice, in particolare su come implementare il Request/Reply pattern basato sul Correlation ID. Come promesso, vediamo adesso cosa bisogna fare per implementare invece il pattern basato su Message ID.
Mittente
Visto che, come dicevano i latini, repetita iuvant, riproponiamo brevemente le differenze tra Correlation ID e Message ID pattern per capire cosa andremo a fare.
JMS Pattern | Response Queue | CorrelationID |
---|---|---|
Correlation ID Pattern | Tutte le risposte vanno sulla stessa coda prefissata | Il server copia il Correlation ID della richiesta nel Correlation ID della risposta |
Message ID Pattern | La risposta viene indirizzata dinamicamente alla coda specificata nel messaggio del mittente | Il server copia il Message ID della richiesta nel Correlation ID della risposta |
Nel caso che vogliamo presentare adesso ci aspetteremo che:
- il mittente specifichi la coda di risposta nel messaggio che invia (che potrebbe essere per esempio una coda temporanea);
- il ricevente costruisca il messaggio di risposta in modo che il suo correlation id sia uguale al message id ricevuto.
Il codice che andremo a vedere sarà molto simile a quello , almeno nell’interfaccia e nella struttura dell’algoritmo. Come nel caso precedente, abbiamo bisogno di un EJB Stateful, questa volta per mantenere memoria della coda in cui ascoltare i messaggi di risposta.
@Stateful
public class MessageInQueueHandlerEJB
implements MessageInQueueHandlerEJBLocal {
private static final Logger logger =
Logger.getLogger(MessageInQueueHandlerEJB.class);
@Resource(mappedName = "ConnectionFactory")
private QueueConnectionFactory queueFactory;
@Resource(mappedName = "jms/RequestQueue")
private Destination requestQueue;
private Destination replyQueue;
private QueueConnection connection;
private Session session;
private MessageProducer requestProducer;
private MessageConsumer replyConsumer;
@PostConstruct
public void init() {
logger.info("Init EJB Statefull");
try {
connection = queueFactory.createQueueConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
replyQueue = session.createTemporaryQueue();
requestProducer = session.createProducer(requestQueue);
replyConsumer = session.createConsumer(replyQueue);
} catch (JMSException e) {
e.printStackTrace();
}
}
@Override
public void sendMessage(final String text) {
try {
TextMessage requestMessage = session.createTextMessage();
requestMessage.setText(text);
requestMessage.setJMSReplyTo(replyQueue);
requestProducer.send(requestMessage);
logger.info("Sent request");
logger.info("\tTime: " + System.currentTimeMillis() + " ms");
logger.info("\tMessage ID: " + requestMessage.getJMSMessageID());
logger.info("\tCorr. ID: " + requestMessage.getJMSCorrelationID());
logger.info("\tReply to: " + requestMessage.getJMSReplyTo());
logger.info("\tContents: " + requestMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
@Override
public String receiveMessage() throws JMSException {
connection.start();
Message msg = replyConsumer.receive(3000);
if (msg instanceof TextMessage) {
TextMessage replyMessage = (TextMessage) msg;
logger.info("Received reply ");
logger.info("\tTime: " + System.currentTimeMillis() + " ms");
logger.info("\tMessage ID: " + replyMessage.getJMSMessageID());
logger.info("\tCorr. ID: " + replyMessage.getJMSCorrelationID());
logger.info("\tReply to: " + replyMessage.getJMSReplyTo());
logger.info("\tContents: " + replyMessage.getText());
return replyMessage.getText();
} else {
logger.info("No message received");
return null;
}
}
@PreDestroy
public void preDispose() {
try {
if (replyConsumer != null) replyConsumer.close();
if (requestProducer != null) requestProducer.close();
if (connection != null) connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
@Remove
@Override
public void dispose() {
logger.info("Disposing EJB stateful");
}
}
Visto che conoscete già il codice (vero?), evidenziamo solo le differenze rispetto all’altro pattern. La prima cosa che salta all’occhio è l’uso di classi come Destination
al posto di Queue
o di MessageProducer
e MessageConsumer
per inviare e ricevere messaggi. Cerchiamo di capire meglio:
- all’inizializzazione dell’EJB:
- viene aperta la connessione al broker JMS;
- viene aperta una sessione di comunicazione che a differenza del Correlation ID pattern verrà usata sia per inviare che ricevere messaggi;
- dalla sessione viene creata una coda temporanea (
replyQueue
) che servirà per ricevere il messaggio di risposta; - vengono istanziati un producer e un consumer di messaggi.
Con una inizializzazione di questo tipo è possibile mandare più messaggi in coda e ricevere tutte le risposte sulla coda temporanea creata. Se si fosse invece deciso di creare le sessioni e i consumer/producer nei metodi di invio (
sendMessage
) e ricezione (receiveMessage
), il sistema avrebbe funzionato comunque, ma la coda di ricezione sarebbe stata sempre nuova (cioè l’ultima creata), comportando la perdita delle risposte di messaggi inviati in serie. - una volta inizializzato, il messaggio viene inviato tramite il metodo
sendMessage
: viene impostata la coda di risposta nel messaggio e il message producer provvede al suo invio. - il metodo di ricezione delle risposte non fa altro che chiedere al message consumer di attendere per 3 secondi il messaggio di risposta.
Destinatario
Il MDB che sta in ascolto sulla coda del mittente non è diverso da quello visto nel Correlation ID pattern se non per il fatto che:
- imposta la coda di risposta prelevandola dal messaggio ricevuto;
- copia il message id nel messaggio ricevuto nel correlation id del messaggio di risposta.
@MessageDriven(
activationConfig = {
@ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
@ActivationConfigProperty(propertyName = "destination", propertyValue = "jms/RequestQueue")
})
public class MessageInQueueHandlerMDB implements MessageListener {
private static final Logger logger =
Logger.getLogger(MessageInQueueHandlerMDB.class);
@Resource(mappedName = "ConnectionFactory")
private QueueConnectionFactory queueFactory;
private QueueConnection connection;
private Session session;
/**
* Default constructor.
*/
public MessageInQueueHandlerMDB() {
}
@PostConstruct
public void init() {
logger.info("Init request MDB listener...");
try {
connection = queueFactory.createQueueConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
} catch (JMSException e) {
e.printStackTrace();
}
}
/**
* @see MessageListener#onMessage(Message)
*/
public void onMessage(final Message message) {
try {
logger.info(((TextMessage) message).getText());
if ((message instanceof TextMessage) && (message.getJMSReplyTo() != null)) {
TextMessage requestMessage = (TextMessage) message;
logger.info("Received request");
logger.info("\tTime: " + System.currentTimeMillis() + " ms");
logger.info("\tMessage ID: " + requestMessage.getJMSMessageID());
logger.info("\tCorrel. ID: " + requestMessage.getJMSCorrelationID());
logger.info("\tReply to: " + requestMessage.getJMSReplyTo());
logger.info("\tContents: " + requestMessage.getText());
String contents = requestMessage.getText();
Destination replyDestination = message.getJMSReplyTo();
MessageProducer replyProducer = session.createProducer(replyDestination);
TextMessage replyMessage = session.createTextMessage();
replyMessage.setText(contents);
replyMessage.setJMSCorrelationID(requestMessage.getJMSMessageID());
replyProducer.send(replyMessage);
logger.info("Sent reply");
logger.info("\tTime: " + System.currentTimeMillis() + " ms");
logger.info("\tMessage ID: " + replyMessage.getJMSMessageID());
logger.info("\tCorrel. ID: " + replyMessage.getJMSCorrelationID());
logger.info("\tReply to: " + replyMessage.getJMSReplyTo());
logger.info("\tContents: " + replyMessage.getText());
} else {
logger.info("Error receiving message");
}
} catch (JMSException e) {
e.printStackTrace();
}
}
@PreDestroy
public void preDispose() {
try {
if (connection != null) connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
Conclusioni
Siamo arrivati così alla fine di questa analisi sul mondo degli MDB. Il codice presentato in questi post non vuole essere niente di esaustivo (attenzione al timeout della connessione al broker!), ma vuole riportare chiarezza sulla confusione che si trova spesso in rete sull’uso del message id e del correlation id. Per quel che si è visto, l’uso di uno o dell’altro pattern arriva a raggiungere sempre lo stesso risultato (cioè ricevere messaggi da un MDB) anche se, a livello implementativo, richiedono non poche differenze che inizialmente non sono facili da individuare.