Saturday, August 12, 2006

JMS request/reply from an EJB

Sometimes something that appears trivial turns out to be a lot more involved. Doing JMS request/reply from an EJB is such a case. Let's take a look...

Request/reply

JMS can be used as a form of inter process communication. JMS is meant for asynchronous calls, but it can also used for synchronous calls. In that case, the first process sends a message to a queue or topic, and waits until another process has replied to the message. The reply is sent to a different queue or topic. Advantages of using JMS for interprocess communication is that the two processes are completely decoupled. The two processes only need to agree on what is in the message.

Synchronous communication in JMS is also called request/reply.

The JMS api provides two helper classes to make request/reply easier. They are the QueueRequestor and TopicRequestor classes. Here is an example of how they can be used:

QueueConnection c = (QueueConnectionFactory) (new
InitialContext().lookup("qcf"));
QueueSession s = c.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue q = (Queue) (new InitialContext().lookup("requestqueue"));
QueueRequestor r = new QueueRequestor(s, q);
Message reply = r.request(s.createTextMessage("1233442342"));
c.close();

Looks simple, right? However, when this code is used in a typical EJB, it will likely not work. Why not? Is there something wrong with the QueeuRequestor? Well, yes, yet that is not the problem. Let's look at the implementation of the QueueRequestor:

public class QueueRequestor {
QueueSession session;
Queue queue;
TemporaryQueue tempQueue;
QueueSender sender;
QueueReceiver receiver;

public QueueRequestor(QueueSession session, Queue queue) throws JMSException {
this.session = session;
this.queue = queue;
tempQueue = session.createTemporaryQueue();
sender = session.createSender(queue);
receiver = session.createReceiver(tempQueue);
}
public Message request(Message message) throws JMSException {
message.setJMSReplyTo(tempQueue);
sender.send(message);
return receiver.receive();
}

public void close() throws JMSException {
session.close();
tempQueue.delete();
}
}

So we can rewrite the above code sample as follows:

QueueConnection c = (QueueConnectionFactory) (new InitialContext().lookup("qcf"));
QueueSession s = c.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue q = (Queue) (new
InitialContext().lookup("requestqueue"));
Message request = s.createTextMessage("1233442342");
request.setJMSReplyTo(s.createTemporaryQueue();
s.createSender(q).send(m);
Message reply = s.createReceiver(q).receive();
c.close();

There's nothing obviously wrong with that, so why does this typically not work when this code is in an EJB?

Transactions

Before we look at the explanation of why the above code doesn't work in an EJB, let's review some basics about application servers and JMS servers. A JMS server is typically a stand alone process. Applications can communicate with this server using a client runtime. The client runtime implements the JMS API as defined in the JMS specification. If you're building a stand-alone application, you use the client runtime directly. Example, when you're using the JMS server shipped with Java CAPS you would be using com.stc.stcjms.jar to talk to the server process (stcms.exe). The client runtime implemented in com.stc.jms.stcjms.jar implements the JMS api and communicates with the server (stcms.exe) over tcp/ip sockets.

However, when your code lives in an EJB, you're not using the JMS client runtime directly. For example, when you obtain a connection factory from JNDI in an EJB, you will not get the connection factory object that belongs to the JMS provider itself, but you get a factory that is implemented by a resource adapter. As of J2EE 1.4, the interfacing between the application server and JMS is done through a Resource Adapter. In the example for Java CAPS, you would obtain a connection factory implemented by the JMSJCA resource adapter instead of the com.stc.jms.client.STCQueueConnectionFactory that belongs to the STCMS client runtime.

The Resource Adapter acts like a wrapper around the objects that are implemented in the JMS client runtime. One of the responsibilities of the Resource Adapter is to make sure that JMS acts like a transactional resource. This means that JMS connection (technically, the JMS session) is enlisted with the transaction manager of the applicaiton server. In fact, under the covers the Resource Adapter really uses an XASession rather than an AUTO_ACKNOWLEDGE session. What effect did it have to specify the arguments to

c.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);

In fact, these arguments are ignored, so they have no effect whatsoever.

Use of JMS in a transaction

Let's say that we have a bean managed EJB (BMT) with the following code:

EJBContext ctx = ...;
ctx.getUserTransaction().begin();
QueueConnection c = (QueueConnectionFactory) (new
InitialContext().lookup("qcf"));
QueueSession s = c.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue q = (Queue) (new InitialContext().lookup("requestqueue"));
Message request = s.createTextMessage("1233442342");
request.setJMSReplyTo(s.createTemporaryQueue();
s.createSender(q).send(m);
Message reply = s.createReceiver(q).receive();
c.close();
ctx.getUserTransaction().commit();

The message will be sent to the queue only when the statement getUserTransaction().commit() is executed. Before this statement, the message will not be visible anywhere.  So when the receive() method is executed, the message was really not sent to the queue. The other process never gets the request, and hence, a reply will never be received. Since no timeout is specified, the application simply hangs.

Now that we know that, the solution is pretty simple: just commit the transaction before receiving the reply.

ctx.getUserTransaction().begin();
QueueConnection c = (QueueConnectionFactory) (new InitialContext().lookup("qcf"));
QueueSession s = c.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue q = (Queue) (new InitialContext().lookup("requestqueue"));
Message request = s.createTextMessage("1233442342");
request.setJMSReplyTo(s.createTemporaryQueue();
s.createSender(q).send(m);
ctx.getUserTransaction().commit();

ctx.getUserTransaction().begin();
Message reply = s.createReceiver(q).receive();
c.close();
ctx.getUserTransaction().commit();

It depends on the business logic if you can simply split up the transaction in two. Perhaps something else happens in the transaction, for example you could update a database in the same transaction. In that case, you may not want to commit the transaction yet! How can we deal with that situation?

The solution would be to suspend the transaction, create a new transaction, send the request, commit the new transaction, unsuspend the first transaction, and continue. Sounds complicated? It is, but it can be simplified using an EJB: simply add a new business method to the EJB, make sure that the method's trans-attribute is set to NotSupported.
Next, instantiate a new EJB and call that method. By calling that method, the transaction will automatically be suspended.

Note that it is crucial to instantiate a new EJB: if you just call the new business method from the same EJB instance, the application server will not intercept the method and will not suspend the transaction.

When the EJB is CMT

Pretty much the same goes if the EJB is not BMT but CMT. If a transaction was started, the same deadlock occurs: the send() is never committed until the transaction is committed, so the receive() method will never receive a reply. In that case, add a new business method and make sure that the trans-attribute is set to RequiresNew. The application server will then suspend the main transaction and start a new transaction automatically. As with BMT, make sure that you call the new business method on a new instance of the EJB rather than on the same instance.

When there is no transaction

What happens if there is no transaction when you call send()? Well, that depends on the implementation. The specification is not clear on what should happen when an XA resource is used outside of a transaction. The JMS CTS implies that the behavior should be that of auto-commit (or AUTO_ACKNOWLEDGE in JMS parlance). CTS stands for Compliance Test Suite. The CTS for JMS consists of a few thousand tests. Every implementation that claims that  be compliant with the JMS spec must pass these tests. Hence, most implementations, including the one that ships with Java CAPS will exhibit AUTO_ACKNOWLEDGE behavior. In that case, the above code will work and the QueueRequestor can be used.

What about the QueueRequestor?

Can't we use the QueueRequestor with transactions? The simple answer is no. Why did the Resource Adapter not take care of this? The problem is that the QueueRequestor is implemented by the JMS api, and not by the JMS provider or Resource Adapter. Perhaps this is another example why it is generally a bad idea to provide implementation classes in API-s?

There's another problem with the QueueRequestor class: there is no way to specify a timeout value. You would probably prefer a timeout exception rather than having a hanging application.

Conclusion

Knowing what happens under the covers explains why request/reply cannot work in a single transaction. Simply post the request in a separate transaction.

Post a Comment