In a previous blog (see J2EE
JCA Resource Adapters: Poisonous pools) I talked about the
difficulty of detecting faulty connections to the external system (EIS)
that the Resource Adapter is interfacing with. A common cause of
connection failures is simply connection loss. In my previous blog I
detailed the problems of detecting connection loss in the RA and
presented a number of ways to address this problem.
What I didn't discuss is how you can build unit tests so that you
can easily test connection failures and connection failure recovery.
That is the topic of this blog.
What is it that I will be looking at? Say that you have a resource
adapter that connects to an external system (EIS) using one or more
TCP/IP connections. The failure I want to test is that of a simple
connection drop. This can happen if the EIS is rebooted, crashes, or
simply because the network temporarily fails (e.g. someone trips
over a network cable).
An automated way to induce failures
How would you manually test this type of failure? You could simply
kill the EIS, or unplug the network connection. Simple and effective,
but very manual. What I prefer is an automated approach, so that I
can include failure tests in an automated unit test suite. Here's
a way to do that:
use a port-forwarder proxy. A port forwarder proxy is a process that
listens on a particular port. Any time a connection comes in on that
port, it will create a new connection to a specified server and port.
Bytes coming in from the client are sent to the server unmodified.
Likewise, bytes coming in from the server are sent to the client
unmodified. The
client is the RA in this case; the server is the EIS. Hence, the
port-forwarder proxy sits between the Resource Adapter and the EIS.
A failure can be induced by instructing the port-forwarder proxy to
drop all the connections. To simulate a transient failure, the
port-forwarder proxy should then again accept connections and forward
them to the EIS.
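Before looking at the proxy itself, it may help to see what a dropped connection looks like from the client's point of view. The sketch below is self-contained plain-JDK code, and the class and method names are mine, invented for illustration (this is not part of the proxy presented later): a stand-in listener accepts one connection and immediately closes it, and the client observes the drop as read() returning -1.

```java
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

// Illustrative stand-in: accepts one connection and drops it immediately,
// so the client sees the same symptom a killed proxy connection produces.
public class DroppedConnectionDemo {

    // Returns what the client reads after the drop: -1 means end-of-stream.
    public static int readAfterDrop() throws IOException, InterruptedException {
        final ServerSocket listener = new ServerSocket(0); // any free port
        Thread dropper = new Thread(new Runnable() {
            public void run() {
                try {
                    listener.accept().close(); // kill the connection right away
                    listener.close();
                } catch (IOException ignore) {
                }
            }
        });
        dropper.start();

        Socket client = new Socket("localhost", listener.getLocalPort());
        int n = client.getInputStream().read(); // blocks until the drop arrives
        client.close();
        dropper.join();
        return n;
    }
}
```

A read returning -1 (or an IOException on a subsequent write) is the client-side symptom a proxy produces when it kills a connection; detecting one of those two conditions in the RA is exactly the problem discussed in the previous blog.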
Here is an example of how this port-forwarder proxy can be used in a
JUnit test. Let's say that I'm testing a JMS resource adapter. Let's
say that I'm testing the RA in an application server: I have an MDB
deployed in the application server that reads JMS messages from one
queue (e.g. Queue1) and forwards them to a different queue (e.g.
Queue2). The test would look something like this:
- start the port-forwarder proxy; specify the server name and port
number that the EIS is listening on
- get the port number that the proxy is listening on
- generate a connection URL that the RA will use to connect to the
EIS; the new connection URL will have the proxy's server name and port
number rather than the server name and port number of the EIS
- update the EAR file with the new connection URL
- deploy the EAR file
- send 1000 messages to Queue1
- read these 1000 messages from Queue2
- verify that the port-forwarder has received connections; then
tell the port-forwarder to kill all active connections
- send another batch of 1000 messages to Queue1
- read this batch of 1000 messages from Queue2
- undeploy the EAR file
As you can see, I'm assuming in this example that you are using an
embedded resource adapter, i.e. one that is embedded in the EAR file.
Of course you can also make this work using global resource adapters;
you just need a way to automatically configure the URL in the global
resource adapter.
A port forwarder in Java
The first Java program I ever wrote was a port-forwarder proxy
(Interactive Spy). I'm not sure when that was, but I do remember that
people had just started to use JDK 1.3. In that version of the JDK there
was only blocking IO available. Based on that restriction a way to
write a port-forwarder proxy is to listen on a port and for each
incoming connection create two threads: one thread exclusively reads
from the client and sends the bytes to the server; the other thread
reads from the server and sends the bytes to the client. This was not a
very scalable or elegant solution. However, I did use this solution
successfully for a number of tests in the JMS test suite at SeeBeyond
for the JMS Intelligent Queue Manager in Java CAPS (better known to
engineers as STCMS). But when I was developing the connection
failure tests for the JMSJCA Resource Adapter for JMS, I ran into these
scalability issues, and I saw test failures due to problems in the test
setup rather than to bugs in the application server, JMSJCA or STCMS.
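For the curious, here is roughly what that blocking, two-threads-per-connection forwarder looks like. This is a self-contained sketch of the approach, not the original Interactive Spy code; all the names (BlockingForwarder, pump, demo) are made up for illustration.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;

// Blocking-IO port forwarder: one acceptor loop, two copy threads per connection.
public class BlockingForwarder implements Runnable {
    private final ServerSocket acceptor;
    private final String targetHost;
    private final int targetPort;

    public BlockingForwarder(String targetHost, int targetPort) throws IOException {
        this.acceptor = new ServerSocket(0); // let the OS pick a free port
        this.targetHost = targetHost;
        this.targetPort = targetPort;
    }

    public int getPort() {
        return acceptor.getLocalPort();
    }

    public void run() {
        try {
            for (;;) {
                Socket client = acceptor.accept();
                Socket server = new Socket(targetHost, targetPort);
                pump(client, server); // client -> server
                pump(server, client); // server -> client
            }
        } catch (IOException closed) {
            // acceptor was closed: shut down
        }
    }

    // One thread per direction: blocking read from one socket, write to the other.
    private static void pump(final Socket from, final Socket to) {
        new Thread(new Runnable() {
            public void run() {
                byte[] buf = new byte[8192];
                try {
                    InputStream in = from.getInputStream();
                    OutputStream out = to.getOutputStream();
                    int n;
                    while ((n = in.read(buf)) >= 0) {
                        out.write(buf, 0, n);
                        out.flush();
                    }
                } catch (IOException ignore) {
                }
                try { from.close(); } catch (IOException ignore) { }
                try { to.close(); } catch (IOException ignore) { }
            }
        }).start();
    }

    public void close() throws IOException {
        acceptor.close();
    }

    // Usage demo: run a local echo server as a pretend EIS, send a message
    // through the forwarder, and return what comes back.
    public static String demo(String msg) throws Exception {
        final ServerSocket echo = new ServerSocket(0);
        new Thread(new Runnable() {
            public void run() {
                try {
                    Socket s = echo.accept();
                    InputStream in = s.getInputStream();
                    OutputStream out = s.getOutputStream();
                    int c;
                    while ((c = in.read()) >= 0) {
                        out.write(c);
                        out.flush();
                    }
                    s.close();
                } catch (IOException ignore) {
                }
            }
        }).start();

        BlockingForwarder fwd = new BlockingForwarder("localhost", echo.getLocalPort());
        Thread t = new Thread(fwd);
        t.setDaemon(true);
        t.start();

        Socket client = new Socket("localhost", fwd.getPort());
        byte[] data = msg.getBytes();
        client.getOutputStream().write(data);
        client.getOutputStream().flush();
        byte[] reply = new byte[data.length];
        InputStream in = client.getInputStream();
        for (int off = 0; off < reply.length;) {
            int n = in.read(reply, off, reply.length - off);
            if (n < 0) {
                break;
            }
            off += n;
        }
        client.close();
        fwd.close();
        echo.close();
        return new String(reply);
    }
}
```

The scalability problem is plain to see: every proxied connection costs two dedicated threads, each blocked in read() most of the time.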
A better approach is to make use of the non-blocking NIO
capabilities in JDK 1.4. For me this was the opportunity to explore the
capabilities of NIO. The result is a relatively small class that fully
stands on its own; I pasted the source code of the resulting class
below. The usual restrictions apply: this code is provided "as-is"
without warranty of any kind; use at your own risk, etc. I've omitted
the unit test for this class.
This is how you use it: instantiate a TcpProxyNIO, passing it the
server name and port number of the EIS. The proxy will find a port to
listen on in the 50000-50999 range. Use getPort() to find out what that
port number is. The proxy then listens on that port and is ready to
accept connections. Use killAllConnections()
to kill all connections. Make sure to destroy the proxy after use: call
close().
package com.stc.jmsjca.test.core;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * A port-forwarding proxy based on non-blocking NIO. It listens on a port,
 * forwards each incoming connection to the configured server and port, and
 * can kill connections on demand to simulate connection failures.
 */
public class TcpProxyNIO implements Runnable {
    private static final Logger sLog = Logger.getLogger(TcpProxyNIO.class.getName());
    private String mRelayServer;
    private int mRelayPort;
    private int mNPassThroughsCreated;
    private Receptor mReceptor;
    private Map mChannelToPipes = new IdentityHashMap();
    private Selector selector;
    private int mCmd;
    private Semaphore mAck = new Semaphore(0);
    private Object mCmdSync = new Object();
    private Exception mStartupFailure;
    private Exception mUnexpectedThreadFailure;
    private boolean mStopped;

    // Commands sent from the test thread to the proxy thread
    private static final int NONE = 0;
    private static final int STOP = 1;
    private static final int KILLALL = 2;
    private static final int KILLLAST = 3;

    private static final int BUFFER_SIZE = 16384;

    public TcpProxyNIO(String relayServer, int port) throws Exception {
        mRelayServer = relayServer;
        mRelayPort = port;
        Receptor r = selectPort();
        mReceptor = r;
        new Thread(this, "TCPProxy on " + mReceptor.port).start();
        mAck.acquire();
        if (mStartupFailure != null) {
            throw mStartupFailure;
        }
    }

    /**
     * The server socket that accepts connections from clients.
     */
    private class Receptor {
        public int port;
        public ServerSocket serverSocket;
        public ServerSocketChannel serverSocketChannel;

        public Receptor(int port) {
            this.port = port;
        }

        public void bind() throws IOException {
            serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.configureBlocking(false);
            serverSocket = serverSocketChannel.socket();
            InetSocketAddress inetSocketAddress = new InetSocketAddress(port);
            serverSocket.bind(inetSocketAddress);
        }

        public void close() {
            if (serverSocketChannel != null) {
                try {
                    serverSocketChannel.close();
                } catch (Exception ignore) {
                }
                serverSocket = null;
            }
        }
    }

    /**
     * One end of a proxied connection (either the client side or the server
     * side), with the buffer holding bytes read from this end.
     */
    private class PipeEnd {
        public SocketChannel channel;
        public ByteBuffer buf;
        public Conduit conduit;
        public PipeEnd other;
        public SelectionKey key;
        public String name;

        public PipeEnd(String name) {
            buf = ByteBuffer.allocateDirect(BUFFER_SIZE);
            buf.clear();
            buf.flip();
            this.name = "{" + name + "}";
        }

        public String toString() {
            StringBuffer ret = new StringBuffer();
            ret.append(name);
            if (key != null) {
                ret.append("; key: ");
                if ((key.interestOps() & SelectionKey.OP_READ) != 0) {
                    ret.append("-READ-");
                }
                if ((key.interestOps() & SelectionKey.OP_WRITE) != 0) {
                    ret.append("-WRITE-");
                }
                if ((key.interestOps() & SelectionKey.OP_CONNECT) != 0) {
                    ret.append("-CONNECT-");
                }
            }
            return ret.toString();
        }

        public void setChannel(SocketChannel channel) throws IOException {
            this.channel = channel;
            mChannelToPipes.put(channel, this);
            channel.configureBlocking(false);
        }

        public void close() throws IOException {
            mChannelToPipes.remove(channel);
            try {
                channel.close();
            } catch (IOException ignore) {
            }
            channel = null;
            if (key != null) {
                key.cancel();
                key = null;
            }
        }

        public void listenForRead(boolean on) {
            if (on) {
                key.interestOps(key.interestOps() | SelectionKey.OP_READ);
            } else {
                key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
            }
        }

        public void listenForWrite(boolean on) {
            if (on) {
                key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
            } else {
                key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
            }
        }
    }

    /**
     * A proxied connection: a client pipe end paired with a server pipe end.
     */
    private class Conduit {
        public PipeEnd client;
        public PipeEnd server;
        public int id;

        public Conduit() {
            client = new PipeEnd("CLIENT");
            client.conduit = this;
            server = new PipeEnd("SERVER");
            server.conduit = this;
            client.other = server;
            server.other = client;
            id = mNPassThroughsCreated++;
        }
    }

    private Receptor selectPort() throws Exception {
        Random random = new Random();
        for (int i = 0; i < 100; i++) {
            int port = 50000 + random.nextInt(1000);
            try {
                Receptor ret = new Receptor(port);
                ret.bind();
                return ret;
            } catch (IOException ignore) {
            }
        }
        throw new Exception("Could not bind port");
    }

    public void run() {
        try {
            selector = Selector.open();
            mReceptor.serverSocketChannel.configureBlocking(false);
            mReceptor.serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        } catch (Exception e) {
            synchronized (mCmdSync) {
                mStartupFailure = e;
            }
        }
        mAck.release();
        if (mStartupFailure != null) {
            return;
        }

        try {
            loop: for (;;) {
                int nEvents = selector.select();

                // Check for commands from the test thread
                switch (getCmd()) {
                case STOP: {
                    ack();
                    break loop;
                }
                case KILLALL: {
                    PipeEnd[] pipes = toPipeArray();
                    for (int i = 0; i < pipes.length; i++) {
                        pipes[i].close();
                    }
                    ack();
                    continue;
                }
                case KILLLAST: {
                    PipeEnd[] pipes = toPipeArray();
                    Conduit last = pipes.length > 0 ? pipes[0].conduit : null;
                    if (last != null) {
                        for (int i = 0; i < pipes.length; i++) {
                            if (pipes[i].conduit.id > last.id) {
                                last = pipes[i].conduit;
                            }
                        }
                        last.client.close();
                        last.server.close();
                    }
                    ack();
                    continue;
                }
                }

                if (nEvents == 0) {
                    continue;
                }

                Set keySet = selector.selectedKeys();
                for (Iterator iter = keySet.iterator(); iter.hasNext();) {
                    SelectionKey key = (SelectionKey) iter.next();
                    iter.remove();

                    // New incoming connection: accept it and start connecting
                    // the corresponding outbound channel to the server
                    if (key.isValid() && key.isAcceptable()) {
                        sLog.fine(">Incoming connection");
                        try {
                            Conduit pt = new Conduit();
                            ServerSocketChannel ss = (ServerSocketChannel) key.channel();
                            pt.client.setChannel(ss.accept());
                            pt.server.setChannel(SocketChannel.open());
                            pt.server.key = pt.server.channel.register(
                                selector, SelectionKey.OP_CONNECT);
                            pt.server.channel.connect(new InetSocketAddress(
                                mRelayServer, mRelayPort));
                        } catch (IOException e) {
                            System.err.println(">Unable to accept channel");
                            e.printStackTrace();
                        }
                    }

                    // Outbound connection to the server completed (or failed)
                    if (key != null && key.isValid() && key.isConnectable()) {
                        SocketChannel c = (SocketChannel) key.channel();
                        PipeEnd p = (PipeEnd) mChannelToPipes.get(c);
                        if (sLog.isLoggable(Level.FINE)) {
                            sLog.fine(">CONNECT event on " + p + " -- other: " + p.other);
                        }
                        boolean success;
                        try {
                            // finishConnect() reports a refused connection
                            // by throwing an IOException
                            success = c.finishConnect();
                        } catch (IOException e) {
                            success = false;
                            if (sLog.isLoggable(Level.FINE)) {
                                sLog.log(Level.FINE, "Connect failed: " + e, e);
                            }
                        }
                        if (!success) {
                            p.close();
                            p.other.close();
                            key.cancel();
                            key = null;
                        } else {
                            p.other.key = p.other.channel.register(selector, SelectionKey.OP_READ);
                            p.key.interestOps(SelectionKey.OP_READ);
                        }
                        if (sLog.isLoggable(Level.FINE)) {
                            sLog.fine(">END CONNECT event on " + p + " -- other: " + p.other);
                        }
                    }

                    // Data available (or EOF) on one end
                    if (key != null && key.isValid() && key.isReadable()) {
                        PipeEnd p = (PipeEnd) mChannelToPipes.get(key.channel());
                        if (sLog.isLoggable(Level.FINE)) {
                            sLog.fine(">READ event on " + p + " -- other: " + p.other);
                        }
                        p.buf.clear();
                        int n;
                        try {
                            n = p.channel.read(p.buf);
                        } catch (IOException e) {
                            n = -1;
                        }
                        if (n >= 0) {
                            p.buf.flip();
                            int nw = p.other.channel.write(p.buf);
                            if (sLog.isLoggable(Level.FINE)) {
                                sLog.fine(">Read " + n + " from " + p.name + "; wrote " + nw);
                            }
                            p.other.listenForWrite(true);
                            p.listenForRead(false);
                        } else {
                            if (sLog.isLoggable(Level.FINE)) {
                                sLog.fine("Disconnected");
                            }
                            p.close();
                            key = null;
                            if (sLog.isLoggable(Level.FINE)) {
                                sLog.fine("Now present: " + mChannelToPipes.size());
                            }
                            p.other.close();
                            if (p.other.channel != null) {
                                p.other.listenForRead(false);
                                p.other.listenForWrite(true);
                            }
                        }
                        if (sLog.isLoggable(Level.FINE)) {
                            sLog.fine(">END READ event on " + p + " -- other: " + p.other);
                        }
                    }

                    // Room to write buffered data to one end
                    if (key != null && key.isValid() && key.isWritable()) {
                        PipeEnd p = (PipeEnd) mChannelToPipes.get(key.channel());
                        if (sLog.isLoggable(Level.FINE)) {
                            sLog.fine(">WRITE event on " + p + " -- other: " + p.other);
                        }
                        if (p.other.buf.hasRemaining()) {
                            int n = p.channel.write(p.other.buf);
                            if (sLog.isLoggable(Level.FINE)) {
                                sLog.fine(">Write some more to " + p.name + ": " + n);
                            }
                        } else {
                            if (p.other.channel != null) {
                                p.other.buf.clear();
                                p.other.buf.flip();
                                p.other.listenForRead(true);
                                p.listenForWrite(false);
                            } else {
                                p.close();
                                key = null;
                                if (sLog.isLoggable(Level.FINE)) {
                                    sLog.fine("Now present: " + mChannelToPipes.size());
                                }
                            }
                        }
                        if (sLog.isLoggable(Level.FINE)) {
                            sLog.fine(">END WRITE event on " + p + " -- other: " + p.other);
                        }
                    }
                }
            }
        } catch (Exception e) {
            sLog.log(Level.SEVERE, "Proxy main loop error: " + e, e);
            e.printStackTrace();
            synchronized (mCmdSync) {
                mUnexpectedThreadFailure = e;
            }
        }

        // Cleanup
        try {
            selector.close();
            PipeEnd[] pipes = toPipeArray();
            for (int i = 0; i < pipes.length; i++) {
                pipes[i].close();
            }
            mReceptor.close();
        } catch (IOException e) {
            sLog.log(Level.SEVERE, "Cleanup error: " + e, e);
            e.printStackTrace();
        }
    }

    private PipeEnd[] toPipeArray() {
        return (PipeEnd[]) mChannelToPipes.values().toArray(
            new PipeEnd[mChannelToPipes.size()]);
    }

    private int getCmd() {
        synchronized (mCmdSync) {
            return mCmd;
        }
    }

    private void ack() {
        setCmd(NONE);
        mAck.release();
    }

    private void setCmd(int cmd) {
        synchronized (mCmdSync) {
            mCmd = cmd;
        }
    }

    private void request(int cmd) {
        setCmd(cmd);
        selector.wakeup();
        try {
            mAck.acquire();
        } catch (InterruptedException ignore) {
        }
    }

    /**
     * Stops the proxy; safe to call multiple times.
     */
    public void close() {
        if (mStopped) {
            return;
        }
        mStopped = true;
        synchronized (mCmdSync) {
            if (mUnexpectedThreadFailure != null) {
                throw new RuntimeException("Unexpected thread exit: "
                    + mUnexpectedThreadFailure, mUnexpectedThreadFailure);
            }
        }
        request(STOP);
    }

    /**
     * Stops and restarts the proxy on the same port.
     */
    public void restart() throws Exception {
        close();
        mChannelToPipes = new IdentityHashMap();
        mAck = new Semaphore(0);
        mStartupFailure = null;
        mUnexpectedThreadFailure = null;
        mReceptor.bind();
        new Thread(this, "TCPProxy on " + mReceptor.port).start();
        mStopped = false;
        mAck.acquire();
        if (mStartupFailure != null) {
            throw mStartupFailure;
        }
    }

    /**
     * Returns the port the proxy is listening on.
     */
    public int getPort() {
        return mReceptor.port;
    }

    /**
     * Kills all connections currently going through the proxy.
     */
    public void killAllConnections() {
        request(KILLALL);
    }

    /**
     * Kills the most recently created connection.
     */
    public void killLastConnection() {
        request(KILLLAST);
    }

    /**
     * Null-safe convenience close.
     */
    public static void safeClose(TcpProxyNIO proxy) {
        if (proxy != null) {
            proxy.close();
        }
    }
}
Other uses of the port-forwarding proxy
What else can you do with this port-forwarding proxy? First of all,
its use is not limited to outbound connections: of course you can also
use it to test connection failures on inbound connections. Next,
you can also use it to make sure that connections are in fact being made
the way that you expected. For example, the CAPS JMS server can use
both SSL and non-SSL connections. I added a few tests to our internal
test suite to test this capability from JMSJCA. Here, just to make sure
that the test itself works as expected, I'm using the proxy to find out
if the URL was indeed modified and that the connections are indeed
going to the JMS server's SSL port. Similarly, you can also use the
proxy to count the number of connections being made. E.g., if you want
to test that connection pooling indeed works, you can use this proxy and
assert that the number of connections created does not exceed the
number of connections in the pool.
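To show what connection counting might look like: the sketch below is a self-contained stand-in, not TcpProxyNIO itself (the class as listed keeps its connection counter, mNPassThroughsCreated, private and exposes no accessor for it). All the names here (CountingListener, getConnectionsAccepted, demo) are invented for illustration.

```java
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative counting listener: the same idea could be added to a proxy
// by incrementing a counter on every accepted connection.
public class CountingListener implements Runnable {
    private final ServerSocket acceptor = new ServerSocket(0); // any free port
    private final AtomicInteger accepted = new AtomicInteger();

    public CountingListener() throws IOException {
    }

    public int getPort() {
        return acceptor.getLocalPort();
    }

    public int getConnectionsAccepted() {
        return accepted.get();
    }

    public void run() {
        try {
            for (;;) {
                Socket s = acceptor.accept();
                accepted.incrementAndGet();
                s.close(); // counting only; a real proxy would forward
            }
        } catch (IOException closed) {
            // acceptor was closed: shut down
        }
    }

    public void close() throws IOException {
        acceptor.close();
    }

    // Usage sketch: open n client connections and report how many were counted.
    public static int demo(int n) throws Exception {
        CountingListener listener = new CountingListener();
        Thread t = new Thread(listener);
        t.setDaemon(true);
        t.start();
        for (int i = 0; i < n; i++) {
            new Socket("localhost", listener.getPort()).close();
        }
        long deadline = System.currentTimeMillis() + 5000;
        while (listener.getConnectionsAccepted() < n
                && System.currentTimeMillis() < deadline) {
            Thread.sleep(10); // accept runs on the listener thread
        }
        listener.close();
        return listener.getConnectionsAccepted();
    }
}
```

In a real pooling test you would point the RA at the proxy's port and assert that the count never exceeds the configured pool size.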
Missing components
In the example I mentioned updating the EAR file and automatically
deploying the EAR file to the application server. I've created tools
for those too so that you can do those things programmatically as well.
Drop me a line if you're interested and I'll blog about those too.