Riprendiamo e concludiamo il filone dei Message Driven Bean (MDB) che avevamo iniziato in due post precedenti: in uno più teorico, avevamo descritto la struttra asincrona dei MDB che ricevono e inviano messaggi JMS, con riferimento a qualche pattern architetturale; nel secondo avevamo invece messo mano al codice, in particolare su come implementare il Request/Reply pattern basato sul Correlation ID. Come promesso, vediamo adesso cosa bisogna fare per implementare invece il pattern basato su Message ID.

Mittente

Visto che, come dicevano i latini, repetita iuvant, riproponiamo brevemente le differenze tra Correlation ID e Message ID pattern per capire cosa andremo a fare.

JMS Pattern Response Queue CorrelationID
Correlation ID Pattern Tutte le risposte vanno sulla stessa coda prefissata Il server copia il Correlation ID della richiesta nel Correlation ID della risposta
Message ID Pattern La risposta viene indirizzata dinamicamente alla coda specificata nel messaggio del mittente Il server copia il Message ID della richiesta nel Correlation ID della risposta

Nel caso che vogliamo presentare adesso ci aspetteremo che:

  • il mittente specifichi la coda di risposta nel messaggio che invia (che potrebbe essere per esempio una coda temporanea);
  • il ricevente costruisca il messaggio di risposta in modo che il suo correlation id sia uguale al message id ricevuto.

Il codice che andremo a vedere sarà molto simile a quello già mostrato, almeno nell’interfaccia e nella struttura dell’algoritmo. Come nel caso precedente, abbiamo bisogno di un EJB Stateful, questa volta per mantenere memoria della coda in cui ascoltare i messaggi di risposta.

@Stateful
public class MessageInQueueHandlerEJB 
        implements MessageInQueueHandlerEJBLocal {

  private static final Logger logger = 
        Logger.getLogger(MessageInQueueHandlerEJB.class);

  @Resource(mappedName = "ConnectionFactory")
  private QueueConnectionFactory queueFactory;

  @Resource(mappedName = "jms/RequestQueue")
  private Destination requestQueue;
  
  private Destination replyQueue;
  
  private QueueConnection connection;
  private Session session;
  private MessageProducer requestProducer;
  private MessageConsumer replyConsumer;
  
  @PostConstruct
  public void init() {
    logger.info("Init EJB Statefull");
    try {
      connection = queueFactory.createQueueConnection();
      
      session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
      replyQueue = session.createTemporaryQueue();
      
      requestProducer = session.createProducer(requestQueue);
      replyConsumer = session.createConsumer(replyQueue);
    } catch (JMSException e) {
      e.printStackTrace();
    }
  }

  @Override
  public void sendMessage(final String text) {
    try {
      TextMessage requestMessage = session.createTextMessage();
      requestMessage.setText(text);
      requestMessage.setJMSReplyTo(replyQueue);
      requestProducer.send(requestMessage);
      
      
      logger.info("Sent request");
      logger.info("\tTime:    " + System.currentTimeMillis() + " ms");
      logger.info("\tMessage ID: " + requestMessage.getJMSMessageID());
      logger.info("\tCorr. ID: " + requestMessage.getJMSCorrelationID());
      logger.info("\tReply to:   " + requestMessage.getJMSReplyTo());
      logger.info("\tContents:   " + requestMessage.getText());
      
    } catch (JMSException e) {
      e.printStackTrace();
    }
  }

  @Override
  public String receiveMessage() throws JMSException {
    connection.start();
    
    Message msg = replyConsumer.receive(3000);
    if (msg instanceof TextMessage) {
      TextMessage replyMessage = (TextMessage) msg;
      logger.info("Received reply ");
      logger.info("\tTime:    " + System.currentTimeMillis() + " ms");
      logger.info("\tMessage ID: " + replyMessage.getJMSMessageID());
      logger.info("\tCorr. ID: " + replyMessage.getJMSCorrelationID());
      logger.info("\tReply to:   " + replyMessage.getJMSReplyTo());
      logger.info("\tContents:   " + replyMessage.getText());
      
      return replyMessage.getText();
    } else {
      logger.info("No message received");
      return null;
    }
    
  }
  
  @PreDestroy
  public void preDispose() {
    try {
      if (replyConsumer != null) replyConsumer.close();
      if (requestProducer != null) requestProducer.close();
      if (connection != null) connection.close();
    } catch (JMSException e) {
      e.printStackTrace();
    }
  }
  
  @Remove
  @Override
  public void dispose() {
    logger.info("Disposing EJB stateful");
  }
}

Visto che conoscete già il codice (vero?), evidenziamo solo le differenze rispetto all’altro pattern. La prima cosa che salta all’occhio è l’uso di classi come Destination al posto di Queue o di MessageProducer e MessageConsumer per inviare e ricevere messaggi. Cerchiamo di capire meglio:

  • all’inizializzazione dell’EJB:
    • viene aperta la connessione al broker JMS;
    • viene aperta una sessione di comunicazione che a differenza del Correlation ID pattern verrà usata sia per inviare che ricevere messaggi;
    • dalla sessione viene creata una coda temporanea (replyQueue) che servirà per ricevere il messaggio di risposta;
    • vengono istanziati un producer e un consumer di messaggi.

    Con una inizializzazione di questo tipo è possibile mandare più messaggi in coda e ricevere tutte le risposte sulla coda temporanea creata. Se si fosse invece deciso di creare le sessioni e i consumer/producer nei metodi di invio (sendMessage) e ricezione (receiveMessage), il sistema avrebbe funzionato comunque, ma la coda di ricezione sarebbe stata sempre nuova (cioè l’ultima creata), comportando la perdita delle risposte di messaggi inviati in serie.

  • una volta inizializzato, il messaggio viene inviato tramite il metodo sendMessage: viene impostata la coda di risposta nel messaggio e il message producer provvede al suo invio.
  • il metodo di ricezione delle risposte non fa altro che chiedere al message consumer di attendere per 3 secondi il messaggio di risposta.

Destinatario

Il MDB che sta in ascolto sulla coda del mittente non è diverso da quello visto nel Correlation ID pattern se non per il fatto che:

  • imposta la coda di risposta prelevandola dal messaggio ricevuto;
  • copia il message id nel messaggio ricevuto nel correlation id del messaggio di risposta.
@MessageDriven(
  activationConfig = { 
  @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
  @ActivationConfigProperty(propertyName = "destination", propertyValue = "jms/RequestQueue")
})
public class MessageInQueueHandlerMDB implements MessageListener {

   private static final Logger logger = 
        Logger.getLogger(MessageInQueueHandlerMDB.class);
   @Resource(mappedName = "ConnectionFactory")
   private QueueConnectionFactory queueFactory;
   private QueueConnection connection;
   private Session session;
   
    /**
     * Default constructor. 
     */
    public MessageInQueueHandlerMDB() {
    }
   
    @PostConstruct
    public void init() {
       logger.info("Init request MDB listener...");
      try {
         connection = queueFactory.createQueueConnection();
         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
      } catch (JMSException e) {
         e.printStackTrace();
      }
    }
    
   /**
     * @see MessageListener#onMessage(Message)
     */
    public void onMessage(final Message message) {
       try {
         logger.info(((TextMessage) message).getText());
          if ((message instanceof TextMessage) && (message.getJMSReplyTo() != null)) {
            TextMessage requestMessage = (TextMessage) message;

            logger.info("Received request");
            logger.info("\tTime:       " + System.currentTimeMillis() + " ms");
            logger.info("\tMessage ID: " + requestMessage.getJMSMessageID());
            logger.info("\tCorrel. ID: " + requestMessage.getJMSCorrelationID());
            logger.info("\tReply to:   " + requestMessage.getJMSReplyTo());
            logger.info("\tContents:   " + requestMessage.getText());

            String contents = requestMessage.getText();
            Destination replyDestination = message.getJMSReplyTo();
            MessageProducer replyProducer = session.createProducer(replyDestination);

            TextMessage replyMessage = session.createTextMessage();
            replyMessage.setText(contents);
            replyMessage.setJMSCorrelationID(requestMessage.getJMSMessageID());
            replyProducer.send(replyMessage);

            logger.info("Sent reply");
            logger.info("\tTime:       " + System.currentTimeMillis() + " ms");
            logger.info("\tMessage ID: " + replyMessage.getJMSMessageID());
            logger.info("\tCorrel. ID: " + replyMessage.getJMSCorrelationID());
            logger.info("\tReply to:   " + replyMessage.getJMSReplyTo());
            logger.info("\tContents:   " + replyMessage.getText());
         } else { 
            logger.info("Error receiving message");
         }      
      } catch (JMSException e) {
         e.printStackTrace();
      }
  }
    
   @PreDestroy
   public void preDispose() {
      try {
         if (connection != null) connection.close();
      } catch (JMSException e) {
         e.printStackTrace();
      }
   }
}

Conclusioni

Siamo arrivati così alla fine di questa analisi sul mondo degli MDB. Il codice presentato in questi post non vuole essere niente di esaustivo (attenzione al timeout della connessione al broker!), ma vuole riportare chiarezza sulla confusione che si trova spesso in rete sull’uso del message id e del correlation id. Per quel che si è visto, l’uso di uno o dell’altro pattern arriva a raggiungere sempre lo stesso risultato (cioè ricevere messaggi da un MDB) anche se, a livello implementativo, richiedono non poche differenze che inizialmente non sono facili da individuare.

83 Posts

Sono un software engineer focalizzato nella progettazione e sviluppo di applicazioni web in Java. Presso OmniaGroup ricopro il ruolo di Tech Leader sulle tecnologie legate alla piattaforma Java EE 5 (come WebSphere 7.0, EJB3, JPA 1 (EclipseLink), JSF 1.2 (Mojarra) e RichFaces 3) e Java EE 6 con JBoss AS 7, in particolare di CDI, JAX-RS, nonché di EJB 3.1, JPA2, JSF2 e RichFaces 4. Al momento mi occupo di ECM, in particolar modo sulla customizzazione di Alfresco 4 e sulla sua installazione con tecnologie da devops come Vagrant e Chef. In passato ho lavorato con la piattaforma alternativa alla enterprise per lo sviluppo web: Java SE 6, Tomcat 6, Hibernate 3 e Spring 2.5. Nei ritagli di tempo sviluppo siti web in PHP e ASP. Per maggiori informazioni consulta il mio curriculum pubblico. Follow me on Twitter - LinkedIn profile - Google+