In a previous blog (see J2EE JCA Resource Adapters: Poisonous pools) I talked about the difficulty of detecting faulty connections to the external system (EIS) that a Resource Adapter interfaces with. A common cause of connection failures is simply connection loss. In that blog I detailed the problems of detecting connection loss in the RA and presented a number of ways to address them.
What I didn't discuss is how you can build unit tests so that you
can easily test connection failures and connection failure recovery.
That is the topic of this blog.
What is it that I will be looking at? Say that you have a resource
adapter that connects to an external system (EIS) using one or more
TCP/IP connections. The failure I want to test is that of a simple
connection drop. This can happen if the EIS is rebooted, crashes, or
simply because the network temporarily fails (e.g. someone stumbles
over a network cable).
An automated way to induce failures
How would you manually test this type of failure? You could simply
kill the EIS, or unplug the network connection. Simple and effective,
but very manual. What I prefer is an automated approach, so that I
can include failure tests in an automated unit test suite. Here's
a way to do that: use a port-forwarder proxy. A port-forwarder proxy
is a process that listens on a particular port. Any time a connection
comes in on that port, it creates a new connection to a specified
server and port. Bytes coming in from the client are sent to the
server unmodified; likewise, bytes coming in from the server are sent
to the client unmodified. The client in this case is the RA; the
server is the EIS. Hence, the port-forwarder proxy sits between the
Resource Adapter and the EIS.
A failure can be induced by instructing the port-forwarder proxy to
drop all of its connections. To simulate a transient failure, the
port-forwarder proxy should then accept connections again and forward
them to the EIS as before.
Here is an example of how this port-forwarder proxy can be used in a
JUnit test. Let's say that I'm testing a JMS resource adapter in an
application server: I have an MDB deployed in the application server
that reads JMS messages from one queue (e.g. Queue1) and forwards them
to a different queue (e.g. Queue2). The test would look something like
this:
- start the port-forwarder proxy; specify the server name and port
number that the EIS is listening on
- get the port number that proxy is listening on
- generate a connection URL that the RA will use to connect to the EIS; the new connection URL will have the proxy's server name and port number rather than the server name and port number of the EIS
- update the EAR file with the new connection URL
- deploy the EAR file
- send 1000 messages to Queue1
- read these 1000 messages from Queue2
- verify that the port-forwarder has received connections; then tell the port-forwarder to kill all active connections
- send another batch of 1000 messages to Queue1
- read this batch of 1000 messages from Queue2
- undeploy the EAR file
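In JUnit, this could look something like the sketch below. It uses the TcpProxyNIO class listed later in this post; updateConnectionUrl(), deploy(), undeploy(), sendMessages() and readMessages() are hypothetical helpers standing in for whatever your test harness provides, and the host, port and URL format are examples only:

    public void testConnectionFailureRecovery() throws Exception {
        // Put the proxy between the RA and the EIS (a JMS server in this example)
        TcpProxyNIO proxy = new TcpProxyNIO("jmsserver", 7676);
        try {
            // Point the RA at the proxy instead of directly at the EIS
            String url = "stcms://localhost:" + proxy.getPort();
            updateConnectionUrl("test.ear", url);
            deploy("test.ear");

            sendMessages("Queue1", 1000);
            readMessages("Queue2", 1000);

            // Induce the failure: drop all connections going through the proxy
            proxy.killAllConnections();

            // The RA should reconnect through the proxy and resume delivery
            sendMessages("Queue1", 1000);
            readMessages("Queue2", 1000);

            undeploy("test.ear");
        } finally {
            proxy.close();
        }
    }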
As you can see, I'm assuming in this example that you are using an
embedded resource adapter, i.e. one that is embedded in the EAR file.
Of course you can also make this work with global resource adapters;
you just need a way to automatically configure the URL in the global
resource adapter.
A port forwarder in Java
The first Java program I ever wrote was a port-forwarder proxy
(Interactive Spy). I'm not sure exactly when that was, but I do
remember that people were just starting to use JDK 1.3. That version
of the JDK offered only blocking IO. Given that restriction, one way
to write a port-forwarder proxy is to listen on a port and create two
threads for each incoming connection: one thread exclusively reads
from the client and sends the bytes to the server; the other thread
reads from the server and sends the bytes to the client. Not a very
scalable or elegant solution, but I did use it successfully for a
number of tests in the JMS test suite at SeeBeyond for the JMS
Intelligent Queue Manager in Java CAPS (better known to engineers as
STCMS). However, when I was developing the connection failure tests
for the JMSJCA Resource Adapter for JMS, I did run into these
scalability issues: I saw test failures caused by problems in the test
setup rather than by bugs in the application server, JMSJCA or STCMS.
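For illustration, here is a minimal sketch of that blocking, two-threads-per-connection approach (written from memory for this post, not the original Interactive Spy code; eisHost, eisPort and proxyPort are placeholders):

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.net.ServerSocket;
    import java.net.Socket;

    public class BlockingTcpProxy {
        public static void main(String[] args) throws IOException {
            String eisHost = "eishost"; // placeholder
            int eisPort = 7676;         // placeholder
            int proxyPort = 50000;      // placeholder
            ServerSocket ss = new ServerSocket(proxyPort);
            for (;;) {
                Socket client = ss.accept();
                Socket server = new Socket(eisHost, eisPort);
                // One thread per direction: client-to-server and server-to-client
                pump(client.getInputStream(), server.getOutputStream());
                pump(server.getInputStream(), client.getOutputStream());
            }
        }

        // Copies bytes from in to out until either side drops the connection
        private static void pump(final InputStream in, final OutputStream out) {
            new Thread(new Runnable() {
                public void run() {
                    byte[] buf = new byte[8192];
                    try {
                        for (int n; (n = in.read(buf)) >= 0;) {
                            out.write(buf, 0, n);
                            out.flush();
                        }
                    } catch (IOException ignore) {
                        // connection dropped
                    } finally {
                        try { in.close(); } catch (IOException ignore) {}
                        try { out.close(); } catch (IOException ignore) {}
                    }
                }
            }).start();
        }
    }

Each accepted connection costs two threads, which is exactly the scalability problem mentioned above.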
A better approach is to make use of the non-blocking IO (NIO)
capabilities introduced in JDK 1.4. For me this was an opportunity to
explore what NIO can do. The result is a relatively small class that
stands fully on its own; I pasted the source code of the resulting
class below. The usual restrictions apply: this code is provided
"as-is", without warranty of any kind; use at your own risk, etc. I've
omitted the unit test for this class.
This is how you use it: instantiate a TcpProxyNIO, passing it the
server name and port number of the EIS. The proxy will find a port to
listen on in the 50000 range. Use getPort() to find out what that
port number is. The proxy then listens on that port and is ready to
accept connections. Use killAllConnections() to kill all connections.
Make sure to destroy the proxy after use by calling close().
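In code (the host name and port are placeholders):

    // Create a proxy that forwards to the EIS
    TcpProxyNIO proxy = new TcpProxyNIO("eishost", 7676);
    int port = proxy.getPort(); // configure the RA to connect to localhost:port
    // ... exercise the connections under test ...
    proxy.killAllConnections(); // simulate a connection drop
    // ... verify that the RA recovers ...
    proxy.close();              // destroy the proxy

Here is the class itself: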
/*
* The contents of this file are subject to the terms
* of the Common Development and Distribution License
* (the "License"). You may not use this file except
* in compliance with the License.
*
* You can obtain a copy of the license at
* glassfish/bootstrap/legal/CDDLv1.0.txt or
* https://glassfish.dev.java.net/public/CDDLv1.0.html.
* See the License for the specific language governing
* permissions and limitations under the License.
*
* When distributing Covered Code, include this CDDL
* HEADER in each file and include the License file at
* glassfish/bootstrap/legal/CDDLv1.0.txt. If applicable,
* add the following below this CDDL HEADER, with the
* fields enclosed by brackets "[]" replaced with your
* own identifying information: Portions Copyright [yyyy]
* [name of copyright owner]
*/
package com.stc.jmsjca.test.core;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* A proxy server that can be used in JUnit tests to induce connection
* failures, to assert that connections are made, etc. The proxy server is
* set up with a target server and port; it will listen on a port that it
* chooses itself and delegates all data coming in to the server, and vice
* versa.
*
* Implementation: each incoming connection (client connection) maps into
* a Conduit; this holds both ends of the line, i.e. the client end
* and the server end.
*
* Everything is based on non-blocking IO (NIO). The proxy creates one
* extra thread to handle the NIO events.
*
* @author fkieviet
*/
public class TcpProxyNIO implements Runnable {
private static Logger sLog = Logger.getLogger(TcpProxyNIO.class.getName());
private String mRelayServer;
private int mRelayPort;
private int mNPassThroughsCreated;
private Receptor mReceptor;
private Map mChannelToPipes = new IdentityHashMap();
private Selector selector;
private int mCmd;
private Semaphore mAck = new Semaphore(0);
private Object mCmdSync = new Object();
private Exception mStartupFailure;
private Exception mUnexpectedThreadFailure;
private boolean mStopped;
private static final int NONE = 0;
private static final int STOP = 1;
private static final int KILLALL = 2;
private static final int KILLLAST = 3;
private static final int BUFFER_SIZE = 16384;
/**
* Constructor
*
* @param relayServer host name of the EIS to relay to
* @param port port on which the EIS listens
* @throws Exception if no listening port could be bound
*/
public TcpProxyNIO(String relayServer, int port) throws Exception {
mRelayServer = relayServer;
mRelayPort = port;
Receptor r = selectPort();
mReceptor = r;
new Thread(this, "TCPProxy on " + mReceptor.port).start();
mAck.acquire();
if (mStartupFailure != null) {
throw mStartupFailure;
}
}
/**
* Utility class to hold data that describes the proxy server
* listening socket
*/
private class Receptor {
public int port;
public ServerSocket serverSocket;
public ServerSocketChannel serverSocketChannel;
public Receptor(int port) {
this.port = port;
}
public void bind() throws IOException {
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocket = serverSocketChannel.socket();
InetSocketAddress inetSocketAddress = new InetSocketAddress(port);
serverSocket.bind(inetSocketAddress);
}
public void close() {
if (serverSocketChannel != null) {
try {
serverSocketChannel.close();
} catch (Exception ignore) {
}
serverSocket = null;
}
}
}
/**
* One end of a conduit: either the client-side or the server-side connection
*/
private class PipeEnd {
public SocketChannel channel;
public ByteBuffer buf;
public Conduit conduit;
public PipeEnd other;
public SelectionKey key;
public String name;
public PipeEnd(String name) {
buf = ByteBuffer.allocateDirect(BUFFER_SIZE);
buf.clear();
buf.flip();
this.name = "{" + name + "}";
}
public String toString() {
StringBuffer ret = new StringBuffer();
ret.append(name);
if (key != null) {
ret.append("; key: ");
if ((key.interestOps() & SelectionKey.OP_READ) != 0) {
ret.append("-READ-");
}
if ((key.interestOps() & SelectionKey.OP_WRITE) != 0) {
ret.append("-WRITE-");
}
if ((key.interestOps() & SelectionKey.OP_CONNECT) != 0) {
ret.append("-CONNECT-");
}
}
return ret.toString();
}
public void setChannel(SocketChannel channel) throws IOException {
this.channel = channel;
mChannelToPipes.put(channel, this);
channel.configureBlocking(false);
}
public void close() throws IOException {
mChannelToPipes.remove(channel);
try {
channel.close();
} catch (IOException e) {
// ignore
}
channel = null;
if (key != null) {
key.cancel();
key = null;
}
}
public void listenForRead(boolean on) {
if (on) {
key.interestOps(key.interestOps() | SelectionKey.OP_READ);
} else {
key.interestOps(key.interestOps() &~ SelectionKey.OP_READ);
}
}
public void listenForWrite(boolean on) {
if (on) {
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
} else {
key.interestOps(key.interestOps() &~ SelectionKey.OP_WRITE);
}
}
}
/**
* Represents one link from the client to the server. It is an association
* of the two ends of the link.
*/
private class Conduit {
public PipeEnd client;
public PipeEnd server;
public int id;
public Conduit() {
client = new PipeEnd("CLIENT");
client.conduit = this;
server = new PipeEnd("SERVER");
server.conduit = this;
client.other = server;
server.other = client;
id = mNPassThroughsCreated++;
}
}
/**
* Finds a port to listen on
*
* @return a newly initialized receptor
* @throws Exception on any failure
*/
private Receptor selectPort() throws Exception {
Receptor ret;
// Find a port to listen on; try up to 100 port numbers
Random random = new Random();
for (int i = 0; i < 100; i++) {
int port = 50000 + random.nextInt(1000);
try {
ret = new Receptor(port);
ret.bind();
return ret;
} catch (IOException ignore) {
// Ignore
}
}
throw new Exception("Could not bind port");
}
/**
* The main event loop
*/
public void run() {
// ===== STARTUP ==========
// The main thread will wait until the server is actually listening and ready
// to process incoming connections. Failures during startup should be
// propagated back to the calling thread.
try {
selector = Selector.open();
// Acceptor
mReceptor.serverSocketChannel.configureBlocking(false);
mReceptor.serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
} catch (Exception e) {
synchronized (mCmdSync) {
mStartupFailure = e;
}
}
// ===== STARTUP COMPLETE ==========
// The main thread is waiting on the ack lock; notify the main thread.
// Startup errors are communicated through the mStartupFailure variable.
mAck.release();
if (mStartupFailure != null) {
return;
}
// ===== RUN: event loop ==========
// The proxy thread spends its life in this event handling loop in which
// it deals with requests from the main thread and from notifications from
// NIO.
try {
loop: for (;;) {
int nEvents = selector.select();
// ===== COMMANDS ==========
// Process requests from the main thread. The communication mechanism
// is simple: the command is communicated through a variable; the main
// thread waits until the mAck lock is set.
switch (getCmd()) {
case STOP: {
ack();
break loop;
}
case KILLALL: {
PipeEnd[] pipes = toPipeArray();
for (int i = 0; i < pipes.length; i++) {
pipes[i].close();
}
ack();
continue;
}
case KILLLAST: {
PipeEnd[] pipes = toPipeArray();
Conduit last = pipes.length > 0 ? pipes[0].conduit : null;
if (last != null) {
for (int i = 0; i < pipes.length; i++) {
if (pipes[i].conduit.id > last.id) {
last = pipes[i].conduit;
}
}
last.client.close();
last.server.close();
}
ack();
continue;
}
}
//===== NIO Event handling ==========
if (nEvents == 0) {
continue;
}
Set keySet = selector.selectedKeys();
for (Iterator iter = keySet.iterator(); iter.hasNext();) {
SelectionKey key = (SelectionKey) iter.next();
iter.remove();
//===== ACCEPT ==========
// A client connection has come in. Perform an async connect to
// the server. The remainder of the connect is going to be done in
// the CONNECT event handling.
if (key.isValid() && key.isAcceptable()) {
sLog.fine(">Incoming connection");
try {
Conduit pt = new Conduit();
ServerSocketChannel ss = (ServerSocketChannel) key.channel();
// Accept
pt.client.setChannel(ss.accept());
// Do asynchronous connect to relay server
pt.server.setChannel(SocketChannel.open());
pt.server.key = pt.server.channel.register(
selector, SelectionKey.OP_CONNECT);
pt.server.channel.connect(new InetSocketAddress(
mRelayServer, mRelayPort));
} catch (IOException e) {
System.err.println(">Unable to accept channel");
e.printStackTrace();
}
}
//===== CONNECT ==========
// Event that is generated when the connection to the server has
// completed. Here we need to initialize both pipe-ends. Both ends
// need to start reading. If the connection had not succeeded, the
// client needs to be closed immediately.
if (key != null && key.isValid() && key.isConnectable()) {
SocketChannel c = (SocketChannel) key.channel();
PipeEnd p = (PipeEnd) mChannelToPipes.get(c); // SERVER-SIDE
if (sLog.isLoggable(Level.FINE)) {
sLog.fine(">CONNECT event on " + p + " -- other: " + p.other);
}
boolean success;
try {
success = c.finishConnect();
} catch (IOException e) {
success = false;
if (sLog.isLoggable(Level.FINE)) {
sLog.log(Level.FINE, "Connect failed: " + e, e);
}
}
if (!success) {
// Connection failure
p.close();
p.other.close();
// Unregister the channel with this selector
key.cancel();
key = null;
} else {
// Connection was established successfully
// Both need to be in readmode; note that the key for
// "other" has not been created yet
p.other.key = p.other.channel.register(selector, SelectionKey.OP_READ);
p.key.interestOps(SelectionKey.OP_READ);
}
if (sLog.isLoggable(Level.FINE)) {
sLog.fine(">END CONNECT event on " + p + " -- other: " + p.other);
}
}
//===== READ ==========
// Data was received. The data needs to be written to the other
// end. Note that data from client to server is processed one chunk
// at a time, i.e. a chunk of data is read from the client; then
// no new data is read from the client until the complete chunk
// is written to the server. This is why the interest-fields
// in the key are toggled back and forth. Of course the same holds
// true for data from the server to the client.
if (key != null && key.isValid() && key.isReadable()) {
PipeEnd p = (PipeEnd) mChannelToPipes.get(key.channel());
if (sLog.isLoggable(Level.FINE)) {
sLog.fine(">READ event on " + p + " -- other: " + p.other);
}
// Read data
p.buf.clear();
int n;
try {
n = p.channel.read(p.buf);
} catch (IOException e) {
n = -1;
}
if (n >= 0) {
// Write to other end
p.buf.flip();
int nw = p.other.channel.write(p.buf);
if (sLog.isLoggable(Level.FINE)) {
sLog.fine(">Read " + n + " from " + p.name + "; wrote " + nw);
}
p.other.listenForWrite(true);
p.listenForRead(false);
} else {
// Disconnected
if (sLog.isLoggable(Level.FINE)) {
sLog.fine("Disconnected");
}
p.close();
key = null;
if (sLog.isLoggable(Level.FINE)) {
sLog.fine("Now present: " + mChannelToPipes.size());
}
p.other.close();
// Stop reading from other side
if (p.other.channel != null) {
p.other.listenForRead(false);
p.other.listenForWrite(true);
}
}
if (sLog.isLoggable(Level.FINE)) {
sLog.fine(">END READ event on " + p + " -- other: " + p.other);
}
}
//===== WRITE ==========
// Data was sent. As for READ, data is processed in chunks which
// is why the interest READ and WRITE bits are flipped.
// In case a connection failure is detected, there may still be
// data in the READ buffer that was not read yet. For example, the
// client sends a LOGOFF message to the server, and the server then
// sends a BYE message back to the client; depending on when the
// write failure event comes in, the BYE message may still be in
// the buffer and must be read and sent to the client before the
// client connection is closed.
if (key != null && key.isValid() && key.isWritable()) {
PipeEnd p = (PipeEnd) mChannelToPipes.get(key.channel());
if (sLog.isLoggable(Level.FINE)) {
sLog.fine(">WRITE event on " + p + " -- other: " + p.other);
}
// More to write?
if (p.other.buf.hasRemaining()) {
int n = p.channel.write(p.other.buf);
if (sLog.isLoggable(Level.FINE)) {
sLog.fine(">Write some more to " + p.name + ": " + n);
}
} else {
if (p.other.channel != null) {
// Read from input again
p.other.buf.clear();
p.other.buf.flip();
p.other.listenForRead(true);
p.listenForWrite(false);
} else {
// Close
p.close();
key = null;
if (sLog.isLoggable(Level.FINE)) {
sLog.fine("Now present: " + mChannelToPipes.size());
}
}
}
if (sLog.isLoggable(Level.FINE)) {
sLog.fine(">END WRITE event on " + p + " -- other: " + p.other);
}
}
}
}
} catch (Exception e) {
sLog.log(Level.SEVERE, "Proxy main loop error: " + e, e);
e.printStackTrace();
synchronized (mCmdSync) {
mUnexpectedThreadFailure = e;
}
}
// ===== CLEANUP =====
// The main event loop has exited; close all connections
try {
selector.close();
PipeEnd[] pipes = toPipeArray();
for (int i = 0; i < pipes.length; i++) {
pipes[i].close();
}
mReceptor.close();
} catch (IOException e) {
sLog.log(Level.SEVERE, "Cleanup error: " + e, e);
e.printStackTrace();
}
}
private PipeEnd[] toPipeArray() {
return (PipeEnd[]) mChannelToPipes.values().toArray(
new PipeEnd[mChannelToPipes.size()]);
}
private int getCmd() {
synchronized (mCmdSync) {
return mCmd;
}
}
private void ack() {
setCmd(NONE);
mAck.release();
}
private void setCmd(int cmd) {
synchronized (mCmdSync) {
mCmd = cmd;
}
}
private void request(int cmd) {
setCmd(cmd);
selector.wakeup();
try {
mAck.acquire();
} catch (InterruptedException e) {
// ignore
}
}
/**
* Closes the proxy
*/
public void close() {
if (mStopped) {
return;
}
mStopped = true;
synchronized (mCmdSync) {
if (mUnexpectedThreadFailure != null) {
throw new RuntimeException("Unexpected thread exit: " + mUnexpectedThreadFailure, mUnexpectedThreadFailure);
}
}
request(STOP);
}
/**
* Restarts after close
*
* @throws Exception
*/
public void restart() throws Exception {
close();
mChannelToPipes = new IdentityHashMap();
mAck = new Semaphore(0);
mStartupFailure = null;
mUnexpectedThreadFailure = null;
mReceptor.bind();
new Thread(this, "TCPProxy on " + mReceptor.port).start();
mStopped = false;
mAck.acquire();
if (mStartupFailure != null) {
throw mStartupFailure;
}
}
/**
* Returns the port number this proxy listens on
*
* @return port number
*/
public int getPort() {
return mReceptor.port;
}
/**
* Kills all connections; data may be lost
*/
public void killAllConnections() {
request(KILLALL);
}
/**
* Kills the last created connection; data may be lost
*/
public void killLastConnection() {
request(KILLLAST);
}
/**
* Closes the given proxy if it is not null
*
* @param proxy the proxy to close; may be null
*/
public static void safeClose(TcpProxyNIO proxy) {
if (proxy != null) {
proxy.close();
}
}
}
Other uses of the port-forwarding proxy
What else can you do with this port-forwarding proxy? First of all,
its use is not limited to outbound connections: of course you can also
use it to test connection failures on inbound connections. Next,
you can use it to make sure that connections are in fact being made
the way that you expected. For example, the CAPS JMS server supports
both SSL and non-SSL connections. I added a few tests to our internal
test suite to test this capability from JMSJCA. Here, just to make sure
that the test itself works as expected, I'm using the proxy to verify
that the URL was indeed modified and that the connections are indeed
going to the JMS server's SSL port. Similarly, you can use the
proxy to count the number of connections being made. E.g., if you want
to test that connection pooling indeed works, you can use this proxy
and assert that the number of connections created does not exceed the
number of connections in the pool.
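A sketch of that last idea: the class as listed above does not expose a connection count, but the mNPassThroughsCreated field already counts accepted connections, so a small (hypothetical) getter is all that is needed; POOL_SIZE is a placeholder for your pool configuration:

    // Hypothetical addition to TcpProxyNIO:
    public int getConnectionsCreated() {
        synchronized (mCmdSync) {
            return mNPassThroughsCreated;
        }
    }

    // In the pool test:
    TcpProxyNIO proxy = new TcpProxyNIO("jmsserver", 7676);
    // ... run a workload that should be served from the connection pool ...
    assertTrue("connections were not pooled",
        proxy.getConnectionsCreated() <= POOL_SIZE);
    proxy.close();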
Missing components
In the example I mentioned updating the EAR file and automatically
deploying the EAR file to the application server. I've created tools
for those tasks as well, so that you can do them programmatically.
Drop me a line if you're interested and I'll blog about those too.