Saturday, April 11, 2009

DelayQueue via interceptor

In the previous post I published a simple solution for using java.util.concurrent.DelayQueue with Spring Integration queue channel. Then Iwein Fuld suggested a nice improvement of namespace configuration. I liked the idea, but if the underlying queue is DelayQueue how to ensure all elements implement Delayed interface if I don't override doSend method and use the standard QueueChannel? That can be done with a ChannelInterceptor. I really like the way the guys from Spring Integration designed the API, there is an extension point just where you need it.

So if Iwein's proposition will be implemented, the configuration of the delay queue will look like this:


  
  
    
  



This also solves the problem with different delays for messages in the same queue. I made an example with HeaderDelayInterceptor, which looks at message header to set the time out.

It can be configured similar to SimpleDelayInterceptor:
While we are waiting for the queue-class feature to be implemented, we can create channel instances as usual Spring beans:



  
  
    
      
    
  
The important thing to remember is to put this interceptor the last, if you need other interceptors. This is important, because other interceptors might create Message instances not implementing Delayed, and the DelayQueue will throw ClassCastException. While I'm looking for a better place to host source code, it will be listed here:
/**
 * Wraps messages sent to the channel to implement Delayed interface. Required
 * for queue channel created with DelayQueue instance. Causes messages to wait
 * in the queue for the specified delay.
 * @author Andrew Skiba skibaa@gmail.com
 * Use for any purpose at your own risk.
 */
public class SimpleDelayInterceptor extends ChannelInterceptorAdapter {

    private long delay;
    private TimeUnit timeUnit;

    public SimpleDelayInterceptor(long delay, TimeUnit timeUnit) {
        this.delay = delay;
        this.timeUnit = timeUnit;
    }

    @Override
    public Message preSend(Message message, MessageChannel channel) {
        final long sendingTime = System.currentTimeMillis();
        return new DelayedMessageAdapter(message) {
            public long getDelay(TimeUnit unit) {
                long millisPassed = System.currentTimeMillis() - sendingTime;
                long unitsPassed = unit.convert(millisPassed, TimeUnit.MILLISECONDS);
                long delayInGivenUnits = unit.convert(delay, timeUnit);
                return delayInGivenUnits - unitsPassed;
            }
        };
    }
}

/**
 * QueueChannel requires its elements to implement Message, and
 * DelayQueue requires its elements to implement Delayed. This
 * class implements both to satisfy these requirements. For Mesage
 * interface it acts as a proxy and forwards all calls to the wrapped Message.
 * @author Andrew Skiba skibaa@gmail.com
 * Use for any purpose at your own risk.
 */
public abstract class DelayedMessageAdapter implements Delayed, Message {
    private Message wrappedMessage;

    public DelayedMessageAdapter(Message wrappedMessage) {
        this.wrappedMessage = wrappedMessage;
    }

    protected Message getMessage() {
        return wrappedMessage;
    }

    public abstract long getDelay(TimeUnit unit);

    public int compareTo(Delayed o) {
        return new Long(getDelay(TimeUnit.NANOSECONDS))
                .compareTo(o.getDelay(TimeUnit.NANOSECONDS));
    }

    public MessageHeaders getHeaders() {
        return wrappedMessage.getHeaders();
    }

    public T getPayload() {
        return wrappedMessage.getPayload();
    }

    @Override
    public String toString() {
        return wrappedMessage.toString();
    }
}

/**
 * Wraps messages sent to the channel to implement Delayed interface. Required
 * for queue channel created with DelayQueue instance. Causes messages to wait
 * in the queue till System.currentTimeInMillis() reaches the value specified
 * in message header. The header name is customizable.
 * @author Andrew Skiba skibaa@gmail.com
 * Use for any purpose at your own risk.
 */
public class HeaderDelayInterceptor extends ChannelInterceptorAdapter {
    String headerName;

    public HeaderDelayInterceptor(String headerName) {
        this.headerName = headerName;
    }

    @Override
    public Message preSend(Message message, MessageChannel channel) {
        //fail early if header is missing or incorrect
        Object waitTill=message.getHeaders().get(headerName);
        if (waitTill==null)
            throw new IllegalArgumentException("HeaderDelayInterceptor expects " +
                "header with name:" + headerName +
                " which was not found in message:" + message);
        if (!(waitTill instanceof Long))
            throw new IllegalArgumentException("HeaderDelayInterceptor expects " +
                "Long value in header with name:" + headerName +
                " incompatible type found in message:" + message);
        //everything looks OK, create a wrapped message
        return new DelayedMessageAdapter(message){
            public long getDelay(TimeUnit unit) {
                Long waitTill=(Long)getMessage().getHeaders().get(headerName);
                long delayRemained=waitTill-System.currentTimeMillis();
                return unit.convert(delayRemained, TimeUnit.MILLISECONDS);
            }
        };
    }
}
And the unit test is here:
/**
 * @author Andrew Skiba skibaa@gmail.com
 * Use for any purpose at your own risk.
 */
public class DelayQueueChannelTests {
    static Logger logger = Logger.getLogger(DelayQueueChannelTests.class.getName());

    @Test
    public void testSimpleDelay() throws Exception {
        final AtomicBoolean messageReceived = new AtomicBoolean(false);
        final CountDownLatch latch = new CountDownLatch(1);
        final QueueChannel channel = new QueueChannel(new DelayQueue());
        channel.addInterceptor(new SimpleDelayInterceptor(100, TimeUnit.MILLISECONDS));
        new Thread(new Runnable() {

            public void run() {
                Message message = (Message)channel.receive();
                assertTrue(message instanceof DelayedMessageAdapter);
                messageReceived.set(true);
                latch.countDown();
                float waitTime=(System.currentTimeMillis()-message.getPayload())/1000.F;
                logger.info("waited for "+waitTime+" seconds ");
            }
        }).start();
        assertFalse(messageReceived.get());
        channel.send(new GenericMessage(System.currentTimeMillis()));
        assertFalse(messageReceived.get());
        latch.await(25, TimeUnit.MILLISECONDS);
        assertFalse(messageReceived.get());
        latch.await(1, TimeUnit.SECONDS);
        assertTrue(messageReceived.get());
    }

    private final static String HEADER_NAME="test.waitTill";

    @Test
    public void testMessageDelay() throws Exception {
        final AtomicBoolean [] messagesReceived = new AtomicBoolean[] {
            new AtomicBoolean(false), new AtomicBoolean(false)
        };
        final CountDownLatch latch1 = new CountDownLatch(1);
        final CountDownLatch latch2 = new CountDownLatch(1);
        final QueueChannel channel = new QueueChannel(new DelayQueue());
        channel.addInterceptor(new HeaderDelayInterceptor(HEADER_NAME));
        new Thread(new Runnable() {

            public void run() {
                Message message = (Message)channel.receive();
                assertTrue(message instanceof DelayedMessageAdapter);
                messagesReceived[message.getPayload()].set(true);
                latch1.countDown();
                message = (Message)channel.receive();
                assertTrue(message instanceof DelayedMessageAdapter);
                messagesReceived[message.getPayload()].set(true);
                latch2.countDown();
            }
        }).start();
        assertFalse(messagesReceived[0].get());
        assertFalse(messagesReceived[1].get());
        long now=System.currentTimeMillis();
        channel.send(MessageBuilder.withPayload(0).setHeader(HEADER_NAME, now+200).build());
        channel.send(MessageBuilder.withPayload(1).setHeader(HEADER_NAME, now+100).build());
        assertFalse(messagesReceived[0].get());
        assertFalse(messagesReceived[1].get());
        latch1.await(25, TimeUnit.MILLISECONDS);   //not enough time for either message
        assertFalse(messagesReceived[0].get());
        assertFalse(messagesReceived[1].get());
        latch1.await(170, TimeUnit.MILLISECONDS);         //the second message should be ready before the first
        assertFalse(messagesReceived[0].get());
        assertTrue(messagesReceived[1].get());
        latch2.await(200, TimeUnit.MILLISECONDS);
        assertTrue(messagesReceived[0].get());
    }
}

No comments: