Wednesday, September 11, 2013

Apache Camel 2.12.0 has been released!

Apache Camel 2.12.0 has been released! Yeah!

This is a special release for me, mainly because I helped fix a few JIRA issues. :-P

Among these, I helped port the old quartz component to a new quartz2 component using the new Quartz 2.0 API. As far as Camel users are concerned, it should work just like the old one, except that it now uses the quartz2:// URL prefix. Under the hood, however, it uses the Quartz 2 library, which is not backward compatible with the very old Quartz 1.8. In addition, the quartz2 component has a new option, deleteJob=false, which lets you keep (NOT delete) the job created by Camel on shutdown; on startup, it will also reuse an existing job with the same name if one is found in the Quartz scheduler.

Obviously my contribution is tiny compared to everything the full release brings you, but it’s a start in helping out the project. I am glad the Camel folks accepted these patches and included them in the release.

Try out the latest Camel and see what you think.

Happy programming!

Monday, September 9, 2013

Exploring Apache Camel Core - Seda Component

The seda component in Apache Camel is very similar to the direct component that I presented in my previous blog post, but it works asynchronously. To do this, it uses a java.util.concurrent.BlockingQueue as its default implementation to queue up messages, decoupling them from your main Route thread so they are processed in a separate thread. Because of this BlockingQueue, you need to be aware of its usage and configuration options.

One option to be aware of with asynchronous processing is the queue size, which is unbounded by default, meaning the queue will grow as much as your memory allows. To cap it, add an option such as size=1000 to the endpoint URL (for example seda:normalRoute?size=1000). Let’s see an example.
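To see why capping matters, here is a plain-JDK sketch (not Camel’s own seda code) contrasting an unbounded LinkedBlockingQueue with a bounded one. The class name BoundedQueueDemo is hypothetical, and a capacity of 3 stands in for size=1000 so the effect is easy to see: once the bounded queue is full, offer() returns false instead of letting the queue keep growing.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Plain-JDK illustration of an unbounded vs. a bounded queue.
// Not Camel's SedaEndpoint code; it only mirrors the idea behind size=1000.
class BoundedQueueDemo {

    // Try to enqueue 'count' messages and return how many were accepted
    static int fill(BlockingQueue<String> queue, int count) {
        int accepted = 0;
        for (int i = 0; i < count; i++) {
            // offer() never blocks: it returns false when a bounded queue is full
            if (queue.offer("msg-" + i)) {
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        // No capacity given: the queue can grow until memory runs out
        BlockingQueue<String> unbounded = new LinkedBlockingQueue<>();
        // Capacity of 3 (stand-in for size=1000): extra messages are rejected
        BlockingQueue<String> bounded = new LinkedBlockingQueue<>(3);

        System.out.println("unbounded accepted: " + fill(unbounded, 10));
        System.out.println("bounded accepted: " + fill(bounded, 10));
    }
}
```

How Camel itself reacts when a capped seda queue is full is a separate question worth checking in the seda component documentation for your version.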

package camelcoredemo;

import org.slf4j.*;
import org.apache.camel.*;
import org.apache.camel.builder.*;
import org.apache.camel.main.Main;
import java.io.*;

public class SedaDemoCamel extends Main {
    static Logger LOG = LoggerFactory.getLogger(SedaDemoCamel.class);
    public static void main(String[] args) throws Exception {
        SedaDemoCamel main = new SedaDemoCamel();
        main.enableHangupSupport();
        main.addRouteBuilder(createRouteBuilder1());
        main.addRouteBuilder(createRouteBuilder2());
        main.addRouteBuilder(createRouteBuilder3());
        main.run(args);
    }
    // The file poller route
    static RouteBuilder createRouteBuilder1() {
        return new RouteBuilder() {
            public void configure() {
                from("file://target/input?preMove=staging&move=.processed")
                .process(new Processor() {
                    public void process(Exchange msg) {
                        CamelContext camelContext = msg.getContext();
                        ProducerTemplate producer = camelContext.createProducerTemplate();
                        String text = msg.getIn().getBody(String.class);
                        String fileName = (String)msg.getIn().getHeader("CamelFileName");
                        boolean specialFile = fileName.endsWith("_SPECIAL.dat");
                        if (specialFile)
                            producer.sendBody("seda:specialRoute", text);
                        else
                            producer.sendBody("seda:normalRoute", text);
                    }
                });
            }
        };
    }
    // The special file processing route
    static RouteBuilder createRouteBuilder2() {
        return new RouteBuilder() {
            public void configure() {
                from("seda:specialRoute")
                .process(new Processor() {
                    public void process(Exchange msg) {
                        LOG.info("Processing special file: " + msg);
                    }
                });
            }
        };
    }
    // The normal file processing route
    static RouteBuilder createRouteBuilder3() {
        return new RouteBuilder() {
            public void configure() {
                from("seda:normalRoute")
                .process(new Processor() {
                    public void process(Exchange msg) {
                        LOG.info("Processing normal file: " + msg);
                    }
                });
            }
        };
    }
}

You will notice that this demo code is very similar to the direct component demo, with a few differences. First, we use seda endpoints. Second, in the file poller, we read in the entire file content as text. We do this because we are now passing messages to asynchronous Routes that run on separate threads. The poller is configured to move the processed file into a different folder right after the first Route has ended, so the processing Routes must not depend on the file’s path; hence we load the entire text instead.

Another interesting seda option is that you may set the number of concurrent threads that receive and process the messages! Say your normal files are heavy in traffic; then you can configure more threads for that part (the default is just one thread).

from("seda:normalRoute?concurrentConsumers=10")
.process(new Processor() {
    public void process(Exchange msg) {
        LOG.info("Processing normal file: " + msg);
    }
});
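Conceptually, concurrentConsumers=10 means several threads take messages from the same queue. The following plain-JDK sketch (hypothetical class ConcurrentConsumersDemo, not Camel’s consumer code) starts N consumer threads on one LinkedBlockingQueue and prints the thread name for each message, which is the same thing the log pattern below lets you check in a real Route.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Plain-JDK sketch of the idea behind concurrentConsumers=N: N threads take
// messages from one shared queue. Not Camel's SedaConsumer code.
class ConcurrentConsumersDemo {

    // Marker telling a consumer thread to stop (one is sent per thread)
    static final String STOP = "__STOP__";

    // Sends messageCount messages through 'consumers' threads; returns how many were processed
    static int run(int messageCount, int consumers) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        AtomicInteger processed = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(consumers);

        for (int c = 0; c < consumers; c++) {
            pool.execute(() -> {
                try {
                    while (true) {
                        String msg = queue.take(); // blocks until a message is available
                        if (STOP.equals(msg)) {
                            return;
                        }
                        // The thread name shows which consumer handled the message
                        System.out.println(Thread.currentThread().getName() + " processing " + msg);
                        processed.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        for (int i = 0; i < messageCount; i++) {
            queue.add("normal-file-" + i); // unbounded queue, so add() never fails
        }
        // All real messages are queued before the stop markers, so none are skipped
        for (int c = 0; c < consumers; c++) {
            queue.add(STOP);
        }

        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println("processed: " + run(20, 10));
    }
}
```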

To verify that you are running concurrently, you can configure your logger to display the thread name. For example, with log4j you can use this pattern (%t prints the thread name):

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p %t [%c] - %m%n

There are more options available from the Seda component that you may explore. Try it out with a Route and see it for yourself.

Sunday, September 8, 2013

Exploring Apache Camel Core - Direct Component

Apache Camel allows you to create multiple Routes within a single CamelContext. The direct component lets you bridge messages between these Routes. To demonstrate this, I will create a few routes and pass messages between them.

package camelcoredemo;

import org.slf4j.*;
import org.apache.camel.*;
import org.apache.camel.builder.*;
import org.apache.camel.main.Main;
import java.io.*;

public class DirectDemoCamel extends Main {
    static Logger LOG = LoggerFactory.getLogger(DirectDemoCamel.class);
    public static void main(String[] args) throws Exception {
        DirectDemoCamel main = new DirectDemoCamel();
        main.enableHangupSupport();
        main.addRouteBuilder(createRouteBuilder1());
        main.addRouteBuilder(createRouteBuilder2());
        main.addRouteBuilder(createRouteBuilder3());
        main.run(args);
    }
    // The file poller route
    static RouteBuilder createRouteBuilder1() {
        return new RouteBuilder() {
            public void configure() {
                from("file://target/input?preMove=staging&move=.processed")
                .process(new Processor() {
                    public void process(Exchange msg) {
                        CamelContext camelContext = msg.getContext();
                        ProducerTemplate producer = camelContext.createProducerTemplate();
                        File file = msg.getIn().getBody(File.class);
                        boolean specialFile = file.getName().endsWith("_SPECIAL.dat");
                        if (specialFile)
                            producer.send("direct:specialRoute", msg);
                        else
                            producer.send("direct:normalRoute", msg);
                    }
                });
            }
        };
    }
    // The special file processing route
    static RouteBuilder createRouteBuilder2() {
        return new RouteBuilder() {
            public void configure() {
                from("direct:specialRoute")
                .process(new Processor() {
                    public void process(Exchange msg) {
                        LOG.info("Processing special file: " + msg);
                    }
                });
            }
        };
    }
    // The normal file processing route
    static RouteBuilder createRouteBuilder3() {
        return new RouteBuilder() {
            public void configure() {
                from("direct:normalRoute")
                .process(new Processor() {
                    public void process(Exchange msg) {
                        LOG.info("Processing normal file: " + msg);
                    }
                });
            }
        };
    }
}

Here I have created three Routes and reused the file component I introduced in the past. The first Route polls a directory and then, based on the name of the file found, sends it to either the special or the normal Route for processing. Because these Routes are separate, we need a bridge channel to pass the messages through, and that is what the direct component does. To use it, simply pick a name that is unique within the CamelContext. It serves as a synchronous, in-memory channel: sending to a direct endpoint invokes the consuming Route directly in the caller’s thread, with no queue in between. You may read from or send to these endpoints. As you can see, the direct component lets you easily break up a complex route workflow into smaller parts.
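To make the synchronous hand-off concrete, here is a plain-JDK sketch under a simplifying assumption: a direct channel is modeled as a name mapped to a consumer that is called in the sender’s own thread. DirectChannels and its from/send methods are hypothetical and only mimic the routing idea of the demo above; they are not Camel’s API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Plain-JDK sketch of the idea behind direct: endpoints. A name maps to a
// consumer, and send() calls it synchronously in the caller's thread.
// DirectChannels is a hypothetical class, not part of Camel.
class DirectChannels {
    private final Map<String, Consumer<String>> consumers = new HashMap<>();

    // Register the "route" that consumes from a given channel name
    void from(String name, Consumer<String> route) {
        consumers.put(name, route);
    }

    // Deliver a message; returns only after the consumer has finished
    void send(String name, String body) {
        Consumer<String> route = consumers.get(name);
        if (route == null) {
            throw new IllegalStateException("No consumer registered for " + name);
        }
        route.accept(body);
    }

    public static void main(String[] args) {
        DirectChannels channels = new DirectChannels();
        channels.from("specialRoute", body -> System.out.println(
                Thread.currentThread().getName() + " special: " + body));
        channels.from("normalRoute", body -> System.out.println(
                Thread.currentThread().getName() + " normal: " + body));

        // Same routing decision as the file poller route in the demo
        for (String fileName : new String[] {"a.dat", "b_SPECIAL.dat"}) {
            String target = fileName.endsWith("_SPECIAL.dat") ? "specialRoute" : "normalRoute";
            channels.send(target, fileName);
        }
    }
}
```

Both lines print the same thread name (main), which is the key difference from seda, where the consuming Route runs on its own thread.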

In the above demo, I have also introduced another Camel core feature: ProducerTemplate. Within a CamelContext you may create a ProducerTemplate instance, which lets you send messages to any endpoint dynamically at runtime. You would usually want to store this producer object as a member field instead of creating one per message. But for demo purposes, I will keep it simple and leave that as an exercise for you to explore on your own.

There are more options available from the Direct component that you may explore. Try it out with a Route and see it for yourself.

Thursday, September 5, 2013

Exploring Apache Camel Core - File Component

A file poller is a very useful mechanism for solving common IT problems. Camel’s built-in file component is extremely flexible, and there are many options available for configuration. Let’s cover a few common usages here.

Polling a directory for input files

Here is a typical Camel Route used to poll a directory for input files every second.

package camelcoredemo;

import org.slf4j.*;
import org.apache.camel.*;
import org.apache.camel.builder.*;
import java.io.*;

public class FileRouteBuilder extends RouteBuilder {
    static Logger LOG = LoggerFactory.getLogger(FileRouteBuilder.class);
    public void configure() {
        from("file://target/input?delay=1000")
        .process(new Processor() {
            public void process(Exchange msg) {
                File file = msg.getIn().getBody(File.class);
                LOG.info("Processing file: " + file);
            }
        });
    }
}

Run this with the following:

mvn compile exec:java -Dexec.mainClass=org.apache.camel.main.Main -Dexec.args='-r camelcoredemo.FileRouteBuilder'

The program will begin to poll the target/input folder under your current directory and wait for incoming files. To test with input files, open another terminal and create some files as follows:

echo 'Hello 1' > target/input/test1.txt
echo 'Hello 2' > target/input/test2.txt

You should now see the first terminal window start picking up the files and passing them to the next Processor step. In the Processor, we obtain the File object from the message body and simply log its file name. Hit CTRL+C when you are done.

There are many configurable options for the file component that you can set in the URL, but the default settings are enough to get you going in a simple case like the one above. For example, if the input folder doesn’t exist, it will be created. And when the Route is done processing a file, the file will be moved into a .camel folder. If you don’t want to keep the file at all after processing, set delete=true in the URL.

Read in the file content and converting to different types

By default, the file component creates an org.apache.camel.component.file.GenericFile object for each file found and passes it down your Route as the message body. You may retrieve all your file information through this object. Alternatively, you may use the Exchange API to automatically convert the message body to the type you expect to receive (e.g. msg.getIn().getBody(File.class)). In the above example, File is the type we expect to get from the message body, so Camel tries to convert it for us. Camel pre-registers many TypeConverters in the CamelContext that can handle conversion of most common data types (like Java primitives). These TypeConverters are a powerful way to make your Routes and Processors more flexible and portable.
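The core idea behind a converter registry is a lookup keyed by (source type, target type). The toy ConverterRegistry below is a hypothetical, plain-JDK sketch of that idea only. Camel’s real TypeConverterRegistry does much more (fallback converters, loading converters from the classpath), so treat this as an illustration, not its implementation.

```java
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Toy registry illustrating the TypeConverter idea: converters are looked up
// by (from type, to type). Hypothetical class; Camel's real registry is richer.
class ConverterRegistry {
    private final Map<String, Function<Object, Object>> converters = new HashMap<>();

    private static String key(Class<?> from, Class<?> to) {
        return from.getName() + "->" + to.getName();
    }

    <F, T> void register(Class<F> from, Class<T> to, Function<F, T> fn) {
        // Store a type-erased wrapper; lookups always use the same (from, to) key
        converters.put(key(from, to), value -> fn.apply(from.cast(value)));
    }

    <T> T convert(Object value, Class<T> to) {
        if (to.isInstance(value)) {
            return to.cast(value); // already the requested type, nothing to do
        }
        Function<Object, Object> fn = converters.get(key(value.getClass(), to));
        if (fn == null) {
            throw new IllegalArgumentException("No converter from " + value.getClass() + " to " + to);
        }
        return to.cast(fn.apply(value));
    }

    public static void main(String[] args) {
        ConverterRegistry registry = new ConverterRegistry();
        registry.register(String.class, Integer.class, Integer::valueOf);
        registry.register(String.class, byte[].class, s -> s.getBytes(StandardCharsets.UTF_8));
        registry.register(String.class, File.class, File::new);

        Integer number = registry.convert("42", Integer.class);
        File file = registry.convert("target/input/test1.txt", File.class);
        System.out.println(number + 1);
        System.out.println(file.getName());
    }
}
```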

Camel can not only convert the message body to a File object, it can also read the file content. If your files are text-based, you can simply do this:

        from("file://target/input?charset=UTF-8")
        .process(new Processor() {
            public void process(Exchange msg) {
                String text = msg.getIn().getBody(String.class);
                LOG.info("Processing text: " + text);
            }
        });

That’s it! Simply specify the String type, and Camel will read your file and pass its entire text content as the message body. You may also use the charset option to set the encoding.

If you are dealing with binary files, try the byte[] bytes = msg.getIn().getBody(byte[].class); conversion instead. Pretty cool, huh?
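As a plain-JDK analogue (not Camel’s converter code) of what these two conversions give you, the hypothetical FileContentDemo below reads a file once as raw bytes and once as text decoded with an explicit charset, the role played by the charset=UTF-8 option. It assumes Java 7+ for java.nio.file.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Plain-JDK analogue of getBody(byte[].class) and getBody(String.class) on a
// polled file. The charset argument plays the role of the charset=UTF-8 option.
// Not Camel's converter code.
class FileContentDemo {

    // Raw content, suitable for binary files
    static byte[] readBytes(Path file) {
        try {
            return Files.readAllBytes(file);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Text content: the same bytes decoded with an explicit charset
    static String readText(Path file, Charset charset) {
        return new String(readBytes(file), charset);
    }

    // Demo helper: writes UTF-8 text to a temporary file
    static Path writeTemp(String text) {
        try {
            Path file = Files.createTempFile("test1", ".txt");
            Files.write(file, text.getBytes(StandardCharsets.UTF_8));
            return file;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // The word "cafe" with an accented final e: 4 characters, 5 bytes in UTF-8
        Path file = writeTemp("caf\u00e9");
        System.out.println("bytes: " + readBytes(file).length);
        System.out.println("text:  " + readText(file, StandardCharsets.UTF_8));
    }
}
```

Decoding the same file with ISO-8859-1 instead yields five characters rather than four, which is why setting the right charset matters for text files.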

Polling and processing large files

When working with large files, there are a few options in the file component that you might want to use to ensure proper handling. For example, you might want to move the input file into a staging folder before the Route starts processing it, and when it’s done, move it to a .completed folder.

        from("file://target/input?preMove=staging&move=.completed")
        .process(new Processor() {
            public void process(Exchange msg) {
                File file = msg.getIn().getBody(File.class);
                LOG.info("Processing file: " + file);
            }
        });

To feed input files properly into the polling folder, it’s best if the sender generates the input files in a temporary folder first, and moves them into the polling folder only when they are ready. This minimizes the chance of the Route reading an incomplete file when the input file takes time to generate. Another solution is to configure the file endpoint to read the polling folder only when a signal (ready marker) file exists. For example:

        from("file://target/input?preMove=staging&move=.completed&doneFileName=ReadyFile.txt")
        .process(new Processor() {
            public void process(Exchange msg) {
                File file = msg.getIn().getBody(File.class);
                LOG.info("Processing file: " + file);
            }
        });

The above will only read the target/input folder when a ReadyFile.txt file exists. The marker file can be just an empty file, and it will be removed by Camel after polling. This approach lets the sender take however long it needs to generate the input files.
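Both hand-off techniques can be sketched with plain JDK file operations. The hypothetical DoneFileDemo below is not how Camel implements doneFileName. The sender writes each file into a temp folder and moves it into the input folder in one step, then creates an empty ReadyFile.txt. The poller ignores the folder until that marker exists and deletes the marker after polling. It assumes both folders are on the same file system, which ATOMIC_MOVE requires.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;

// Plain-JDK sketch of "write to temp, then move" plus a ready-marker check.
// DoneFileDemo and its methods are hypothetical; this is not Camel's doneFileName code.
class DoneFileDemo {
    static final String MARKER = "ReadyFile.txt";

    // Sender: write in tempDir, then move into inputDir in one step.
    // ATOMIC_MOVE assumes both folders are on the same file system.
    static void send(Path tempDir, Path inputDir, String name, String content) throws IOException {
        Path tmp = tempDir.resolve(name);
        Files.write(tmp, content.getBytes(StandardCharsets.UTF_8));
        Files.move(tmp, inputDir.resolve(name), StandardCopyOption.ATOMIC_MOVE);
    }

    // Sender: create the empty marker once the whole batch is in place
    static void markReady(Path inputDir) throws IOException {
        Files.createFile(inputDir.resolve(MARKER));
    }

    // Poller: returns the names of the files picked up, or an empty list if no marker exists.
    // The marker is deleted afterwards, so the next poll waits for a new one.
    static List<String> poll(Path inputDir) throws IOException {
        List<String> picked = new ArrayList<>();
        Path marker = inputDir.resolve(MARKER);
        if (!Files.exists(marker)) {
            return picked;
        }
        try (Stream<Path> files = Files.list(inputDir)) {
            files.map(p -> p.getFileName().toString())
                 .filter(name -> !name.equals(MARKER))
                 .forEach(picked::add);
        }
        Files.delete(marker);
        Collections.sort(picked);
        return picked;
    }

    // Runs the scenario in a fresh temp folder and returns the results of three polls
    static List<List<String>> demo() {
        try {
            Path base = Files.createTempDirectory("camel-demo");
            Path tempDir = Files.createDirectory(base.resolve("tmp"));
            Path inputDir = Files.createDirectory(base.resolve("input"));

            send(tempDir, inputDir, "test1.txt", "Hello 1\n");
            send(tempDir, inputDir, "test2.txt", "Hello 2\n");

            List<List<String>> results = new ArrayList<>();
            results.add(poll(inputDir)); // no marker yet: nothing picked up
            markReady(inputDir);
            results.add(poll(inputDir)); // marker present: both files picked up
            results.add(poll(inputDir)); // marker was deleted: nothing picked up
            return results;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```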

Another concern with large file processing is avoiding loading the entire file content into memory. To be more practical, you want to split the file into records (e.g. per line) and process them one by one (this is called "streaming"). Here is how you would do that using Camel:

        from("file://target/input?preMove=staging&move=.completed")
        .split(body().tokenize("\n"))
        .streaming()
        .process(new Processor() {
            public void process(Exchange msg) {
                String line = msg.getIn().getBody(String.class);
                LOG.info("Processing line: " + line);
            }
        });

This Route lets you process large files line by line efficiently, without consuming too much memory.
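For comparison, the same streaming idea in plain JDK code looks roughly like this: a BufferedReader hands one line at a time to a processor, so only the current line is held in memory. LineStreamingDemo is a hypothetical name, and this mirrors the concept of split(...).streaming(), not Camel’s implementation.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.function.Consumer;

// Plain-JDK sketch of streaming: read one line at a time and hand each line
// to a processor, so only the current line is held in memory.
// Mirrors the idea of split(body().tokenize("\n")).streaming(), not its code.
class LineStreamingDemo {

    // Returns the number of lines processed
    static int processLines(BufferedReader reader, Consumer<String> processor) {
        int count = 0;
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                processor.accept(line);
                count++;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return count;
    }

    public static void main(String[] args) {
        // For a real file you could use Files.newBufferedReader(path, StandardCharsets.UTF_8)
        BufferedReader reader = new BufferedReader(new StringReader("line 1\nline 2\nline 3\n"));
        int n = processLines(reader, line -> System.out.println("Processing line: " + line));
        System.out.println(n + " lines");
    }
}
```

One small difference: readLine() also strips a trailing \r, whereas tokenizing on "\n" leaves it at the end of each line for Windows-style files.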

Writing messages back into file

The file component can also be used to write messages into files. Recall that we may use the dataset component to generate sample messages. We will use it to feed the Route and send the messages to the file component, so you can see that each generated message is saved into a file.

package camelcoredemo;

import org.slf4j.*;
import org.apache.camel.*;
import org.apache.camel.builder.*;
import org.apache.camel.main.Main;
import org.apache.camel.component.dataset.*;

public class FileDemoCamel extends Main {
    static Logger LOG = LoggerFactory.getLogger(FileDemoCamel.class);
    public static void main(String[] args) throws Exception {
        FileDemoCamel main = new FileDemoCamel();
        main.enableHangupSupport();
        main.addRouteBuilder(createRouteBuilder());
        main.bind("sampleGenerator", createDataSet());
        main.run(args);
    }
    static RouteBuilder createRouteBuilder() {
        return new RouteBuilder() {
            public void configure() {
                from("dataset://sampleGenerator")
                .to("file://target/output");
            }
        };
    }
    static DataSet createDataSet() {
        return new SimpleDataSet();
    }
}

Compile and run it

mvn compile exec:java -Dexec.mainClass=camelcoredemo.FileDemoCamel

Upon completion, you will see that 10 files have been generated in the target/output folder, with file names in the ID-<hostname>-<unique-number>-<msg-seq-num> format.

There are more options available from the File component that you may explore. Try it out with a Route and see it for yourself.

Wednesday, September 4, 2013

Exploring Apache Camel Core - DataSet Component

A good sample data generator can help you test your program more thoroughly and measure its processing throughput. The camel-core comes with a dataset component that can help you do this easily. All you need is to provide a bean that implements the org.apache.camel.component.dataset.DataSet interface and bind it in the CamelContext registry. Here is an example:

package camelcoredemo;

import org.slf4j.*;
import org.apache.camel.*;
import org.apache.camel.builder.*;
import org.apache.camel.main.Main;
import org.apache.camel.component.dataset.*;

public class DataSetDemoCamel extends Main {
    static Logger LOG = LoggerFactory.getLogger(DataSetDemoCamel.class);
    public static void main(String[] args) throws Exception {
        DataSetDemoCamel main = new DataSetDemoCamel();
        main.enableHangupSupport();
        main.addRouteBuilder(createRouteBuilder());
        main.bind("sampleGenerator", createDataSet());
        main.run(args);
    }
    static RouteBuilder createRouteBuilder() {
        return new RouteBuilder() {
            public void configure() {
                from("dataset://sampleGenerator")
                .to("log://demo");
            }
        };
    }
    static DataSet createDataSet() {
        return new SimpleDataSet();
    }
}

Compile and run it.

mvn compile exec:java -Dexec.mainClass=camelcoredemo.DataSetDemoCamel

Here we have used the built-in org.apache.camel.component.dataset.SimpleDataSet implementation, which by default generates 10 messages with the text body <hello>world!</hello>. You may easily change these values, or even provide your own implementation, starting from the org.apache.camel.component.dataset.DataSetSupport base class, to customize your data set.

Use DataSet Component to measure throughput

One useful way I found to use the dataset component is to load test your Route. To do this, you have to adjust a couple of settings, though. Let’s say I want to load a large text file as sample input data, feed it to the Route, and then measure its throughput.

    static RouteBuilder createRouteBuilder() {
        return new RouteBuilder() {
            public void configure() {
                from("dataset://sampleGenerator?produceDelay=0")
                .to("log://demo?groupSize=100");
            }
        };
    }
    static DataSet createDataSet() {
        SimpleDataSet result = new SimpleDataSet();
        result.setSize(500);
        result.setDefaultBody(readFileToString("my-large-sample.txt"));
        return result;
    }

Replace the above methods in the Main class and you will notice that it pumps 500 messages into the Route, sampling every 100 messages and displaying the throughput rate. I had to add the produceDelay=0 option so the generator does not pause between messages. Then I added the groupSize=100 option to the log component for throughput measurement. I skipped the readFileToString(String) code since I assume you can easily figure that out on your own. (Hint: check out the Apache commons-io library.)
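If you would rather avoid the extra dependency, here is one possible JDK-only readFileToString(String) for Java 7+. The enclosing class name SampleData is hypothetical, and the helper assumes the sample file is UTF-8 text. It wraps IOException in an unchecked exception so createDataSet() can call it without a throws clause.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

// One possible readFileToString(String) helper using only the JDK.
// Assumes the sample file is UTF-8 text; with commons-io on the classpath,
// FileUtils.readFileToString(new File(name), StandardCharsets.UTF_8) is an alternative.
class SampleData {

    static String readFileToString(String fileName) {
        try {
            return new String(Files.readAllBytes(Paths.get(fileName)), StandardCharsets.UTF_8);
        } catch (IOException e) {
            // Unchecked, so createDataSet() can call this without a throws clause
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String fileName = args.length > 0 ? args[0] : "my-large-sample.txt";
        String body = readFileToString(fileName);
        System.out.println("Loaded " + body.length() + " characters from " + fileName);
    }
}
```

Note that this loads the whole file into one String, which is what setDefaultBody needs here, so keep the sample file within your heap size.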

The dataset component has another side you may use: receiving and verifying message content. You simply use the same URL in a to(url) line, and internally Camel asserts each received message body against the original.

There are more options available from the DataSet component that you may explore. Try it out with a Route and see it for yourself.

Tuesday, September 3, 2013

Exploring Apache Camel Core - Log Component

There are many ways to log and inspect messages as they pass through your Camel Route. The camel-core comes with a log component that lets you inspect the message. So instead of writing a separate Processor just to log a line when a message is processed, try using this:

from("timer://timer1?period=1s")
.to("log:demo");

By default, the log component logs your message, including its body content, at INFO level under the given logger name (demo in the above case). Since you can pick any name, you can control the logging level any way you like through the SLF4J logger implementation you use with Camel.

To log the message at DEBUG level, try this:

from("timer://timer1?period=1s")
.to("log:demo?level=DEBUG");

Now, if you use log4j as the logger implementation, make sure to add a logger config like this:

log4j.logger.demo = DEBUG
log4j.logger.org.apache.camel = INFO

A Camel message may carry headers, and its exchange may carry properties as well; to display these, add showAll=true.

When you process messages with large body text, it might be more practical to display only a certain number of characters. To do this, add maxChars=256 to the URL.

How to measure Camel messages throughput rate

One of the hidden gems of the log component is its ability to log message throughput! You may specify a group size, and each time that many messages have been received, it prints the msgs/sec rate. To enable this, just add the groupSize option to the URL.
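The arithmetic behind group-based throughput is simple: count messages, and every groupSize messages divide groupSize by the seconds elapsed since the last report. The hypothetical ThroughputLogger below is a plain-JDK sketch of that calculation only; Camel’s actual log output format and statistics differ.

```java
// Plain-JDK sketch of group-based throughput logging, similar in spirit to
// the log component's groupSize option. ThroughputLogger is hypothetical;
// Camel's actual output format differs.
class ThroughputLogger {
    private final int groupSize;
    private long count;
    private long groupStartNanos = System.nanoTime();
    private int groupsReported;

    ThroughputLogger(int groupSize) {
        this.groupSize = groupSize;
    }

    // Call once per message; prints a rate line every groupSize messages
    void onMessage() {
        count++;
        if (count % groupSize == 0) {
            long now = System.nanoTime();
            double seconds = (now - groupStartNanos) / 1e9;
            // Guard against a zero interval on very fast runs
            double rate = seconds > 0 ? groupSize / seconds : Double.POSITIVE_INFINITY;
            System.out.printf("Received %d messages so far. Last group: %.2f msgs/sec%n", count, rate);
            groupStartNanos = now;
            groupsReported++;
        }
    }

    int groupsReported() {
        return groupsReported;
    }

    public static void main(String[] args) {
        ThroughputLogger logger = new ThroughputLogger(100);
        for (int i = 0; i < 500; i++) {
            logger.onMessage(); // 500 messages with groupSize 100: five rate lines
        }
    }
}
```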

To demonstrate this, I will create a SampleGenerator bean processor that floods the Route with sample messages. I will use the Camel context registry to bind the bean, and then reference it in the Route. Here is the demo code.

package camelcoredemo;

import org.slf4j.*;
import org.apache.camel.*;
import org.apache.camel.builder.*;
import org.apache.camel.main.Main;

public class LogDemoCamel extends Main {
    static Logger LOG = LoggerFactory.getLogger(LogDemoCamel.class);
    public static void main(String[] args) throws Exception {
        LogDemoCamel main = new LogDemoCamel();
        main.enableHangupSupport();
        main.addRouteBuilder(createRouteBuilder());
        main.bind("sampleGenerator", new SampleGenerator());
        main.run(args);
    }
    static RouteBuilder createRouteBuilder() {
        return new RouteBuilder() {
            public void configure() {
                from("bean:sampleGenerator")
                .to("log://demo?groupSize=100");
            }
        };
    }
    static class SampleGenerator implements Processor{
        int count = 0;
        public void process(Exchange msg) throws Exception {
            if (count >= 500){
                LOG.info("Max count has reached. Do nothing.");
                Thread.sleep(Long.MAX_VALUE);
                return;
            }

            // Let's generate sample message.
            count++;
            LOG.trace("Generating sample msg #{}", count);
            msg.getOut().setBody("Sample msg");
        }
    }
}

Now you should be able to compile and run this demo:

mvn compile exec:java -Dexec.mainClass=camelcoredemo.LogDemoCamel

When running this demo, you will notice the rate displayed on the console, showing how fast you can pump messages into the Route and process them. This is a very useful feature for measuring and getting a quick view of your Route’s capability.

There are more options available from the Log component that you may explore. Try it out with a Route and see it for yourself.

Monday, September 2, 2013

Exploring Apache Camel Core - Timer Component

The Camel timer is a simple yet useful component. It brings the JDK’s timer functionality into your Camel Route with very simple configuration.

from("timer://mytimer?period=1000")
.process(new Processor() {
    public void process(Exchange msg) {
        LOG.info("Processing {}", msg);
    }
});

That will generate a timer event message every second. You may use the shorthand 1s instead of 1000. It supports m for minutes and h for hours as well. Pretty handy.
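To make the shorthand arithmetic explicit, here is a hypothetical PeriodParser that maps a single number with an optional s, m or h suffix to milliseconds. It is a plain-JDK sketch of the conversion, not Camel’s own converter, which may accept more forms.

```java
import java.util.Locale;

// Hypothetical helper showing how shorthand periods map to milliseconds:
// "1000" -> 1000, "1s" -> 1000, "5m" -> 300000, "2h" -> 7200000.
// It handles one number with an optional s/m/h suffix only; it is not
// Camel's converter, which may accept more forms.
class PeriodParser {

    static long toMillis(String period) {
        String p = period.trim().toLowerCase(Locale.ROOT);
        if (p.isEmpty()) {
            throw new IllegalArgumentException("Empty period");
        }
        long multiplier;
        switch (p.charAt(p.length() - 1)) {
            case 's': multiplier = 1_000L; break;
            case 'm': multiplier = 60_000L; break;
            case 'h': multiplier = 3_600_000L; break;
            default: return Long.parseLong(p); // no suffix: already milliseconds
        }
        return Long.parseLong(p.substring(0, p.length() - 1)) * multiplier;
    }

    public static void main(String[] args) {
        for (String period : new String[] {"1000", "1s", "5m", "2h"}) {
            System.out.println(period + " = " + toMillis(period) + " ms");
        }
    }
}
```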

Another useful timer feature is that it can stop after a certain number of timer messages. You simply need to add the repeatCount option to the URL.

A couple of properties on the event message are useful when handling the timer message. Here is an example of how to read them:

from("timer://mytimer?period=1s&repeatCount=5")
.process(new Processor() {
    public void process(Exchange msg) {
        java.util.Date fireTime = msg.getProperty(Exchange.TIMER_FIRED_TIME, java.util.Date.class);
        int eventCount = msg.getProperty(Exchange.TIMER_COUNTER, Integer.class);
        LOG.info("We received {}th timer event that was fired on {}", eventCount, fireTime);
    }
});
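Since the timer builds on JDK scheduling, a rough plain-JDK analogue of period=1s&repeatCount=5 can be written with a ScheduledExecutorService. The hypothetical RepeatingTimerDemo below fires at a fixed rate, passes a counter (starting at 1 in this sketch) and a fire time similar in spirit to the TIMER_COUNTER and TIMER_FIRED_TIME properties, and stops after repeatCount events. It is not Camel’s TimerConsumer; for instance, it fires immediately instead of honoring any initial delay.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Plain-JDK sketch of a repeating timer with a counter and fire time,
// stopping after repeatCount events. Not Camel's TimerConsumer.
class RepeatingTimerDemo {

    // Fires every periodMillis, stops after repeatCount events, returns the counters seen
    static List<Integer> run(long periodMillis, int repeatCount) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        List<Integer> counters = Collections.synchronizedList(new ArrayList<Integer>());
        AtomicInteger counter = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(repeatCount);

        scheduler.scheduleAtFixedRate(() -> {
            int eventCount = counter.incrementAndGet(); // starts at 1 in this sketch
            if (eventCount > repeatCount) {
                return; // extra ticks before shutdown are ignored
            }
            Date fireTime = new Date();
            System.out.println("Timer event #" + eventCount + " fired on " + fireTime);
            counters.add(eventCount);
            done.countDown();
        }, 0, periodMillis, TimeUnit.MILLISECONDS);

        try {
            done.await(); // wait until repeatCount events have been handled
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow(); // stop any further ticks
        }
        return new ArrayList<>(counters);
    }

    public static void main(String[] args) {
        run(1000, 5); // roughly period=1s&repeatCount=5
    }
}
```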

There are more options available from the Timer component that you may explore. Try it out with a Route and see it for yourself.