Wednesday, September 13, 2006

Testing connection failures in resource adapters

In a previous blog (see J2EE JCA Resource Adapters: Poisonous pools) I talked about the difficulty of detecting faulty connections to the external system (EIS) that the Resource Adapter is interfacing with. A common cause of connection failures is simply connection loss. In that blog I detailed the problems of detecting connection loss in the RA and presented a number of ways to address them.

What I didn't discuss is how you can build unit tests so that you can easily test connection failures and connection failure recovery. That is the topic of this blog.

What is it that I will be looking at? Say that you have a resource adapter that connects to an external system (EIS) using one or more TCP/IP connections. The failure I want to test is that of a simple connection drop. This can happen if the EIS is rebooted, crashes, or simply because the network temporarily fails (e.g. someone stumbles over a network cable).

An automated way to induce failures

How would you manually test this type of failure? You could simply kill the EIS, or unplug the network connection. Simple and effective, but very manual. What I prefer is an automated way so that I can include failure tests in an automated unit test suite. Here's a way to do that: use a port-forwarder proxy. A port-forwarder proxy is a process that listens on a particular port. Any time a connection comes in on that port, it creates a new connection to a specified server and port. Bytes coming in from the client are sent to the server unmodified. Likewise, bytes coming in from the server are sent to the client unmodified. The client is the RA in this case; the server is the EIS. Hence, the port-forwarder proxy sits between the Resource Adapter and the EIS.

A failure can be induced by instructing the port-forwarder proxy to drop all the connections. To simulate a transient failure, the port-forwarder proxy should then again accept connections and forward them to the EIS.

Here is an example of how this port-forwarder proxy can be used in a JUnit test. Let's say that I'm testing a JMS resource adapter in an application server: I have an MDB deployed in the application server that reads JMS messages from one queue (e.g. Queue1) and forwards them to a different queue (e.g. Queue2). The test would look something like this:

  1. start the port-forwarder proxy; specify the server name and port number that the EIS is listening on
  2. get the port number that the proxy is listening on
  3. generate a connection URL that the RA will use to connect to the EIS; the new connection URL will have the proxy's server name and port number rather than the server name and port number of the EIS
  4. update the EAR file with the new connection URL
  5. deploy the EAR file
  6. send 1000 messages to Queue1
  7. read these 1000 messages from Queue2
  8. verify that the port-forwarder has received connections; then tell the port-forwarder to kill all active connections
  9. send another batch of 1000 messages to Queue1
  10. read this batch of 1000 messages from Queue2
  11. undeploy the EAR file

As you can see, I'm assuming in this example that you are using an embedded resource adapter, i.e. one that is embedded in the EAR file. Of course you can also make this work using global resource adapters; you just need a way to automatically configure the URL in the global resource adapter.
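Step 3 of the list, generating a connection URL that points at the proxy instead of the EIS, can be sketched as follows. This is only an illustration: the class name ProxyUrls, the method viaProxy and the stcms:// URL in the usage note are assumptions made for the example, and the sketch only handles URLs that java.net.URI can parse with a host and port.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Illustrative helper (hypothetical name): swaps the host and port of a
// connection URL for those of the proxy and keeps everything else as-is.
final class ProxyUrls {
    private ProxyUrls() {
    }

    static String viaProxy(String eisUrl, String proxyHost, int proxyPort) {
        URI u = URI.create(eisUrl);
        if (u.getHost() == null) {
            throw new IllegalArgumentException("URL has no host: " + eisUrl);
        }
        try {
            // Rebuild the URL with the same scheme, user info, path, query
            // and fragment, but with the proxy's host and port.
            return new URI(u.getScheme(), u.getUserInfo(), proxyHost, proxyPort,
                    u.getPath(), u.getQuery(), u.getFragment()).toString();
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(e);
        }
    }
}
```

For example, ProxyUrls.viaProxy("stcms://eishost:18007", "localhost", 50123) yields "stcms://localhost:50123"; the proxy port would come from the proxy's getPort().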

A port forwarder in Java

The first Java program I ever wrote was a port-forwarder proxy (Interactive Spy). I'm not sure when it was, but I do remember that people had just started to use JDK 1.3. In that version of the JDK there was only blocking IO available. Given that restriction, a way to write a port-forwarder proxy is to listen on a port and for each incoming connection create two threads: one thread exclusively reads from the client and sends the bytes to the server; the other thread reads from the server and sends the bytes to the client. This is not a very scalable or elegant solution. Still, I used it successfully for a number of tests in the JMS test suite at SeeBeyond for the JMS Intelligent Queue Manager in Java CAPS (better known to engineers as STCMS). However, when I was developing the connection failure tests for the JMSJCA Resource Adapter for JMS, I ran into these scalability issues, and I saw test failures due to problems in the test setup rather than to bugs in the application server, JMSJCA or STCMS.
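To make the two-threads-per-connection design concrete, here is a minimal sketch of it. It is not the original Interactive Spy code: the class name BlockingForwarder and its methods are made up for illustration, it uses lambdas (so it assumes Java 8 or later), and for simplicity it closes both sockets as soon as either direction ends.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;

// Illustrative sketch only: a blocking port forwarder that uses two
// threads per connection, one for each direction.
class BlockingForwarder implements Runnable {
    private final ServerSocket listener;
    private final String targetHost;
    private final int targetPort;

    BlockingForwarder(String targetHost, int targetPort) throws IOException {
        this.listener = new ServerSocket(0); // port 0: let the OS pick a free port
        this.targetHost = targetHost;
        this.targetPort = targetPort;
    }

    int getPort() {
        return listener.getLocalPort();
    }

    // Accept loop; returns once close() has closed the listening socket.
    public void run() {
        while (true) {
            Socket client;
            try {
                client = listener.accept();
            } catch (IOException e) {
                return; // listener was closed
            }
            try {
                Socket server = new Socket(targetHost, targetPort);
                pump(client, server); // thread 1: client -> server
                pump(server, client); // thread 2: server -> client
            } catch (IOException e) {
                closeQuietly(client); // target unreachable: drop the client
            }
        }
    }

    // Starts a daemon thread that copies bytes from one socket to the other.
    private static void pump(Socket from, Socket to) {
        Thread t = new Thread(() -> {
            byte[] buf = new byte[8192];
            try {
                InputStream in = from.getInputStream();
                OutputStream out = to.getOutputStream();
                int n;
                while ((n = in.read(buf)) != -1) {
                    out.write(buf, 0, n);
                    out.flush();
                }
            } catch (IOException e) {
                // any I/O error is treated as a disconnect
            } finally {
                closeQuietly(from);
                closeQuietly(to);
            }
        });
        t.setDaemon(true);
        t.start();
    }

    private static void closeQuietly(Socket s) {
        try {
            s.close();
        } catch (IOException ignore) {
            // nothing useful to do
        }
    }

    void close() throws IOException {
        listener.close();
    }
}
```

With many concurrent connections this design needs two threads for every one of them, which is where the scalability problems mentioned above come from.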

A better approach is to make use of the non-blocking NIO capabilities in JDK 1.4. For me this was the opportunity to explore the capabilities of NIO. The result is a relatively small class that fully stands on its own; I pasted the source code of the resulting class below. The usual restrictions apply: this code is provided "as-is" without warranty of any kind; use at your own risk, etc. I've omitted the unit test for this class.

This is how you use it: instantiate a TcpProxyNIO, passing it the server name and port number of the EIS. The proxy will pick a random port to listen on in the range 50000 to 50999. Use getPort() to find out what the port number is. The proxy now listens on that port and is ready to accept connections. Use killAllConnections() to kill all connections. Make sure to destroy the proxy after use: call close().
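The calling pattern in a test could look roughly like the sketch below. To keep the example self-contained it does not reference TcpProxyNIO directly; instead it uses a hypothetical interface FailureProxy that mirrors the three methods just described, and a hypothetical helper FailureRecoveryFlow. The Runnable stands in for "send a batch of messages to Queue1 and read them back from Queue2".

```java
// Hypothetical interface mirroring the proxy methods described above.
interface FailureProxy {
    int getPort();

    void killAllConnections();

    void close();
}

// Illustrative test flow: one batch, an induced connection drop, another batch.
final class FailureRecoveryFlow {
    private FailureRecoveryFlow() {
    }

    static void run(FailureProxy proxy, Runnable sendAndVerifyBatch) {
        try {
            sendAndVerifyBatch.run();   // traffic flows through the proxy
            proxy.killAllConnections(); // induce the connection failure
            sendAndVerifyBatch.run();   // passes only if the RA has recovered
        } finally {
            proxy.close();              // always release the port and the thread
        }
    }
}
```

The try/finally is the important part: if a batch fails, the proxy is still closed, so its listening port and thread do not leak into the next test.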


/*
 * The contents of this file are subject to the terms
 * of the Common Development and Distribution License
 * (the "License"). You may not use this file except
 * in compliance with the License.
 *
 * You can obtain a copy of the license at
 * glassfish/bootstrap/legal/CDDLv1.0.txt or
 * https://glassfish.dev.java.net/public/CDDLv1.0.html.
 * See the License for the specific language governing
 * permissions and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL
 * HEADER in each file and include the License file at
 * glassfish/bootstrap/legal/CDDLv1.0.txt. If applicable,
 * add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your
 * own identifying information: Portions Copyright [yyyy]
 * [name of copyright owner]
 */

package com.stc.jmsjca.test.core;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * A proxy server that can be used in JUnit tests to induce connection
 * failures, to assure that connections are made, etc. The proxy server is
 * set up with a target server and port; it will listen on a port that it
 * chooses itself and delegates all data coming in to the server, and vice
 * versa.
 *
 * Implementation: each incoming connection (client connection) maps into
 * a Conduit; this holds both ends of the line, i.e. the client end
 * and the server end.
 *
 * Everything is based on non-blocking IO (NIO). The proxy creates one
 * extra thread to handle the NIO events.
 *
 * @author fkieviet
 */
public class TcpProxyNIO implements Runnable {
private static Logger sLog = Logger.getLogger(TcpProxyNIO.class.getName());
private String mRelayServer;
private int mRelayPort;
private int mNPassThroughsCreated;
private Receptor mReceptor;
private Map mChannelToPipes = new IdentityHashMap();
private Selector selector;
private int mCmd;
private Semaphore mAck = new Semaphore(0);
private Object mCmdSync = new Object();
private Exception mStartupFailure;
private Exception mUnexpectedThreadFailure;
private boolean mStopped;

private static final int NONE = 0;
private static final int STOP = 1;
private static final int KILLALL = 2;
private static final int KILLLAST = 3;

private static final int BUFFER_SIZE = 16384;

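/**
 * Constructor
 *
 * @param relayServer host name of the server (EIS) to forward connections to
 * @param port port number on the relay server
 * @throws Exception if no listening port could be bound or startup failed
 */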
public TcpProxyNIO(String relayServer, int port) throws Exception {
mRelayServer = relayServer;
mRelayPort = port;

Receptor r = selectPort();
mReceptor = r;

new Thread(this, "TCPProxy on " + mReceptor.port).start();

mAck.acquire();
if (mStartupFailure != null) {
throw mStartupFailure;
}
}

/**
 * Utility class to hold data that describes the proxy server
 * listening socket
 */
private class Receptor {
public int port;
public ServerSocket serverSocket;
public ServerSocketChannel serverSocketChannel;

public Receptor(int port) {
this.port = port;
}

public void bind() throws IOException {
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocket = serverSocketChannel.socket();
InetSocketAddress inetSocketAddress = new InetSocketAddress(port);
serverSocket.bind(inetSocketAddress);
}

public void close() {
if (serverSocketChannel != null) {
try {
serverSocketChannel.close();
} catch (Exception ignore) {

}
serverSocket = null;
}
}
}

/**
 * The client or server connection
 */
private class PipeEnd {
public SocketChannel channel;
public ByteBuffer buf;
public Conduit conduit;
public PipeEnd other;
public SelectionKey key;
public String name;

public PipeEnd(String name) {
buf = ByteBuffer.allocateDirect(BUFFER_SIZE);
buf.clear();
buf.flip();
this.name = "{" + name + "}";
}

public String toString() {
StringBuffer ret = new StringBuffer();
ret.append(name);
if (key != null) {
ret.append("; key: ");
if ((key.interestOps() & SelectionKey.OP_READ) != 0) {
ret.append("-READ-");
}
if ((key.interestOps() & SelectionKey.OP_WRITE) != 0) {
ret.append("-WRITE-");
}
if ((key.interestOps() & SelectionKey.OP_CONNECT) != 0) {
ret.append("-CONNECT-");
}
}
return ret.toString();
}

public void setChannel(SocketChannel channel2) throws IOException {
this.channel = channel2;
mChannelToPipes.put(channel, this);
channel.configureBlocking(false);
}

public void close() throws IOException {
mChannelToPipes.remove(channel);
try {
channel.close();
} catch (IOException e) {
// ignore

}
channel = null;
if (key != null) {
key.cancel();
key = null;
}
}

public void listenForRead(boolean on) {
if (on) {
key.interestOps(key.interestOps() | SelectionKey.OP_READ);
} else {
key.interestOps(key.interestOps() &~ SelectionKey.OP_READ);
}
}

public void listenForWrite(boolean on) {
if (on) {
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
} else {
key.interestOps(key.interestOps() &~ SelectionKey.OP_WRITE);
}
}
}

/**
 * Represents one link from the client to the server. It is an association
 * of the two ends of the link.
 */
private class Conduit {
public PipeEnd client;
public PipeEnd server;
public int id;

public Conduit() {
client = new PipeEnd("CLIENT");
client.conduit = this;

server = new PipeEnd("SERVER");
server.conduit = this;

client.other = server;
server.other = client;

id = mNPassThroughsCreated++;
}
}

/**
 * Finds a port to listen on
 *
 * @return a newly initialized receptor
 * @throws Exception on any failure
 */
private Receptor selectPort() throws Exception {
Receptor ret;

// Find a port to listen on; try up to 100 port numbers

Random random = new Random();
for (int i = 0; i < 100; i++) {
int port = 50000 + random.nextInt(1000);
try {
ret = new Receptor(port);
ret.bind();
return ret;
} catch (IOException ignore) {
// Ignore

}
}
throw new Exception("Could not bind port");
}

/**
 * The main event loop
 */
public void run() {
// ===== STARTUP ==========
// The main thread will wait until the server is actually listening and ready
// to process incoming connections. Failures during startup should be
// propagated back to the calling thread.

try {
selector = Selector.open();

// Acceptor
mReceptor.serverSocketChannel.configureBlocking(false);
mReceptor.serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
} catch (Exception e) {
synchronized (mCmdSync) {
mStartupFailure = e;
}
}

// ===== STARTUP COMPLETE ==========

// The main thread is waiting on the ack lock; notify the main thread.
// Startup errors are communicated through the mStartupFailure variable.
mAck.release();
if (mStartupFailure != null) {
return;
}

// ===== RUN: event loop ==========

// The proxy thread spends its life in this event handling loop in which
// it deals with requests from the main thread and from notifications from
// NIO.
try {
loop: for (;;) {
int nEvents = selector.select();

// ===== COMMANDS ==========

// Process requests from the main thread. The communication mechanism
// is simple: the command is communicated through a variable; the main
// thread waits until the mAck lock is set.
switch (getCmd()) {
case STOP: {
ack();
break loop;
}
case KILLALL: {
PipeEnd[] pipes = toPipeArray();
for (int i = 0; i < pipes.length; i++) {
pipes[i].close();
}
ack();
continue;
}
case KILLLAST: {
PipeEnd[] pipes = toPipeArray();
Conduit last = pipes.length > 0 ? pipes[0].conduit : null;
if (last != null) {
for (int i = 0; i < pipes.length; i++) {
if (pipes[i].conduit.id > last.id) {
last = pipes[i].conduit;
}
}
last.client.close();
last.server.close();
}
ack();
continue;
}
}

//===== NIO Event handling ==========

if (nEvents == 0) {
continue;
}
Set keySet = selector.selectedKeys();
for (Iterator iter = keySet.iterator(); iter.hasNext();) {
SelectionKey key = (SelectionKey) iter.next();
iter.remove();

//===== ACCEPT ==========

// A client connection has come in. Perform an async connect to
// the server. The remainder of the connect is going to be done in
// the CONNECT event handling.
if (key.isValid() && key.isAcceptable()) {
sLog.fine(">Incoming connection");
try {
Conduit pt = new Conduit();
ServerSocketChannel ss = (ServerSocketChannel) key.channel();

// Accept

pt.client.setChannel(ss.accept());

// Do asynchronous connect to relay server
pt.server.setChannel(SocketChannel.open());
pt.server.key = pt.server.channel.register(
selector, SelectionKey.OP_CONNECT);
pt.server.channel.connect(new InetSocketAddress(
mRelayServer, mRelayPort));
} catch (IOException e) {
System.err.println(">Unable to accept channel");
e.printStackTrace();
// selectionKey.cancel();

}
}

//===== CONNECT ==========
// Event that is generated when the connection to the server has
// completed. Here we need to initialize both pipe-ends. Both ends
// need to start reading. If the connection had not succeeded, the
// client needs to be closed immediately.
if (key != null && key.isValid() && key.isConnectable()) {
SocketChannel c = (SocketChannel) key.channel();
PipeEnd p = (PipeEnd) mChannelToPipes.get(c); // SERVER-SIDE

if (sLog.isLoggable(Level.FINE)) {
sLog.fine(">CONNECT event on " + p + " -- other: " + p.other);
}

boolean success;
try {
success = c.finishConnect();
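// finishConnect() reports a refused or failed connect as an IOException;
// catch it here so that a failed connect does not end the event loop
} catch (Exception e) {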
success = false;
if (sLog.isLoggable(Level.FINE)) {
sLog.log(Level.FINE, "Connect failed: " + e, e);
}
}
if (!success) {
// Connection failure

p.close();
p.other.close();

// Unregister the channel with this selector
key.cancel();
key = null;
} else {
// Connection was established successfully

// Both need to be in readmode; note that the key for
// "other" has not been created yet
p.other.key = p.other.channel.register(selector, SelectionKey.OP_READ);
p.key.interestOps(SelectionKey.OP_READ);
}

if (sLog.isLoggable(Level.FINE)) {
sLog.fine(">END CONNECT event on " + p + " -- other: " + p.other);
}
}

//===== READ ==========

// Data was received. The data needs to be written to the other
// end. Note that data from client to server is processed one chunk
// at a time, i.e. a chunk of data is read from the client; then
// no new data is read from the client until the complete chunk
// is written to the server. This is why the interest-fields
// in the key are toggled back and forth. Of course the same holds
// true for data from the server to the client.
if (key != null && key.isValid() && key.isReadable()) {
PipeEnd p = (PipeEnd) mChannelToPipes.get(key.channel());
if (sLog.isLoggable(Level.FINE)) {
sLog.fine(">READ event on " + p + " -- other: " + p.other);
}

// Read data

p.buf.clear();
int n;
try {
n = p.channel.read(p.buf);
} catch (IOException e) {
n = -1;
}

if (n >= 0) {
// Write to other end

p.buf.flip();
int nw = p.other.channel.write(p.buf);

if (sLog.isLoggable(Level.FINE)) {
sLog.fine(">Read " + n + " from " + p.name + "; wrote " + nw);
}

p.other.listenForWrite(true);
p.listenForRead(false);
} else {
// Disconnected

if (sLog.isLoggable(Level.FINE)) {
sLog.fine("Disconnected");
}

p.close();
key = null;
if (sLog.isLoggable(Level.FINE)) {
sLog.fine("Now present: " + mChannelToPipes.size());
}

p.other.close();

// Stop reading from other side

if (p.other.channel != null) {
p.other.listenForRead(false);
p.other.listenForWrite(true);
}
}

if (sLog.isLoggable(Level.FINE)) {
sLog.fine(">END READ event on " + p + " -- other: " + p.other);
}
}

//===== WRITE ==========

// Data was sent. As for READ, data is processed in chunks which
// is why the interest READ and WRITE bits are flipped.
// In the case a connection failure is detected, there still may be
// data in the READ buffer that was not read yet. Example: the
// client sends a LOGOFF message to the server, the server then sends
// a BYE message back to the client; depending on when the
// write failure event comes in, the BYE message may still be in
// the buffer and must be read and sent to the client before the
// client connection is closed.
if (key != null && key.isValid() && key.isWritable()) {
PipeEnd p = (PipeEnd) mChannelToPipes.get(key.channel());

if (sLog.isLoggable(Level.FINE)) {
sLog.fine(">WRITE event on " + p + " -- other: " + p.other);
}

// More to write?

if (p.other.buf.hasRemaining()) {
int n = p.channel.write(p.other.buf);
if (sLog.isLoggable(Level.FINE)) {
sLog.fine(">Write some more to " + p.name + ": " + n);
}
} else {
if (p.other.channel != null) {
// Read from input again

p.other.buf.clear();
p.other.buf.flip();

p.other.listenForRead(true);
p.listenForWrite(false);

} else {
// Close

p.close();
key = null;
if (sLog.isLoggable(Level.FINE)) {
sLog.fine("Now present: " + mChannelToPipes.size());
}
}
}
if (sLog.isLoggable(Level.FINE)) {
sLog.fine(">END WRITE event on " + p + " -- other: " + p.other);
}
}
}
}
} catch (Exception e) {
sLog.log(Level.SEVERE, "Proxy main loop error: " + e, e);
e.printStackTrace();
synchronized (mCmdSync) {
mUnexpectedThreadFailure = e;
}
}

// ===== CLEANUP =====

// The main event loop has exited; close all connections
try {
selector.close();
PipeEnd[] pipes = toPipeArray();
for (int i = 0; i < pipes.length; i++) {
pipes[i].close();
}
mReceptor.close();
} catch (IOException e) {
sLog.log(Level.SEVERE, "Cleanup error: " + e, e);
e.printStackTrace();
}
}

private PipeEnd[] toPipeArray() {
return (PipeEnd[]) mChannelToPipes.values().toArray(
new PipeEnd[mChannelToPipes.size()]);
}

private int getCmd() {
synchronized (mCmdSync) {
return mCmd;
}
}

private void ack() {
setCmd(NONE);
mAck.release();
}

private void setCmd(int cmd) {
synchronized (mCmdSync) {
mCmd = cmd;
}
}

private void request(int cmd) {
setCmd(cmd);
selector.wakeup();

try {
mAck.acquire();
} catch (InterruptedException e) {
// ignore
}
}

/**
 * Closes the proxy
 */
public void close() {
if (mStopped) {
return;
}
mStopped = true;

synchronized (mCmdSync) {
if (mUnexpectedThreadFailure != null) {
throw new RuntimeException("Unexpected thread exit: " + mUnexpectedThreadFailure, mUnexpectedThreadFailure);
}
}

request(STOP);
}

/**
 * Restarts after close
 *
 * @throws Exception if the port could not be bound again or startup failed
 */
public void restart() throws Exception {
close();

mChannelToPipes = new IdentityHashMap();
mAck = new Semaphore(0);
mStartupFailure = null;
mUnexpectedThreadFailure = null;

mReceptor.bind();
new Thread(this, "TCPProxy on " + mReceptor.port).start();
mStopped = false;

mAck.acquire();
if (mStartupFailure != null) {
throw mStartupFailure;
}
}

/**
 * Returns the port number this proxy listens on
 *
 * @return port number
 */
public int getPort() {
return mReceptor.port;
}

/**
 * Kills all connections; data may be lost
 */
public void killAllConnections() {
request(KILLALL);
}

/**
 * Kills the last created connection; data may be lost
 */
public void killLastConnection() {
request(KILLLAST);
}

/**
 * Closes the given proxy if it is not null
 *
 * @param proxy the proxy to close; may be null
 */
public static void safeClose(TcpProxyNIO proxy) {
if (proxy != null) {
proxy.close();
}
}
}


Other uses of the port-forwarding proxy

What else can you do with this port-forwarding proxy? First of all, its use is not limited to outbound connections: of course you can also use it to test connection failures on inbound connections. Next, you can also use it to make sure that connections are in fact being made the way that you expected. For example, the CAPS JMS server can use both SSL and non-SSL connections. I added a few tests to our internal test suite to test this capability from JMSJCA. Here, just to make sure that the test itself works as expected, I'm using the proxy to find out if the URL was indeed modified and that the connections are indeed going to the JMS server's SSL port. Similarly, you can also use the proxy to count the number of connections being made. E.g. if you want to test that connection pooling indeed works, you can use this proxy and assert that the number of connections created does not exceed the number of connections in the pool.

Missing components

In the example I mentioned updating the EAR file and automatically deploying the EAR file to the application server. I've created tools for those too so that you can do those things programmatically as well. Drop me a line if you're interested and I'll blog about those too.

1 comment:

Zafar said...

My question is not on testing connections on RAs but on re-establishing them. I want to dynamically change the outbound IP addresses on connections from JCAPS HL7 eWay. The intent is to get the ip addresses from a lookup service and dynamically re-assign a thread from the pool to make a new connection and send the message out. Otherwise I will have to create thousands of outbound eWays to achieve the same result. Any help is greatly appreciated. Thanks Z.