Saturday, April 11, 2009

DelayQueueChannel for Spring Integration

This is the beginning of a discussion, continued here.

Spring Integration is an amazing project. With a few lines of code or a small Spring XML configuration, it lets you set up a powerful Enterprise Application Integration (EAI) server. We are used to thinking of EAI as a heavyweight solution, but Spring Integration is a modest library deployed together with your console or web application. I'm really excited about its ease of use.

The central component of the framework is the message channel. It has several implementations, the most basic of which are the direct channel and the queue channel. A direct channel processes each message in the sender's thread, while a queue channel holds messages until a consumer takes them for processing.

When message processing fails, it is usually desirable to wait before retrying. In EAI your application often depends on remote servers, which may be temporarily unavailable. If you retry the failed operation a few minutes later, it has a better chance of succeeding.

Out of the box, Spring Integration does not provide a facility to delay messages for such a long period. While it's easy to insert a sleep in the middle of processing, sleeping for long periods ties up the server's precious threads. So I extended the Spring Integration queue channel to support delays.

Fortunately, Java has a standard DelayQueue, part of the java.util.concurrent API, one of the best APIs in Java. Used correctly, it is very fast and relatively error-proof.
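
To illustrate how DelayQueue behaves on its own, here is a minimal JDK-only sketch. The names DelayedText and measureTakeMillis are made up for this illustration and are not part of any library; the point is only that take() does not hand out an element until its getDelay() has dropped to zero or below.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayQueueDemo {

    /** Hypothetical element type: a text payload that becomes available after a fixed delay. */
    static final class DelayedText implements Delayed {
        final String text;
        private final long readyAtNanos;

        DelayedText(String text, long delay, TimeUnit unit) {
            this.text = text;
            this.readyAtNanos = System.nanoTime() + unit.toNanos(delay);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            // Remaining time until the element may leave the queue; negative once expired.
            return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS),
                    other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    /** Puts one element into a DelayQueue and returns how long take() had to wait, in milliseconds. */
    static long measureTakeMillis(long delayMillis) {
        DelayQueue<DelayedText> queue = new DelayQueue<>();
        long start = System.nanoTime();
        queue.put(new DelayedText("retry me", delayMillis, TimeUnit.MILLISECONDS));
        try {
            queue.take(); // blocks until the delay has expired
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        System.out.println("take() returned after " + measureTakeMillis(200) + " ms");
    }
}
```

The consumer thread is parked inside take() rather than sleeping in the middle of message processing, so producers are never blocked by the delay.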

The basic Spring Integration queue channel was created with extensibility in mind. It has a constructor that accepts a java.util.concurrent.BlockingQueue instance, and DelayQueue is just one implementation of BlockingQueue. So all that's left is to glue these great components together to do the job.

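import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Package names below are those of Spring Integration 1.0.x;
// later versions moved these classes.
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.core.Message;
import org.springframework.integration.core.MessageHeaders;

/**
 * @author Andrew Skiba skibaa@gmail.com
 * Use for any purpose at your own risk.
 */
public class SimpleDelayQueueChannel extends QueueChannel {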
    private long delay;
    private TimeUnit timeUnit;

    /**
     * QueueChannel requires its elements to implement Message, and
     * DelayQueue requires its elements to implement Delayed. This
     * class implements both to satisfy these requirements. For the Message
     * interface it acts as a proxy and forwards all calls to the wrapped Message.
     */
    protected class DelayedMessage implements Delayed, Message {
        long createdClock;
        Message wrappedMessage;

        public DelayedMessage(Message wrappedMessage) {
            this.wrappedMessage = wrappedMessage;
            createdClock = System.currentTimeMillis();
        }

        public long getDelay(TimeUnit unit) {
            // Compute the remaining time in nanoseconds first and convert once,
            // so that coarse units do not truncate the two terms separately.
            long millisPassed = System.currentTimeMillis() - createdClock;
            long nanosLeft = timeUnit.toNanos(delay)
                    - TimeUnit.MILLISECONDS.toNanos(millisPassed);
            return unit.convert(nanosLeft, TimeUnit.NANOSECONDS);
        }

        public int compareTo(Delayed o) {
            // All messages in this channel share the same delay, so the one
            // created earlier also expires earlier.
            if (o instanceof DelayedMessage) {
                return Long.compare(createdClock, ((DelayedMessage) o).createdClock);
            }
            return Long.compare(getDelay(TimeUnit.NANOSECONDS),
                    o.getDelay(TimeUnit.NANOSECONDS));
        }

        public MessageHeaders getHeaders() {
            return wrappedMessage.getHeaders();
        }

        public Object getPayload() {
            return wrappedMessage.getPayload();
        }

    }

    public SimpleDelayQueueChannel(long delay, TimeUnit timeUnit) {
        /* QueueChannel expects a queue capable of holding any Message,
         * but DelayQueue requires its elements to implement Delayed interface.
         * So we MUST override doSend so we control what is inserted into
         * the queue.
         */
        super((BlockingQueue)new DelayQueue());
        this.delay=delay;
        this.timeUnit=timeUnit;
    }

    @Override
    protected boolean doSend(Message message, long timeout) {
        return super.doSend(new DelayedMessage(message), timeout);
    }
}
This class was written with brevity in mind and has a few shortcomings. The most significant is the hard-coded delay calculation, based on the system time at which a message is inserted into the queue. I want to extract this logic into a delay strategy class. That would allow the same queue to hold elements with different delays, for example.
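
As a rough idea of what such a delay strategy could look like, here is a minimal JDK-only sketch. DelayStrategy, DelayedItem, TEN_MS_PER_CHAR and releaseOrder are hypothetical names chosen for this illustration; none of them exist in Spring Integration or in the class above, and the wiring into QueueChannel is deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayStrategyDemo {

    /** Hypothetical strategy: decides how long a given payload has to wait. */
    interface DelayStrategy<T> {
        long delayMillis(T payload);
    }

    /** Example strategy for the demo: 10 ms of delay per character of text. */
    static final DelayStrategy<String> TEN_MS_PER_CHAR = p -> 10L * p.length();

    /** Wrapper that asks the strategy once, at creation, and stores the absolute expiry time. */
    static final class DelayedItem<T> implements Delayed {
        final T payload;
        private final long expiresAtNanos;

        DelayedItem(T payload, DelayStrategy<? super T> strategy) {
            this.payload = payload;
            this.expiresAtNanos = System.nanoTime()
                    + TimeUnit.MILLISECONDS.toNanos(strategy.delayMillis(payload));
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(expiresAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            // Items no longer share one delay, so order by expiry time, not creation time.
            if (other instanceof DelayedItem<?>) {
                long diff = expiresAtNanos - ((DelayedItem<?>) other).expiresAtNanos;
                return Long.compare(diff, 0L);
            }
            return Long.compare(getDelay(TimeUnit.NANOSECONDS),
                    other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    /** Inserts the payloads in the given order and returns them in the order the queue releases them. */
    static List<String> releaseOrder(DelayStrategy<String> strategy, String... payloads) {
        DelayQueue<DelayedItem<String>> queue = new DelayQueue<>();
        for (String p : payloads) {
            queue.put(new DelayedItem<>(p, strategy));
        }
        List<String> released = new ArrayList<>();
        try {
            for (int i = 0; i < payloads.length; i++) {
                released.add(queue.take().payload);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return released;
    }

    public static void main(String[] args) {
        // Delays are 90 ms, 30 ms and 10 ms, so this prints [a, mid, slowest!!]
        System.out.println(releaseOrder(TEN_MS_PER_CHAR, "slowest!!", "mid", "a"));
    }
}
```

Once each element carries its own expiry time, compareTo must order by expiry, not by creation time; otherwise a short-delay message inserted late would be stuck behind a long-delay one at the head of the queue.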

And the following is the unit test for the simple delay queue channel.

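import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;

import org.junit.Test;
// Package names below are those of Spring Integration 1.0.x.
import org.springframework.integration.core.Message;
import org.springframework.integration.message.GenericMessage;

public class DelayQueueChannelTests {
    static Logger logger = Logger.getLogger(DelayQueueChannelTests.class.getName());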
    @Test
    public void testSimpleSendAndReceive() throws Exception {
        final AtomicBoolean messageReceived = new AtomicBoolean(false);
        final CountDownLatch latch = new CountDownLatch(1);
        final SimpleDelayQueueChannel channel = new SimpleDelayQueueChannel(100, TimeUnit.MILLISECONDS);
        new Thread(new Runnable() {

            public void run() {
                Message<Long> message = (Message<Long>) channel.receive();
                messageReceived.set(true);
                latch.countDown();
                assertTrue(message instanceof SimpleDelayQueueChannel.DelayedMessage);
                float waitTime=(System.currentTimeMillis()-message.getPayload())/1000.F;
                logger.fine("waited for "+waitTime+" seconds ");
            }
        }).start();
        assertFalse(messageReceived.get());
        channel.send(new GenericMessage<Long>(System.currentTimeMillis()));
        assertFalse(messageReceived.get());
        latch.await(25, TimeUnit.MILLISECONDS);
        assertFalse(messageReceived.get());
        latch.await(1, TimeUnit.SECONDS);
        assertTrue(messageReceived.get());
    }
}
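// Funny how Blogspot treats generic type parameters as HTML "tags", lower-casing and closing them :-)
// If any such tags show up in the listings, don't copy them to Java; write the type parameter as Long (capital L) in angle brackets.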
Do you have any comments, suggestions, questions? Don't hesitate to comment here, there or by e-mail.

5 comments:

Vincent said...

Very useful as a 'retry channel', thanks.
Vincent

Andrew Skiba said...

I'm glad it is useful. What variant do you prefer, the one on this page, or the interceptor? How do you count the retries in your application?

Vincent said...

Sorry for the ... delay.

I find the interceptor variant more difficult to configure, the delayed queue seems more intuitive to me.

For the retry count I use a message header. A service activator handles failed messages that arrive in the central error channel, adds or increments a retry count header value, or discards messages to a dead letter channel (if the retry count is over the limit, for instance), and finally sends the message to the retry channel. A router takes messages from the retry channel and routes them to the last channel before the message failure (the last channel value is configured with a header-enricher).

I follow your discussion with Mark Fisher. He says that, in 1.0.3, channels will have a TaskExecutor reference option but I haven't seen anything yet in the SI sources trunk. Anyway, in a retry use case for which I want to delay message processing, again, the DelayQueue solution seems definitely more natural to me.

Vincent

Andrew Skiba said...

I just checked the trunk, and found task executor sub element in org.springframework.integration/src/main/resources/org/springframework/integration/config/xml/spring-integration-1.0.xsd

So you can use Mark's code to set a delay executor on a channel, and that is the solution I would use today. It is in no way worse than the solution I documented in this blog post, and it has the advantage of being more standard from now on.

Vincent said...

Ok, I didn't look at the xsd.
Mark Fisher's solution has the advantage of allowing the use of any BlockingQueue implementation with similar configuration complexity.