Friday, April 17, 2009

AppEngine project on NetBeans

This is a beginning of discussion, continued here

Recently Google released an early look of AppEngine for Java. It includes an Eclipse plugin for developing with AppEngine Java SDK. I wanted to check is it possible to develop AppEngine Java application using NetBeans

Environment

  1. Sun JDK 1.6.0_12
  2. NetBeans 6.7 M2 (pre-release)
  3. gae-java-sdk-1.2.0

Opening project

Let's start with basic AppEngine demo Guestbook. It's located in demos/guestbook directory of gae-sdk-java.

Create project wizard

Start the wizard with File|New Project.

Step 1

Select options like this:

Step 2

Enter the location of the project in the edit box, or click browse:

The rest of the lines will be filled automatically.

Step 3

Leave the default options on the "Build and Run Actions" page.

Step 4

Don't change anything on the "Web Sources" page.

Step 5

Click "Next" on "Source Package Folders" page.

Step 6

Click "Add JAR/Folder" on the "Java Sources Classpath" page and add all jars located in war/WEB-INF/lib folder under guestbook root:

Click Finish to leave default settings on the last two pages.

Fixing classpath

The resulting project will look like this:

Two Servlet files have errors because NetBeans has limited abilities on parsing ant build files. It could not extract the compile time dependencies from build.xml, so we pointed to WEB-INF/lib libraries at Step 6. But one of the compile dependencies (Servlet API jar) is located outside of the project tree. It's because this jar is supplied by the application server. In my pre-release version of NetBeans the UI is not able to use dependencies outside of the project tree, but it's easy to work around.

Edit project.xml

Press Ctrl-F2 or select Window|Files to switch to files panel. You see all files under your project root:

Click on + sign to open nbproject, right-click on project.xml and select Edit. This opens internal NetBeans project file which contains all the settings we selected in the wizard. Find line which contains <classpath> element near the end of the file. Go to the end of the line and add the path Servlet API jar. This jar is located in lib/shared folder under the Google AppEngine Java SDK folder, in my case the full path was C:\work\appengine-java-sdk-1.2.0\lib\shared\geronimo-servlet_2.5_spec-1.2.jar

Press Ctrl-S to save the project.xml file and return to Projects pane (press Ctrl-1). Now NetBeans is happy and no errors are reported.

Running the application

You can run this application as you run any project in NetBeans. If this is the main project, simply press F6. When the project is running, you will see the following line in the Output window: The server is running at http://localhost:8080/. You can enter this URL in a browser and start using the Guestbook application. In this post you can find out how to debug AppEngine java web application using NetBeans.

Saturday, April 11, 2009

DelayQueue via interceptor

In the previous post I published a simple solution for using java.util.concurrent.DelayQueue with Spring Integration queue channel. Then Iwein Fuld suggested a nice improvement of namespace configuration. I liked the idea, but if the underlying queue is DelayQueue how to ensure all elements implement Delayed interface if I don't override doSend method and use the standard QueueChannel? That can be done with a ChannelInterceptor. I really like the way the guys from Spring Integration designed the API, there is an extension point just where you need it.

So if Iwein's proposition will be implemented, the configuration of the delay queue will look like this:


  
  
    
  



This also solves the problem with different delays for messages in the same queue. I made an example with HeaderDelayInterceptor, which looks at message header to set the time out.

It can be configured similar to SimpleDelayInterceptor:
While we are waiting for the queue-class feature to be implemented, we can create channel instances as usual Spring beans:



  
  
    
      
    
  
The important thing to remember is to put this interceptor the last, if you need other interceptors. This is important, because other interceptors might create Message instances not implementing Delayed, and the DelayQueue will throw ClassCastException. While I'm looking for a better place to host source code, it will be listed here:
/**
 * Wraps messages sent to the channel to implement Delayed interface. Required
 * for queue channel created with DelayQueue instance. Causes messages to wait
 * in the queue for the specified delay.
 * @author Andrew Skiba skibaa@gmail.com
 * Use for any purpose at your own risk.
 */
public class SimpleDelayInterceptor extends ChannelInterceptorAdapter {

    private long delay;
    private TimeUnit timeUnit;

    public SimpleDelayInterceptor(long delay, TimeUnit timeUnit) {
        this.delay = delay;
        this.timeUnit = timeUnit;
    }

    @Override
    public Message preSend(Message message, MessageChannel channel) {
        final long sendingTime = System.currentTimeMillis();
        return new DelayedMessageAdapter(message) {
            public long getDelay(TimeUnit unit) {
                long millisPassed = System.currentTimeMillis() - sendingTime;
                long unitsPassed = unit.convert(millisPassed, TimeUnit.MILLISECONDS);
                long delayInGivenUnits = unit.convert(delay, timeUnit);
                return delayInGivenUnits - unitsPassed;
            }
        };
    }
}

/**
 * QueueChannel requires its elements to implement Message, and
 * DelayQueue requires its elements to implement Delayed. This
 * class implements both to satisfy these requirements. For Mesage
 * interface it acts as a proxy and forwards all calls to the wrapped Message.
 * @author Andrew Skiba skibaa@gmail.com
 * Use for any purpose at your own risk.
 */
public abstract class DelayedMessageAdapter implements Delayed, Message {
    private Message wrappedMessage;

    public DelayedMessageAdapter(Message wrappedMessage) {
        this.wrappedMessage = wrappedMessage;
    }

    protected Message getMessage() {
        return wrappedMessage;
    }

    public abstract long getDelay(TimeUnit unit);

    public int compareTo(Delayed o) {
        return new Long(getDelay(TimeUnit.NANOSECONDS))
                .compareTo(o.getDelay(TimeUnit.NANOSECONDS));
    }

    public MessageHeaders getHeaders() {
        return wrappedMessage.getHeaders();
    }

    public T getPayload() {
        return wrappedMessage.getPayload();
    }

    @Override
    public String toString() {
        return wrappedMessage.toString();
    }
}

/**
 * Wraps messages sent to the channel to implement Delayed interface. Required
 * for queue channel created with DelayQueue instance. Causes messages to wait
 * in the queue till System.currentTimeInMillis() reaches the value specified
 * in message header. The header name is customizable.
 * @author Andrew Skiba skibaa@gmail.com
 * Use for any purpose at your own risk.
 */
public class HeaderDelayInterceptor extends ChannelInterceptorAdapter {
    String headerName;

    public HeaderDelayInterceptor(String headerName) {
        this.headerName = headerName;
    }

    @Override
    public Message preSend(Message message, MessageChannel channel) {
        //fail early if header is missing or incorrect
        Object waitTill=message.getHeaders().get(headerName);
        if (waitTill==null)
            throw new IllegalArgumentException("HeaderDelayInterceptor expects " +
                "header with name:" + headerName +
                " which was not found in message:" + message);
        if (!(waitTill instanceof Long))
            throw new IllegalArgumentException("HeaderDelayInterceptor expects " +
                "Long value in header with name:" + headerName +
                " incompatible type found in message:" + message);
        //everything looks OK, create a wrapped message
        return new DelayedMessageAdapter(message){
            public long getDelay(TimeUnit unit) {
                Long waitTill=(Long)getMessage().getHeaders().get(headerName);
                long delayRemained=waitTill-System.currentTimeMillis();
                return unit.convert(delayRemained, TimeUnit.MILLISECONDS);
            }
        };
    }
}
And the unit test is here:
/**
 * @author Andrew Skiba skibaa@gmail.com
 * Use for any purpose at your own risk.
 */
public class DelayQueueChannelTests {
    static Logger logger = Logger.getLogger(DelayQueueChannelTests.class.getName());

    @Test
    public void testSimpleDelay() throws Exception {
        final AtomicBoolean messageReceived = new AtomicBoolean(false);
        final CountDownLatch latch = new CountDownLatch(1);
        final QueueChannel channel = new QueueChannel(new DelayQueue());
        channel.addInterceptor(new SimpleDelayInterceptor(100, TimeUnit.MILLISECONDS));
        new Thread(new Runnable() {

            public void run() {
                Message message = (Message)channel.receive();
                assertTrue(message instanceof DelayedMessageAdapter);
                messageReceived.set(true);
                latch.countDown();
                float waitTime=(System.currentTimeMillis()-message.getPayload())/1000.F;
                logger.info("waited for "+waitTime+" seconds ");
            }
        }).start();
        assertFalse(messageReceived.get());
        channel.send(new GenericMessage(System.currentTimeMillis()));
        assertFalse(messageReceived.get());
        latch.await(25, TimeUnit.MILLISECONDS);
        assertFalse(messageReceived.get());
        latch.await(1, TimeUnit.SECONDS);
        assertTrue(messageReceived.get());
    }

    private final static String HEADER_NAME="test.waitTill";

    @Test
    public void testMessageDelay() throws Exception {
        final AtomicBoolean [] messagesReceived = new AtomicBoolean[] {
            new AtomicBoolean(false), new AtomicBoolean(false)
        };
        final CountDownLatch latch1 = new CountDownLatch(1);
        final CountDownLatch latch2 = new CountDownLatch(1);
        final QueueChannel channel = new QueueChannel(new DelayQueue());
        channel.addInterceptor(new HeaderDelayInterceptor(HEADER_NAME));
        new Thread(new Runnable() {

            public void run() {
                Message message = (Message)channel.receive();
                assertTrue(message instanceof DelayedMessageAdapter);
                messagesReceived[message.getPayload()].set(true);
                latch1.countDown();
                message = (Message)channel.receive();
                assertTrue(message instanceof DelayedMessageAdapter);
                messagesReceived[message.getPayload()].set(true);
                latch2.countDown();
            }
        }).start();
        assertFalse(messagesReceived[0].get());
        assertFalse(messagesReceived[1].get());
        long now=System.currentTimeMillis();
        channel.send(MessageBuilder.withPayload(0).setHeader(HEADER_NAME, now+200).build());
        channel.send(MessageBuilder.withPayload(1).setHeader(HEADER_NAME, now+100).build());
        assertFalse(messagesReceived[0].get());
        assertFalse(messagesReceived[1].get());
        latch1.await(25, TimeUnit.MILLISECONDS);   //not enough time for either message
        assertFalse(messagesReceived[0].get());
        assertFalse(messagesReceived[1].get());
        latch1.await(170, TimeUnit.MILLISECONDS);         //the second message should be ready before the first
        assertFalse(messagesReceived[0].get());
        assertTrue(messagesReceived[1].get());
        latch2.await(200, TimeUnit.MILLISECONDS);
        assertTrue(messagesReceived[0].get());
    }
}

DelayQueueChannel for Spring Integration

This is a beginning of discussion, continued here.

Spring Integration is an amazing project. It allows with a few lines of code or with a small Spring XML configuration to establish a powerful Enterprise Application Integration server. We are used to think about EAI as a heavy weight solution, but with Spring Integration it's a modest library deployed together with your console or web application. I'm really excited about its ease of use.

The central component in this framework is a message channel. It has a few implementations, most basic of which are direct channel and queue channel. Direct channel allows processing in the same thread, and queue channel holds messages until a processor will take them for processing.

When a message processing fails, it's usually desirable to wait before retrying. In EAI your application often depends on remote servers, which may be temporarily unavailable. If you try the failing operation in a few minutes, it has better chances to succeed.

Out of the box, Spring Integration does not provide a facility to delay messages for such a long period. While it's easy to insert a sleeping in the middle of processing, sleeping for long periods will waste precious thread resources of the server. So I extended Spring Integration queue channel to support delays.

Fortunately, Java has a standard DelayQueue, a part of java concurrency API, one of the best in Java. Used correctly, it performs very fast and is relatively error-proof.

The basic Spring Integration queue channel was created with extensibility in mind. It has a constructor accepting a java.util.concurrent.BlockingQueue instance, and DelayQueue is just an implementation of BlockingQueue. So what's left is to glue this great components together to do the job.

/**
 * @author Andrew Skiba skibaa@gmail.com
 * Use for any purpose at your own risk.
 */
public class SimpleDelayQueueChannel extends QueueChannel {
    private long delay;
    private TimeUnit timeUnit;

    /**
     * QueueChannel requires its elements to implement Message, and
     * DelayQueue requires its elements to implement Delayed. This
     * class implements both to satisfy these requirements. For Mesage
     * interface it acts as a proxy and forwards all calls to the wrapped Message.
     */
    protected class DelayedMessage implements Delayed, Message {
        long createdClock;
        Message wrappedMessage;

        public DelayedMessage(Message wrappedMessage) {
            this.wrappedMessage = wrappedMessage;
            createdClock = System.currentTimeMillis();
        }

        public long getDelay(TimeUnit unit) {
            long millisPassed=System.currentTimeMillis()-createdClock;
            long unitsPassed=unit.convert(millisPassed, TimeUnit.MILLISECONDS);
            long delayInGivenUnits=unit.convert(delay, timeUnit);
            return delayInGivenUnits-unitsPassed;
        }

        public int compareTo(Delayed o) {
            if (o instanceof DelayedMessage)
                return new Long(createdClock)
                        .compareTo(((DelayedMessage)o).createdClock);

            return new Long(getDelay(TimeUnit.NANOSECONDS))
                    .compareTo(o.getDelay(TimeUnit.NANOSECONDS));
        }

        public MessageHeaders getHeaders() {
            return wrappedMessage.getHeaders();
        }

        public Object getPayload() {
            return wrappedMessage.getPayload();
        }

    }

    public SimpleDelayQueueChannel(long delay, TimeUnit timeUnit) {
        /* QueueChannel expects a queue capable of holding any Message,
         * but DelayQueue requires its elements to implement Delayed interface.
         * So we MUST override doSend so we control what is inserted into
         * the queue.
         */
        super((BlockingQueue)new DelayQueue());
        this.delay=delay;
        this.timeUnit=timeUnit;
    }

    @Override
    protected boolean doSend(Message message, long timeout) {
        return super.doSend(new DelayedMessage(message), timeout);
    }
}
This class was created with a brevity on mind. It has a few shortcomings. The most significant is hard coded delay calculation, based on the system time when a message is inserted into the queue. I want to extract this logic into delay strategy class. This will allow the same queue to hold elements with different delays, for example.

And the following is the unit test for the simple delay queue channel.

public class DelayQueueChannelTests {
    static Logger logger = Logger.getLogger(DelayQueueChannelTests.class.getName());
    @Test
    public void testSimpleSendAndReceive() throws Exception {
        final AtomicBoolean messageReceived = new AtomicBoolean(false);
        final CountDownLatch latch = new CountDownLatch(1);
        final SimpleDelayQueueChannel channel = new SimpleDelayQueueChannel(100, TimeUnit.MILLISECONDS);
        new Thread(new Runnable() {

            public void run() {
                Message message = (Message)channel.receive();
                messageReceived.set(true);
                latch.countDown();
                assertTrue(message instanceof SimpleDelayQueueChannel.DelayedMessage);
                float waitTime=(System.currentTimeMillis()-message.getPayload())/1000.F;
                logger.fine("waited for "+waitTime+" seconds ");
            }
        }).start();
        assertFalse(messageReceived.get());
        channel.send(new GenericMessage(System.currentTimeMillis()));
        assertFalse(messageReceived.get());
        latch.await(25, TimeUnit.MILLISECONDS);
        assertFalse(messageReceived.get());
        latch.await(1, TimeUnit.SECONDS);
        assertTrue(messageReceived.get());
    }
}
//funny how blogspot lower-cases and closes  "tags" :-)
//don't copy them to java, and change long to Long in angle brackets in the code.
Do you have any comments, suggestions, questions? Don't hesitate to comment here, there or by e-mail.