Kafka Message Aggregation using Camel and Spring Boot

Posted by Palash Ray on January 30, 2020 · 17 mins read

Introduction


Apache Camel is a popular open source integration framework that can work with almost any message broker: Kafka, ActiveMQ, RabbitMQ, etc. It provides out-of-the-box support for the most popular EIPs (Enterprise Integration Patterns). Camel also works seamlessly with Spring Boot, which makes the two a killer combination. In this example, we will see how to use the Aggregate EIP provided by Camel to do message aggregation on Kafka.


Problem Statement


We are building a microservice that reads BankDetail messages in JSON format from the Kafka Topic bank-details. A BankDetail message has the below attributes:


	private int id;
	private int age;
	private String job;
	private String marital;
	private String education;
	private String defaulted;
	private BigDecimal balance;
	private String housing;
	private String loan;
	private String contact;
	private int day;
	private String month;
	private int duration;
	private int campaign;
	private int pdays;
	private int previous;
	private String poutcome;
	private String y;


BankDetail messages come in batches; every message of the same batch has the same kafka.key. The microservice aggregates all the messages of a batch by the job attribute and counts the occurrences of each job category. It then publishes the result of the aggregation on the Kafka Topic bank-details-aggregated. A typical aggregated message looks like:


{
	"adminCount": 478,
	"blueCollarCount": 946,
	"entrepreneurCount": 168,
	"houseMaidCount": 112,
	"managementCount": 969,
	"retiredCount": 230,
	"selfEmployedCount": 183,
	"servicesCount": 417,
	"studentCount": 84,
	"technicianCount": 768,
	"unemployedCount": 128,
	"unknownCount": 38
}


Solution


Project Setup using Maven


It's a standard Spring Boot project. We will define the Camel BOM as below:


<dependencyManagement>
	<dependencies>
		<dependency>
			<groupId>org.apache.camel</groupId>
			<artifactId>camel-spring-boot-dependencies</artifactId>
			<version>${spring.camel-version}</version>
			<type>pom</type>
			<scope>import</scope>
		</dependency>
	</dependencies>
</dependencyManagement>


Then we will define the dependencies for Camel:


<dependencies>
	<dependency>
		<groupId>org.apache.camel</groupId>
		<artifactId>camel-spring-boot-starter</artifactId>
	</dependency>
	<dependency>
		<groupId>org.apache.camel</groupId>
		<artifactId>camel-stream-starter</artifactId>
	</dependency>
	<dependency>
		<groupId>org.apache.camel</groupId>
		<artifactId>camel-kafka</artifactId>
	</dependency>
	<dependency>
		<groupId>org.apache.camel</groupId>
		<artifactId>camel-kafka-starter</artifactId>
	</dependency>
	<dependency>
		<groupId>org.apache.camel</groupId>
		<artifactId>camel-jackson-starter</artifactId>
	</dependency>
</dependencies>


Contract with the message publisher


We have the below contract with the BankDetail message publisher:


1. The messages will be published in JSON format on the Kafka Topic bank-details.


2. The kafka.key of all messages of the same group or batch would be identical.


3. The message will have a header named __TypeId__ containing the fully qualified Java class name of the payload. This feature comes out of the box with Spring.


4. There is no expectation as to the type of the key, as long as the keys are fairly unique across different batches. In this example, I am using a random UUID.


5. After all messages in a batch are published, a CompletionSignal is published on Kafka.


6. A simple BankDetail publisher that respects the above contract can be found in the kafka-simple-publisher project, linked later in this post; a sketch of the general idea follows.
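
For illustration, here is a minimal sketch of a publisher honoring this contract. It assumes a KafkaTemplate configured with Spring Kafka's JsonSerializer, which adds the __TypeId__ header out of the box; the BankDetailBatchPublisher class name and the no-argument CompletionSignal constructor are illustrative assumptions:


import java.util.List;
import java.util.UUID;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class BankDetailBatchPublisher {

	private final KafkaTemplate<String, Object> kafkaTemplate;

	public BankDetailBatchPublisher(KafkaTemplate<String, Object> kafkaTemplate) {
		this.kafkaTemplate = kafkaTemplate;
	}

	public void publishBatch(List<BankDetail> batch) {
		// one random UUID serves as the kafka.key for the whole batch (contract items 2 and 4)
		String batchKey = UUID.randomUUID().toString();
		batch.forEach(bankDetail -> kafkaTemplate.send("bank-details", batchKey, bankDetail));
		// signal the end of the batch (contract item 5)
		kafkaTemplate.send("bank-details", batchKey, new CompletionSignal());
	}
}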


Implementation details


Camel Route


A Route defines a logical message flow. We define a Route by extending RouteBuilder, and it can be declared as a Spring Bean:


@Service
public class BankDetailAggregatorByJob extends RouteBuilder {
	@Override
	public void configure() {
		...
	}
}


Inside the configure() method, we define how we will process the incoming messages. In this case, we will read the messages off a Kafka Topic, aggregate those based on some condition and then publish the aggregated message back onto a Kafka Topic.
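
Condensed into a single sketch, stitched together from the snippets explained in the sections below (the choice() block is elided here and shown in full later), the route has this overall shape:


@Override
public void configure() {
	from("kafka:bank-details?brokers=" + kafkaBrokers
			+ "&autoOffsetReset=earliest&autoCommitEnable=true"
			+ "&groupId=bank-detail-camel-consumer")
		.routeId(BankDetailAggregatorByJob.class.getSimpleName())
		// choice(): convert BankDetail/CompletionSignal payloads into JobCount
		.aggregate(header(KafkaConstants.KEY), new BankDetailAggregationStrategy())
		.completionTimeout(2_000).discardOnCompletionTimeout()
		.marshal().json(JsonLibrary.Jackson)
		.to("kafka:bank-details-aggregated?brokers=" + kafkaBrokers);
}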


Reading messages from Kafka


The below line reads messages off the Kafka Topic bank-details:


from("kafka:bank-details?brokers=" + kafkaBrokers + "&autoOffsetReset=earliest"
		+ "&autoCommitEnable=true" + "&groupId=bank-detail-camel-consumer")


Define a Route ID


It's always good practice to define a Route ID; it helps with debugging.


  .routeId(BankDetailAggregatorByJob.class.getSimpleName()) 


Logging


Camel supports logging out of the box and can also take an Slf4j Logger; here, LOG is an Slf4j Logger. ${header[key]} prints the message header with the given key, ${headers} prints all the message headers, and ${body} prints the message body. These are very useful shorthands for debugging. The full list of supported expressions is in the Camel Simple language documentation.


.log(LoggingLevel.TRACE, LOG, "${header[" + RouteConstants.TYPE_HEADER + "]}")

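
A few more illustrative examples of these shorthands (the log messages are hypothetical):


.log(LoggingLevel.TRACE, LOG, "kafka key: ${header[kafka.KEY]}")
.log(LoggingLevel.TRACE, LOG, "all headers: ${headers}")
.log(LoggingLevel.TRACE, LOG, "payload: ${body}")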

Conditional branching using choice expression


In our Route, we can expect the below two types of messages:


1. BankDetail: the message that has to be aggregated.


2. CompletionSignal: the message that signifies the end of the batch, and with it, the end of the aggregation operation.


Based on the type of message, as given by the __TypeId__ header, we have to do different things. If the message is of type BankDetail, we convert it to a JobCount, which is the aggregate message. If the message is of type CompletionSignal, we signal the aggregation framework to complete the aggregation operation for the current batch; we do that by setting a Boolean header. The complete choice code is shown below:


.choice()
	.when(simple("${header." + RouteConstants.TYPE_HEADER + "} == '"
			+ BankDetail.class.getName() + "'"))
		.unmarshal().json(JsonLibrary.Jackson, BankDetail.class)
		.process(exchange -> {
			BankDetail bankDetail = exchange.getIn().getBody(BankDetail.class);
			exchange.getIn().setBody(toJobCount(bankDetail), JobCount.class);
		})
	.when(simple("${header." + RouteConstants.TYPE_HEADER + "} == '"
			+ CompletionSignal.class.getName() + "'"))
		.process(exchange -> {
			exchange.getIn().setBody(new JobCount(), JobCount.class);
			exchange.getIn().setHeader(
					RouteConstants.COMPLETE_JOB_AGGREGATION_COMMAND, Boolean.TRUE);
		})
.end()
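
The toJobCount() helper is not shown in this post; here is a plausible sketch, assuming each BankDetail contributes a count of 1 to the category matching its job attribute (the exact string labels are assumptions):


// hypothetical sketch: map one BankDetail to a JobCount with a single
// category count set to 1; the string labels below are assumed
private JobCount toJobCount(BankDetail bankDetail) {
	JobCount jobCount = new JobCount();
	switch (bankDetail.getJob()) {
	case "admin.":
		jobCount.setAdminCount(1);
		break;
	case "blue-collar":
		jobCount.setBlueCollarCount(1);
		break;
	// ...the remaining categories follow the same pattern...
	default:
		jobCount.setUnknownCount(1);
	}
	return jobCount;
}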


Note that the message arrives as a JSON-formatted string. In order to convert that to a Java object, we need to apply the unmarshal() transform as shown below:


 .unmarshal().json(JsonLibrary.Jackson, BankDetail.class) 


Aggregation Implementation


In its simplest form, the aggregate() function takes in two parameters:


1. The correlation Expression, based on which the messages are grouped together for aggregation. In the present case, messages are aggregated by the Kafka key header (KafkaConstants.KEY).


2. The AggregationStrategy: defines how two messages are merged into a single message; more on it later.


.aggregate(header(KafkaConstants.KEY), new BankDetailAggregationStrategy()) 


We can also specify the completionTimeout. This defines the maximum time the aggregation operation can wait for the next message to arrive, counted from the time the last message was received. In essence, if a message does not arrive within the stipulated completionTimeout of the last message in the same aggregation group, the aggregation operation is deemed to have timed out. We can also specify whether to discard the partially aggregated message in case of a timeout.


 .completionTimeout(2_000).discardOnCompletionTimeout() 


Aggregation Strategy


The AggregationStrategy is the place where incoming messages are combined to form a single message. It defines a single method:


public Exchange aggregate(Exchange oldExchange, Exchange newExchange) 


The oldExchange holds the partially aggregated result, while the newExchange holds the newly arrived message. We need to compute the next partial result and return it. Note that for the first message, the oldExchange would be null.


@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
	if (oldExchange == null) {
		LOG.info("########### First message received for the correlationID: {}",
				getCorrelationId(newExchange));
		return newExchange;
	}
	JobCount partialResults = oldExchange.getIn().getBody(JobCount.class);
	JobCount newMessage = newExchange.getIn().getBody(JobCount.class);
	oldExchange.getIn().setBody(doAggregation(newMessage, partialResults), JobCount.class);
	// any header that is used by the predicate should be set in the message
	// returned from this method
	Map<String, Object> headers = new HashMap<>();
	headers.putAll(oldExchange.getIn().getHeaders());
	headers.putAll(newExchange.getIn().getHeaders());
	oldExchange.getIn().setHeaders(headers);
	return oldExchange;
}
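
The doAggregation() helper is likewise not shown; here is a minimal sketch, assuming JobCount exposes a getter/setter pair for each count field of the sample JSON above:


// hypothetical sketch: add the new message's per-category counts to the
// running partial result; the accessor names are assumed from the sample JSON
private JobCount doAggregation(JobCount newMessage, JobCount partialResults) {
	JobCount merged = new JobCount();
	merged.setAdminCount(newMessage.getAdminCount() + partialResults.getAdminCount());
	merged.setBlueCollarCount(newMessage.getBlueCollarCount() + partialResults.getBlueCollarCount());
	// ...the remaining categories are summed the same way...
	merged.setUnknownCount(newMessage.getUnknownCount() + partialResults.getUnknownCount());
	return merged;
}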


Apart from computing the aggregate result, the AggregationStrategy can also do some other interesting things, as described below.


Completing the Aggregation


If we implement the Predicate interface, we can indicate when the aggregation operation is deemed complete. In the present case, we wait till we receive a TRUE value in a given header.


@Override
public boolean matches(Exchange oldExchange) {
	Boolean completionCommand = oldExchange.getIn()
			.getHeader(RouteConstants.COMPLETE_JOB_AGGREGATION_COMMAND, Boolean.class);
	if (completionCommand == null) {
		return false;
	}
	LOG.debug("******shouldComplete: {}", completionCommand);
	return completionCommand;
}


Aggregation Complete signal


If we implement the CompletionAwareAggregationStrategy, we would get a callback when the aggregation operation completes successfully.


@Override
public void onCompletion(Exchange exchange) {
	String correlationId = getCorrelationId(exchange);
	LOG.info("########### Aggregation is complete for {}", correlationId);
}


Aggregation Timeout signal


If we implement the TimeoutAwareAggregationStrategy, we would get a callback if the aggregation operation times out.


@Override
public void timeout(Exchange oldExchange, int index, int total, long timeout) {
	LOG.info("############# timed out");
}


Converting message to JSON


Before we publish the aggregated message to the bus, we need to convert the Java POJO into JSON.


  .marshal().json(JsonLibrary.Jackson) 


Publishing message to Kafka


This is how we would publish the aggregated message to the bank-details-aggregated Kafka Topic:


 .to("kafka:bank-details-aggregated?brokers=" + kafkaBrokers) 


Putting it together


Assuming that the kafka-simple-publisher is running on localhost:8090, trigger BankDetail messages on the Kafka Topic bank-details by doing:


curl "http://localhost:8090/kafka/publish"


Wait for around 50 seconds, and you will see the aggregated message published on the bank-details-aggregated topic.
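
If you want to watch the topic from the command line, one option is the console consumer that ships with Kafka (the broker address here is an assumption; adjust it to your setup):


kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic bank-details-aggregated --from-beginning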


Source Code


The complete, working source code can be found here:


https://github.com/paawak/spring-boot-demo/tree/master/kafka-spring/kafka-camel-aggregation-simple


Running the Demo


As Simple Java App


This is a Spring Boot application, so running it locally as a Java application is just a matter of running the main class, KafkaCamelDemoApplication.


As Docker Image


This has also been dockerised using the io.fabric8 Docker Maven plugin.


In pom.xml:


<plugin>
	<groupId>io.fabric8</groupId>
	<artifactId>docker-maven-plugin</artifactId>
	<version>0.32.0</version>
	<configuration>
		<images>
			<image>
				<name>docker.io/paawak/${project.artifactId}</name>
				<build>
					<contextDir>${basedir}</contextDir>
					<dockerFile>Dockerfile</dockerFile>
					<tags>
						<tag>${project.version}</tag>
						<tag>latest</tag>
					</tags>
				</build>
			</image>
		</images>
	</configuration>
	<executions>
		<execution>
			<id>build-docker-image</id>
			<phase>package</phase>
			<goals>
				<goal>build</goal>
			</goals>
		</execution>
		<execution>
			<id>push-docker-image</id>
			<phase>install</phase>
			<goals>
				<goal>push</goal>
			</goals>
		</execution>
	</executions>
</plugin>


Build docker image


mvn clean package


Run docker image


docker run -it -p 8080:8080 -e server.port=8080 -e spring.profiles.active=default paawak/kafka-camel-aggregation-simple:latest


As a Cucumber test harness


You can also find a Cucumber based test harness here:


https://github.com/paawak/spring-boot-demo/tree/master/kafka-spring/message-aggregation-cucumber-test-harness


However, as a prerequisite, you would need to download and build a docker image of the kafka-simple-publisher from here:


https://github.com/paawak/spring-boot-demo/tree/master/kafka-spring/kafka-simple-publisher


The feature file for our scenario is the message-aggregation-simple.feature.


Run the docker-compose file as:


docker-compose -f docker-compose-message-aggregation-simple.yml up


This will start the needed docker images. After the containers are up, run the RunCucumberTest class.