Apache Camel is a popular open source integration framework that can work with almost any message broker, such as Kafka, ActiveMQ, and RabbitMQ. It provides out-of-the-box support for the most popular EIPs (Enterprise Integration Patterns). Camel also works seamlessly with Spring Boot, which makes for a killer combination. In this example, we will see how to use the Aggregate EIP provided by Camel to do message aggregation on Kafka.
We are building a microservice that reads BankDetail messages in JSON format from the Kafka Topic bank-details. A BankDetail message has the following attributes:
private int id;
private int age;
private String job;
private String marital;
private String education;
private String defaulted;
private BigDecimal balance;
private String housing;
private String loan;
private String contact;
private int day;
private String month;
private int duration;
private int campaign;
private int pdays;
private int previous;
private String poutcome;
private String y;
BankDetail messages come in batches; every message in the same batch has the same kafka.key. The microservice aggregates all the messages of a batch based on the job attribute and finds the count of each job category. It then publishes the result of the aggregation on the Kafka Topic bank-details-aggregated. A typical aggregate message looks like:
{ "adminCount": 478, "blueCollarCount": 946, "entrepreneurCount": 168, "houseMaidCount": 112, "managementCount": 969, "retiredCount": 230, "selfEmployedCount": 183, "servicesCount": 417, "studentCount": 84, "technicianCount": 768, "unemployedCount": 128, "unknownCount": 38 }
It's a standard Spring Boot project. We will define the Camel BOM as below:
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-spring-boot-dependencies</artifactId>
            <version>${spring.camel-version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
Then we will define the dependencies for Camel:
<dependencies>
    <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-stream-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-kafka-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-jackson-starter</artifactId>
    </dependency>
</dependencies>
We have the below contract with the BankDetail message publisher:
1. The messages will be published in JSON format on the Kafka Topic bank-details.
2. The kafka.key of all messages of the same group or batch would be identical.
3. The message will have a header named __TypeId__ containing the fully qualified Java class name of the message. This feature comes out of the box with Spring.
4. There is no expectation as to the type of the key, as long as the keys are fairly unique across different batches. In this example, I am using a random UUID.
5. After all messages in a batch are published, a CompletionSignal is published on Kafka.
6. A simple BankDetail publisher that respects the above contract can be found here; a minimal sketch is also shown just after this list.
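To make the contract concrete, here is a minimal, hypothetical publisher sketch. It assumes Spring Kafka with a KafkaTemplate<String, Object> whose value serializer is JsonSerializer (which adds the __TypeId__ header out of the box); the class and method names are illustrative:

import java.util.List;
import java.util.UUID;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

// Hypothetical sketch of a publisher that respects the contract above.
// Assumes spring-kafka with JsonSerializer as the value serializer,
// which adds the __TypeId__ header automatically.
@Service
public class BankDetailBatchPublisher {

    private final KafkaTemplate<String, Object> kafkaTemplate;

    public BankDetailBatchPublisher(KafkaTemplate<String, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void publishBatch(List<BankDetail> batch) {
        // every message of the same batch shares the same (random) key
        String batchKey = UUID.randomUUID().toString();
        batch.forEach(bankDetail -> kafkaTemplate.send("bank-details", batchKey, bankDetail));
        // signal the end of the batch
        kafkaTemplate.send("bank-details", batchKey, new CompletionSignal());
    }
}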
A Route defines a logical message flow. We define a Route by extending RouteBuilder, and it can be registered as a Spring bean:
@Service
public class BankDetailAggregatorByJob extends RouteBuilder {

    @Override
    public void configure() {
        ...
    }

}
Inside the configure() method, we define how we will process the incoming messages. In this case, we will read the messages off a Kafka Topic, aggregate those based on some condition and then publish the aggregated message back onto a Kafka Topic.
The below line reads messages off the Kafka Topic bank-details:
from("kafka:bank-details?brokers=" + kafkaBrokers + "&autoOffsetReset=earliest" + "&autoCommitEnable=true" + "&groupId=bank-detail-camel-consumer")
It's always good practice to define a route ID; it helps with debugging.
.routeId(BankDetailAggregatorByJob.class.getSimpleName())
Camel supports logging out of the box and can also take in an Slf4j Logger; here, LOG is an Slf4j Logger. ${header[key]} prints the message header with the given key, ${headers} prints all the message headers, and ${body} prints the message body. These are very useful shorthands for debugging. Here is the full list of expressions supported by Camel.
.log(LoggingLevel.TRACE, LOG, "${header[" + RouteConstants.TYPE_HEADER + "]}")
In our Route, we can expect the following two types of messages:
1. BankDetail: This is the message that has to be aggregated
2. CompletionSignal: This message signifies the end of the batch and, in effect, the end of the aggregation operation
Based on the type of message, as indicated by the __TypeId__ header, we have to do different things. If the message is of type BankDetail, we convert it to a JobCount, which is the aggregate message. If the message is of type CompletionSignal, we send a signal to the aggregation framework to complete the aggregation operation for the current batch. We do that by setting a Boolean flag in a message header. The complete choice() code is shown below:
.choice()
    .when(simple("${header." + RouteConstants.TYPE_HEADER + "} == '" + BankDetail.class.getName() + "'"))
        .unmarshal().json(JsonLibrary.Jackson, BankDetail.class)
        .process(exchange -> {
            BankDetail bankDetail = exchange.getIn().getBody(BankDetail.class);
            exchange.getIn().setBody(toJobCount(bankDetail), JobCount.class);
        })
    .when(simple("${header." + RouteConstants.TYPE_HEADER + "} == '" + CompletionSignal.class.getName() + "'"))
        .process(exchange -> {
            exchange.getIn().setBody(new JobCount(), JobCount.class);
            exchange.getIn().setHeader(
                    RouteConstants.COMPLETE_JOB_AGGREGATION_COMMAND, Boolean.TRUE);
        })
.end()
Note that the message arrives as a JSON formatted string. In order to convert that to a Java object, we need to apply the unmarshal() transform as shown below:
.unmarshal().json(JsonLibrary.Jackson, BankDetail.class)
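The toJobCount() helper used in the choice() block above is not shown in the route itself. A hedged sketch of what it might do: map the job attribute of a single BankDetail to a JobCount with the matching counter set to 1 (the job category strings are assumptions based on the dataset):

// Hypothetical sketch: maps one BankDetail to a JobCount with the matching counter set to 1.
// Only a few job categories are shown; the real mapping would cover all of them.
private JobCount toJobCount(BankDetail bankDetail) {
    JobCount jobCount = new JobCount();
    switch (bankDetail.getJob()) {
    case "admin.":
        jobCount.setAdminCount(1);
        break;
    case "blue-collar":
        jobCount.setBlueCollarCount(1);
        break;
    case "technician":
        jobCount.setTechnicianCount(1);
        break;
    // ... remaining categories handled the same way
    default:
        jobCount.setUnknownCount(1);
    }
    return jobCount;
}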
In its simplest form, the aggregate() function takes two parameters:
1. The expression based on which messages are grouped together for aggregation. In the present case, messages are aggregated by the Kafka key header.
2. The AggregationStrategy: defines how two messages are merged into a single message; more on this later.
.aggregate(header(KafkaConstants.KEY), new BankDetailAggregationStrategy())
We can also specify a completionTimeout. This defines the maximum time the aggregation operation will wait for the next message to arrive, measured from the time the last message was received. In other words, if a message does not arrive within the stipulated completionTimeout of the last message in the same aggregation group, the aggregation operation is deemed to have timed out. We can also specify whether to discard the partially aggregated message in case of a timeout.
.completionTimeout(2_000).discardOnCompletionTimeout()
The AggregationStrategy is the place where incoming messages are combined to form a single message. It defines a single method:
public Exchange aggregate(Exchange oldExchange, Exchange newExchange)
The oldExchange holds the partial aggregate results, while the newExchange holds the new incoming message. We would need to compute the next partial result and return it. Note that for the first message, the oldExchange would be null.
@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
    if (oldExchange == null) {
        LOG.info("########### First message received for the correlationID: {}",
                getCorrelationId(newExchange));
        return newExchange;
    }
    JobCount partialResults = oldExchange.getIn().getBody(JobCount.class);
    JobCount newMessage = newExchange.getIn().getBody(JobCount.class);
    oldExchange.getIn().setBody(doAggregation(newMessage, partialResults), JobCount.class);
    // any header that is used by the predicate should be set in the message
    // returned from this method
    Map<String, Object> headers = new HashMap<>();
    headers.putAll(oldExchange.getIn().getHeaders());
    headers.putAll(newExchange.getIn().getHeaders());
    oldExchange.getIn().setHeaders(headers);
    return oldExchange;
}
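The doAggregation() helper is where the running JobCount and the newly arrived one are merged. Its implementation is not shown above, so the following is only a sketch of the obvious approach, summing the counters field by field:

// Hypothetical sketch: merges the new partial result into the running aggregate by summing counters.
private JobCount doAggregation(JobCount newMessage, JobCount partialResults) {
    JobCount merged = new JobCount();
    merged.setAdminCount(newMessage.getAdminCount() + partialResults.getAdminCount());
    merged.setBlueCollarCount(newMessage.getBlueCollarCount() + partialResults.getBlueCollarCount());
    // ... the remaining counters are summed the same way
    merged.setUnknownCount(newMessage.getUnknownCount() + partialResults.getUnknownCount());
    return merged;
}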
Apart from computing the aggregate result, the AggregationStrategy can also do a few other interesting things, described below.
If we implement the Predicate interface, we can indicate when the aggregation operation is deemed complete. In the present case, we wait until we receive a TRUE signal in a given header.
@Override
public boolean matches(Exchange oldExchange) {
    Boolean completionCommand = oldExchange.getIn()
            .getHeader(RouteConstants.COMPLETE_JOB_AGGREGATION_COMMAND, Boolean.class);
    if (completionCommand == null) {
        return false;
    }
    LOG.debug("******shouldComplete: {}", completionCommand);
    return completionCommand;
}
If we implement the CompletionAwareAggregationStrategy, we would get a callback when the aggregation operation completes successfully.
@Override
public void onCompletion(Exchange exchange) {
    String correlationId = getCorrelationId(exchange);
    LOG.info("########### Aggregation is complete for {}", correlationId);
}
If we implement the TimeoutAwareAggregationStrategy, we would get a callback if the aggregation operation times out.
@Override
public void timeout(Exchange oldExchange, int index, int total, long timeout) {
    LOG.info("############# timed out");
}
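Putting these pieces together, BankDetailAggregationStrategy presumably implements all of these interfaces on one class. A hedged outline, assuming Camel 2.x interface names (org.apache.camel.processor.aggregate.* and org.apache.camel.Predicate), with the method bodies shown earlier reduced to comments:

// Hypothetical class outline; interface and package names assume Camel 2.x.
public class BankDetailAggregationStrategy
        implements CompletionAwareAggregationStrategy, TimeoutAwareAggregationStrategy, Predicate {

    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        // merging logic shown earlier
        return newExchange;
    }

    @Override
    public boolean matches(Exchange exchange) {
        // completion Predicate shown earlier
        return false;
    }

    @Override
    public void onCompletion(Exchange exchange) {
        // success callback shown earlier
    }

    @Override
    public void timeout(Exchange oldExchange, int index, int total, long timeout) {
        // timeout callback shown earlier
    }
}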
Before we publish the aggregate message to the bus, we would need to convert the Java POJO into JSON.
.marshal().json(JsonLibrary.Jackson)
This is how we would publish the aggregated message to the bank-details-aggregated Kafka Topic:
.to("kafka:bank-details-aggregated?brokers=" + kafkaBrokers)
Assuming that the kafka-simple-publisher is running on localhost:8090, trigger BankDetail messages on the Kafka Topic bank-details with:
curl "http://localhost:8090/kafka/publish"
Wait for around 50 seconds, and you should see the aggregated message published on the bank-details-aggregated topic.
The completely working source code can be found here:
https://github.com/paawak/spring-boot-demo/tree/master/kafka-spring/kafka-camel-aggregation-simple
This is a Spring Boot application, so running it locally as a Java application is just a matter of running the main class KafkaCamelDemoApplication.
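KafkaCamelDemoApplication is a standard Spring Boot entry point; it presumably looks something like this:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

// Standard Spring Boot entry point; the real class may differ slightly.
@SpringBootApplication
public class KafkaCamelDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaCamelDemoApplication.class, args);
    }
}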
This has also been dockerised using the io.fabric8 Docker Maven plugin.
In pom.xml:
<plugin>
    <groupId>io.fabric8</groupId>
    <artifactId>docker-maven-plugin</artifactId>
    <version>0.32.0</version>
    <configuration>
        <images>
            <image>
                <name>docker.io/paawak/${project.artifactId}</name>
                <build>
                    <dockerFileDir>${basedir}</dockerFileDir>
                    <dockerFile>Dockerfile</dockerFile>
                    <tags>
                        <tag>${project.version}</tag>
                        <tag>latest</tag>
                    </tags>
                </build>
            </image>
        </images>
    </configuration>
    <executions>
        <execution>
            <id>build-docker-image</id>
            <phase>package</phase>
            <goals>
                <goal>build</goal>
            </goals>
        </execution>
        <execution>
            <id>push-docker-image</id>
            <phase>install</phase>
            <goals>
                <goal>push</goal>
            </goals>
        </execution>
    </executions>
</plugin>
mvn clean package
docker run -it -p 8080:8080 -e server.port=8080 -e spring.profiles.active=default paawak/kafka-camel-aggregation-simple:latest
You can also find a Cucumber-based test harness here:
However, as a prerequisite, you would need to download and build a Docker image of the kafka-simple-publisher from here:
https://github.com/paawak/spring-boot-demo/tree/master/kafka-spring/kafka-simple-publisher
The feature file for our scenario is the message-aggregation-simple.feature.
Run the docker-compose file as:
docker-compose -f docker-compose-message-aggregation-simple.yml up
This will start the required Docker containers. Once they are up, run the RunCucumberTest class.