Saturday, August 12, 2006

JMS request/reply from an EJB

Sometimes something that appears trivial turns out to be a lot more involved. Doing JMS request/reply from an EJB is such a case. Let's take a look...

Request/reply

JMS can be used as a form of interprocess communication. JMS is meant for asynchronous calls, but it can also be used for synchronous calls. In that case, the first process sends a message to a queue or topic and waits until another process has replied to the message. The reply is sent to a different queue or topic. An advantage of using JMS for interprocess communication is that the two processes are completely decoupled: they only need to agree on what is in the message.

Synchronous communication in JMS is also called request/reply.

The JMS API provides two helper classes to make request/reply easier: QueueRequestor and TopicRequestor. Here is an example of how they can be used:

QueueConnectionFactory f = (QueueConnectionFactory) new InitialContext().lookup("qcf");
QueueConnection c = f.createQueueConnection();
QueueSession s = c.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue q = (Queue) new InitialContext().lookup("requestqueue");
c.start(); // without start(), no reply is ever delivered to the receiver
QueueRequestor r = new QueueRequestor(s, q);
Message reply = r.request(s.createTextMessage("1233442342"));
c.close();

Looks simple, right? However, when this code is used in a typical EJB, it will likely not work. Why not? Is there something wrong with the QueueRequestor? Well, yes, yet that is not the problem. Let's look at the implementation of the QueueRequestor:

public class QueueRequestor {
    QueueSession session;
    Queue queue;
    TemporaryQueue tempQueue;
    QueueSender sender;
    QueueReceiver receiver;

    public QueueRequestor(QueueSession session, Queue queue) throws JMSException {
        this.session = session;
        this.queue = queue;
        tempQueue = session.createTemporaryQueue();
        sender = session.createSender(queue);
        receiver = session.createReceiver(tempQueue);
    }

    public Message request(Message message) throws JMSException {
        message.setJMSReplyTo(tempQueue);
        sender.send(message);
        return receiver.receive();
    }

    public void close() throws JMSException {
        session.close();
        tempQueue.delete();
    }
}

So we can rewrite the above code sample as follows:

QueueConnectionFactory f = (QueueConnectionFactory) new InitialContext().lookup("qcf");
QueueConnection c = f.createQueueConnection();
QueueSession s = c.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue q = (Queue) new InitialContext().lookup("requestqueue");
c.start();
TemporaryQueue replyQueue = s.createTemporaryQueue();
Message request = s.createTextMessage("1233442342");
request.setJMSReplyTo(replyQueue);
s.createSender(q).send(request);
Message reply = s.createReceiver(replyQueue).receive(); // wait on the reply queue, not the request queue
c.close();

There's nothing obviously wrong with that, so why does this typically not work when this code is in an EJB?

Transactions

Before we look at why the above code doesn't work in an EJB, let's review some basics about application servers and JMS servers. A JMS server is typically a stand-alone process. Applications communicate with this server using a client runtime, which implements the JMS API as defined in the JMS specification. If you're building a stand-alone application, you use the client runtime directly. For example, when you're using the JMS server shipped with Java CAPS, you would use com.stc.jms.stcjms.jar to talk to the server process (stcms.exe). The client runtime in that jar implements the JMS API and communicates with the server over TCP/IP sockets.

However, when your code lives in an EJB, you're not using the JMS client runtime directly. For example, when you obtain a connection factory from JNDI in an EJB, you will not get the connection factory object that belongs to the JMS provider itself, but you get a factory that is implemented by a resource adapter. As of J2EE 1.4, the interfacing between the application server and JMS is done through a Resource Adapter. In the example for Java CAPS, you would obtain a connection factory implemented by the JMSJCA resource adapter instead of the com.stc.jms.client.STCQueueConnectionFactory that belongs to the STCMS client runtime.

The Resource Adapter acts like a wrapper around the objects that are implemented in the JMS client runtime. One of the responsibilities of the Resource Adapter is to make sure that JMS acts like a transactional resource. This means that the JMS connection (technically, the JMS session) is enlisted with the transaction manager of the application server. In fact, under the covers the Resource Adapter really uses an XASession rather than an AUTO_ACKNOWLEDGE session. So what effect do the arguments in the following call have?

c.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);

In fact, these arguments are ignored, so they have no effect whatsoever.

Use of JMS in a transaction

Let's say that we have an EJB with bean-managed transactions (BMT) that contains the following code:

EJBContext ctx = ...;
ctx.getUserTransaction().begin();
QueueConnectionFactory f = (QueueConnectionFactory) new InitialContext().lookup("qcf");
QueueConnection c = f.createQueueConnection();
QueueSession s = c.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue q = (Queue) new InitialContext().lookup("requestqueue");
c.start();
TemporaryQueue replyQueue = s.createTemporaryQueue();
Message request = s.createTextMessage("1233442342");
request.setJMSReplyTo(replyQueue);
s.createSender(q).send(request);
Message reply = s.createReceiver(replyQueue).receive(); // hangs: the send is not committed yet
c.close();
ctx.getUserTransaction().commit();

The message will be sent to the queue only when getUserTransaction().commit() is executed. Before that statement, the message is not visible anywhere. So when the receive() method is executed, the message has not really been sent to the queue yet. The other process never gets the request, and hence a reply will never be received. Since no timeout is specified, the application simply hangs.
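The effect can be reproduced without an application server. The following is a toy model only, not the JMS API: ToyTransactedSession is a hypothetical stand-in for a session that is enlisted in a transaction, and an in-memory BlockingQueue stands in for the request queue. It assumes that sends are simply buffered until commit, which is a simplification of what a real provider does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Toy model of a transacted send. Illustration only; this is not the JMS API. */
class TransactedSendDemo {

    /** Hypothetical stand-in for a session enlisted in a transaction. */
    static final class ToyTransactedSession {
        private final BlockingQueue<String> destination;
        private final List<String> pending = new ArrayList<>();

        ToyTransactedSession(BlockingQueue<String> destination) {
            this.destination = destination;
        }

        /** Buffers the message; nothing reaches the destination yet. */
        void send(String message) {
            pending.add(message);
        }

        /** Makes all buffered messages visible on the destination. */
        void commit() {
            destination.addAll(pending);
            pending.clear();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> requestQueue = new LinkedBlockingQueue<>();
        ToyTransactedSession session = new ToyTransactedSession(requestQueue);

        session.send("1233442342");
        // The replying process polls the queue but finds nothing: the send is uncommitted.
        String beforeCommit = requestQueue.poll(100, TimeUnit.MILLISECONDS);
        System.out.println("before commit: " + beforeCommit);

        session.commit();
        // Only now can the replying process see the request.
        String afterCommit = requestQueue.poll(100, TimeUnit.MILLISECONDS);
        System.out.println("after commit: " + afterCommit);
    }
}
```

Running it prints "before commit: null" followed by "after commit: 1233442342". A requester that waits for the reply between send() and commit() is waiting for a process that has not seen the request.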

Now that we know that, the solution is pretty simple: just commit the transaction before receiving the reply.

ctx.getUserTransaction().begin();
QueueConnectionFactory f = (QueueConnectionFactory) new InitialContext().lookup("qcf");
QueueConnection c = f.createQueueConnection();
QueueSession s = c.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue q = (Queue) new InitialContext().lookup("requestqueue");
c.start();
TemporaryQueue replyQueue = s.createTemporaryQueue();
Message request = s.createTextMessage("1233442342");
request.setJMSReplyTo(replyQueue);
s.createSender(q).send(request);
ctx.getUserTransaction().commit(); // the request becomes visible to the other process here

ctx.getUserTransaction().begin();
Message reply = s.createReceiver(replyQueue).receive();
c.close();
ctx.getUserTransaction().commit();

Whether you can simply split the transaction in two depends on the business logic. Perhaps something else happens in the transaction; for example, you could be updating a database in the same transaction. In that case, you may not want to commit the transaction yet! How can we deal with that situation?

The solution would be to suspend the transaction, create a new transaction, send the request, commit the new transaction, resume the first transaction, and continue. Sounds complicated? It is, but it can be simplified using an EJB: simply add a new business method to the EJB and make sure that the method's trans-attribute is set to NotSupported. Next, instantiate a new EJB and call that method. By calling that method, the transaction will automatically be suspended.

Note that it is crucial to instantiate a new EJB: if you just call the new business method from the same EJB instance, the application server will not intercept the method and will not suspend the transaction.
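Why the call has to go through the container can be illustrated with a plain Java dynamic proxy. This is a sketch under assumptions, not how any particular application server is implemented: currentTx, RequestBean and throughContainer are hypothetical names, and the proxy plays the role of the container by suspending the caller's transaction around the call. A direct call on the implementation object bypasses the proxy, so nothing gets suspended.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

/** Toy model of container interception. Illustration only; not an EJB container. */
class ContainerInterceptionDemo {

    /** Toy "current transaction" marker; null means no transaction is active. Not thread-safe. */
    static String currentTx = null;

    interface RequestBean {
        String sendRequest();
    }

    static final class RequestBeanImpl implements RequestBean {
        @Override
        public String sendRequest() {
            return currentTx == null
                    ? "sent immediately"
                    : "buffered until " + currentTx + " commits";
        }
    }

    /** Hypothetical container proxy: suspends the caller's transaction around each call. */
    static RequestBean throughContainer(RequestBean target) {
        InvocationHandler suspendTx = (proxy, method, args) -> {
            String suspended = currentTx;
            currentTx = null;              // run the method without the caller's transaction
            try {
                return method.invoke(target, args);
            } finally {
                currentTx = suspended;     // resume the caller's transaction
            }
        };
        return (RequestBean) Proxy.newProxyInstance(
                RequestBean.class.getClassLoader(),
                new Class<?>[] {RequestBean.class},
                suspendTx);
    }

    public static void main(String[] args) {
        RequestBean bean = new RequestBeanImpl();
        currentTx = "tx-1";
        System.out.println("direct call:   " + bean.sendRequest());
        System.out.println("via container: " + throughContainer(bean).sendRequest());
    }
}
```

The direct call reports "buffered until tx-1 commits", while the call through the proxy reports "sent immediately"; after the proxied call returns, the caller's transaction is active again.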

When the EJB is CMT

Pretty much the same goes if the EJB is not BMT but CMT. If a transaction was started, the same deadlock occurs: the send() is never committed until the transaction is committed, so the receive() method will never receive a reply. In that case, add a new business method and make sure that the trans-attribute is set to RequiresNew. The application server will then suspend the main transaction and start a new transaction automatically. As with BMT, make sure that you call the new business method on a new instance of the EJB rather than on the same instance.

When there is no transaction

What happens if there is no transaction when you call send()? Well, that depends on the implementation. The specification is not clear on what should happen when an XA resource is used outside of a transaction. The JMS CTS (Compliance Test Suite) implies that the behavior should be that of auto-commit (or AUTO_ACKNOWLEDGE in JMS parlance). The CTS for JMS consists of a few thousand tests, and every implementation that claims to be compliant with the JMS spec must pass them. Hence, most implementations, including the one that ships with Java CAPS, will exhibit AUTO_ACKNOWLEDGE behavior. In that case, the above code will work and the QueueRequestor can be used.

What about the QueueRequestor?

Can't we use the QueueRequestor with transactions? The simple answer is no. Why did the Resource Adapter not take care of this? The problem is that the QueueRequestor is implemented by the JMS API, and not by the JMS provider or Resource Adapter. Perhaps this is another example of why it is generally a bad idea to provide implementation classes in APIs?

There's another problem with the QueueRequestor class: there is no way to specify a timeout value. You would probably prefer a timeout exception rather than having a hanging application.
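In plain JMS, the usual way around this is to call receive(long timeout) on the receiver yourself; it returns null when the timeout expires. The idea of a requestor that gives up after a while can be sketched as follows. This is a toy model on in-memory queues, not the JMS API: TimedRequestor and its methods are hypothetical names chosen for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Toy requestor with a timeout. Illustration only; this is not the JMS API. */
class TimedRequestorDemo {

    /** Hypothetical helper: sends a request, then waits a bounded time for the reply. */
    static final class TimedRequestor {
        private final BlockingQueue<String> requestQueue;
        private final BlockingQueue<String> replyQueue = new LinkedBlockingQueue<>();

        TimedRequestor(BlockingQueue<String> requestQueue) {
            this.requestQueue = requestQueue;
        }

        /** The queue a replier would put its answer on (plays the role of the temporary queue). */
        BlockingQueue<String> replyQueue() {
            return replyQueue;
        }

        String request(String message, long timeoutMillis)
                throws InterruptedException, TimeoutException {
            requestQueue.put(message);
            String reply = replyQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (reply == null) {
                throw new TimeoutException("no reply within " + timeoutMillis + " ms");
            }
            return reply;
        }
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<String> requestQueue = new LinkedBlockingQueue<>();
        TimedRequestor requestor = new TimedRequestor(requestQueue);

        // Nobody is listening on requestQueue, so the call times out instead of hanging.
        try {
            requestor.request("1233442342", 200);
        } catch (TimeoutException e) {
            System.out.println("timed out: " + e.getMessage());
        }
    }
}
```

Throwing an exception on timeout is a design choice made for this sketch; returning null, as receive(long timeout) does, would work just as well as long as the caller checks for it.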

Conclusion

Knowing what happens under the covers explains why request/reply cannot work in a single transaction. Simply post the request in a separate transaction.

23 comments:

Ludovic Orban said...

Frank,
Your blog is a wonderful source of information on JCA, JMS and up to a certain extent JTA. Please continue posting great articles like these ones as it's hard to find good sources of information on those subjects.
I can't wait to read more on this !

Frank Kieviet said...

Thank you for your kind feedback! Let me know what topics you would be interested in.

Luo Shifei said...

Very Good!

manish dhall said...

Hi
Nice article. In most applications we have a requirement to send acknowledgement/acceptance and confirmation/rejection messages back to the sender application.
So is request/reply, i.e. synchronous message communication, a good option when the volume of messages to be handled is really high?
Also, what if there is an error in the receiver process, an infrastructure issue, etc.? What would be the exception handling approach in this case?
thx
mansih

Aditya said...

Hi Frank,
Your blog really showed some interesting concepts regarding transaction control in EJBs and JMS. However, when I try to code in this fashion, the response message always shows up as null! I tried both the CMP and the BMP approach. The only difference in my code is that I am not using a temporary queue but a server-administered queue. I double-checked whether the message is sent on the commit (it is), and the MDB associated with the first queue even processes the message. After processing the message, the MDB itself populates a response queue. My EJB is actually waiting on this response queue and it always times out. I have attached the code. I would appreciate your help on this. Thanks, Aditya
public void sendRecieve()
{
    try
    {
        String connectionInputFactoryName = "java:comp/env/jms/costingHighLevelInputQueueCF";
        String inputQueueName = "Java:comp/env/jms/costingHighLevelInputQueue";
        String outputQueueName = "java:comp/env/jms/costingHighLevelOutputQueue";
        Queue inputQueue = null;
        Queue outputQueue = null;
        QueueSender queueSender = null;
        InitialContext initialContext = new InitialContext();
        mySessionCtx.getUserTransaction().begin();
        queueConnectionFactory = (QueueConnectionFactory) PortableRemoteObject.narrow(initialContext.lookup(connectionInputFactoryName), QueueConnectionFactory.class);
        queueConnection = queueConnectionFactory.createQueueConnection();
        queueSession = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
        inputQueue = (Queue) PortableRemoteObject.narrow(initialContext.lookup(inputQueueName), Queue.class);
        ReviewDTO reviewDTO = new ReviewDTO();
        reviewDTO.setAgentName("Aditya Narayan Barimar");
        queueSender = queueSession.createSender(inputQueue);
        Message theMessage = queueSession.createObjectMessage(reviewDTO);
        String correlationId = Long.toString(System.currentTimeMillis());
        theMessage.setJMSCorrelationID(correlationId);
        queueSender.send(theMessage);
        mySessionCtx.getUserTransaction().commit();
        mySessionCtx.getUserTransaction().begin();
        outputQueue = (Queue) PortableRemoteObject.narrow(initialContext.lookup(outputQueueName), Queue.class);
        QueueReceiver queueReceiver = queueSession.createReceiver(outputQueue);
        Message reply = queueReceiver.receive(5000);
        System.out.println("reply message is: " + reply);
        mySessionCtx.getUserTransaction().commit();
    }
    catch (Throwable t)
    {
        t.printStackTrace();
    }
    finally
    {
        if (queueConnection != null)
        {
            try
            {
                queueSession.close();
                queueConnection.close();
                System.out.println("Queue connection has been closed");
            }
            catch (Throwable t)
            {
                t.printStackTrace();
            }
        }
    }
}

Frank Kieviet said...

Hi Aditya,

The first thing I always check when I get null messages is whether I called connection.start(). I've made that mistake so often it's not funny anymore.

Did you call connection.start()?

Frank

Aditya said...

Hi Frank,
That was quick! Thanks for your response, you were spot on! I had not called connection.start() when using the UserTransaction. Now I am getting a response message! But do you think I must call the same when I am using container managed transactions?
Any other pitfalls that I must look out for in case of CMP? My team here is quite apprehensive about using user transactions in a bean and would definitely like to port the application to a CMP bean.
Again, Thanks a million for your help!!

Frank Kieviet said...

Hi Aditya,


You also need to call Connection.start() in a CMT bean.

When you move this code to CMT, you will have to do receive() of the message in a separate bean instance, calling a method that was declared with transaction=REQUIRES_NEW. This is more complicated than using BMT, so if you have the choice between CMT and BMT, I would choose BMT. You don't always have the choice: e.g. if your code resides in an MDB, you would not want to change the semantics of the onMessage() method just because you want to do a request/reply from it.

Frank

Sid said...

Hi, that was a nice article and, like you said, something which looks trivial turns out to be a lot more complicated. I have one question pertaining to the request/reply mode mentioned above. As mentioned, when we send a message, we invoke it as a new transaction during which the main transaction is suspended. What would happen if, on return to the main transaction (after the message has been sent to the queue), an exception occurs in the main transaction? Will the message be reposted, or does this have to be taken care of programmatically? Since sending of the message is done in a separate transaction, probably the message is not reposted if an exception occurs in the main transaction?

Frank Kieviet said...

Hi Sid,


If an exception happens in the main transaction, that transaction will be rolled back. Whether that results in a retry depends on what called the code: if it was called in an MDB, the JMS server will generally redeliver the message. In that case the request will be sent again.

What happened to the first reply? Although the main transaction was rolled back, the request was sent to the request queue (committed), and a reply was likely sent to the reply queue. Probably you would use a temporary queue for the reply queue. The temporary queue gets deleted automatically when the connection is closed, so effectively the reply is discarded without being processed.

Frank


Martin said...

Hello
First, thanks for the explanation, I had lots of trouble with this issue.
I found another way to do it (although it requires application-server-specific code): using the TransactionManager to suspend the transaction before creating the session and resuming it right after:
QueueConnection c = ...;
TransactionManager tm = ... (AS-specific);
Transaction t = tm.suspend();
QueueSession s = c.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
tm.resume(t);
...

Vlad E said...

> c.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);

In fact, these arguments are ignored, so they have no effect whatsoever.

In JBoss it actually works and the message is sent before the EJB transaction has ended.

Also using java:/Jms to get Connection factory instead of java:/JmsXA might help:

@Resource(mappedName = "java:/Jms")

ConnectionFactory factory;

Nice blog by the way.

Dan Dubois said...

Hi Frank,

I know this is an old post but I am hoping that you will be able to help me.

I am using Glassfish V2U2 and want to use a request/reply scenario from my MessageDrivenBean (MDB).

From what your blog says I need to change the MDB from using Container Managed Transactions to Bean Managed Transactions. Is this correct? I would be very grateful if you could tell me how I can do this.

Best wishes,

Dan

Frank Kieviet said...

Re Dan:

Changing the transaction attribute of the MDB has other ramifications, and you typically don't want to do that just to be able to do a request/reply.

There are two options: you can create a SLSB with the TransactionAttribute set to REQUIRES_NEW; in this SLSB you would send your request.

The other option is to use a different connection pool for sending the request. This pool would be setup in GlassFish with transaction-support = NoTransaction. This works fine with the JMSJCA resource adapter; I have not checked this with the resource adapters that come with GlassFish.

You can find JMSJCA at http://jmsjca.dev.java.net.

Frank

Dan Dubois said...

Hi Frank,

Thanks for the quick reply. I realize this is not a help forum so please feel free not to reply. :-) I thought your first solution, using a SLSB with REQUIRES_NEW to be the most elegant. Unfortunately I get the following error after my onMessage method has run through:

prepareTransaction (XA) on JMSService:jmsdirect failed for connectionId:1497195242996848640 due to TxID is already in use.

JTS5031: Exception [java.lang.RuntimeException: javax.transaction.xa.XAException] on Resource [prepare] operation.

I then decided to just delegate everything from onMessage to a BMT EJB and follow your blog's example, but still the same exception.

There seems to be very little on the net in the way of support for these types of problems. Your blog entry seems to be the only decent thing on the subject right now and it is from 2006!

Best wishes,

Dan

Frank Kieviet said...

Re Dan:

You're right, having the MDB delegate to a BMT SLSB is also an option, and perhaps even a better option although it doesn't make that much difference.

I suspect that the error you're getting is because you're passing the connection that you obtained in the MDB to the SLSB. This is something you need to avoid: you need to acquire a brand new connection in the SLSB.

The reason for this is that the MDB's connection is enlisted in the MDB's transaction. When you acquire a new connection in the SLSB, this new connection is enlisted in the SLSB's transaction.

It's really not much code (perhaps a dozen statements), but you have to be very careful that all the statements are in the right order and point to the right objects.

Let me know if, after checking that you're using a separate connection for sending, you still have a problem. I can send you a code snippet that illustrates the solution.

Frank

Dan Dubois said...

Hi Frank,

Again, thank you very much for your response. I would be very grateful for a code snippet. I have created a thread http://forum.java.sun.com/message.jspa?messageID=10338789 with the same topic as this but it also includes my code in question (nicely formatted). Would you be able to post the snippet there for continuity?

Best wishes,

Dan

Dan Dubois said...

Hi Frank,

Are you still able to provide the code snippet? The thread mentioned above seems to have died out without anyone providing a proper solution.

Best wishes,

Dan

pankaj said...

Hi Frank,

Could you please send the code snippet for using BMT SLSB from CMT MDB.

Thanks & Regards,

Pankaj

LSK said...

Hi Frank,

That was a great article and very informative.I am trying to run a request/reply code in Glassfish.

One client is a Servlet which sends a message to the MDB (other client) and also waits for the response using receive(timeout) . I am using temporaryQueue to implement it.

But it does not work. The servlet gets a null message or sometimes it times out.

Also Glassfish is highly unreliable. Sometimes it throws an error that the Queue could not be injected (I am using annotations).

Can you send me some working example of Request/Reply ? I am not using any transaction in MDB . I am very new to JMS so kindly ignore if I have done wrong somewhere.

AgeBee said...

Hi, I got the same problem with a servlet where I connect to a topic via JMS.

Do you happen to know how to resolve the transaction issue within a servlet?

Thank you. Regards AgeBee

Rahul said...

Hi Frank,

It seems a very good article.

But I did not get much from it, because of my lack of understanding.

I did not get the following sentences. Can you explain them to me? That would be really great.

1) Applications can communicate with this server using a client runtime....

What does this mean? What is a client runtime?

2) What is the meaning of the resource adapter?

Again, please explain me...

Thanks,

Rahul
