Topics

Multiple Keys/Value pair with Custom Filters in Apache Kafka

Download

Introduction:

Before I even start talking about Apache Kafka here, is it possible to use list of keys in Kafka? Yes, It’s all possible.

In this post, we’ll see how to use multiple keys/value pairs with multiple filters. First create a simple Kafka producer and a Kafka consumer in a Spring Boot application using a very simple method.

Apache Kafka and its Architecture:

Kafka is a distributed streaming platform that is used publish and subscribe to streams of records.

  • Kafka maintains feeds of messages in categories called topics.
  • Processes that publish messages to a Kafka topic are called producers.
  • Processes that subscribe to topics and process the feed of published messages are called consumers.
  • Kafka is run as a cluster comprised of one or more servers each of which is called a broker.

Producer —> Kafka Cluster (Broker & Zookeeper) —> Consumer

Kafka Setup:

Follow the step-by-step instruction from the website, and you’ll get Kafka up and running in your local environment.

Create a new Spring Boot Application:

In this example, I’m working with a Spring-Boot application using gradle project in eclipse IDE. You can use Spring initializr to generate our projects and unzip it in your local.

Dependencies:

These are dependencies for kafka add in your build.gradle file.Dependencies

Project Structure:

Project Structure

1.Spring Boot Controller for Kafka:

Create a controller class, it has two endpoints one is for message would be String type and the second one is for Json type. The KafkaController class will expose two endpoints, using which we will send message through Postman → then it will go to Producer, which will publish it to Kafka queue → and then our Consumer will catch it, and handle the way we set up — just log to the console.

In this controller, we are passing list of keys with value (Multiple keys with single value) as a request param and finally added topic name from the property file.

Controller

2.The Kafka Producer:

2.1. Producer Configuration:

To enable the kafka producer kafkaTemplateString() and producerFactoryString() methods should be implemented in KafkaConfiguration class. The kafkaTemplate() will return a new kafkaTemplate based on the configuration defined in producerFactoryString(). Here we autowired KafkaTemplate, and we will use this instance to publish messages to the queue.

2.1

Here I have used two input types. The configuration steps are almost same, we need to change only in the value serializer as JsonSerializer instead of StringSerializer and change the ProducerFactory type as your pojo name as value.

Create a service class, it has contained implementation for both String and Json type inputs comes from the controller layer. Here I have looped the keys, because producer at a time it will push only single message to the queue.

Send Message

That’s it for producer! Let’s move to Consumer — the service that will be responsible for catching messages and further handling(based on your own logic).

3.The Kafka Consumer:

3.1.Custom Filter:

Here I have added a custom filter for key from the queue. We are passing multiple keys with a single message from producer. In consumer it will retrieve a specific message depending on a key using KafkaListener.

Recordstrategy

In my case I have negotiate the recordFilterStrategy condition because I have returned the value/message based on a specific key. You can change the condition accordingly.

3.2.Consumer Configuration:

Implement consumerFactoryWithFilter () and stringFilterKafkaListenerContainerFactory () methods in KafkaConfiguration class where both methods are used to enable kafka custom filter. In order to set the RecordFilterStrategy (message filtering strategy) listening capabilities in kafka listener. When it returns true, the message will be discarded. When it returns false, the message can normally reach the listening container.

Config

The configuration steps are almost same we need to change only in the value serializer as JsonDeserializer instead of StringDeserializer and change the ConsumerFactory type as your pojo name as value.

Consumer will pull the message from the queue based on the consume configuration. The method that is intended to consume the message should be annotated with KafkaListener and pass the topic, groupId and the most important field as containerFactory as a parameter.

The ContainerFactory will check with consumer configuration in kafkaconfiguration class where methods are used  to set filter for the given message or not.

Listener

In this example I have explained with and without filter call using multiple listeners for both the input types. If you need filter you can set the record filter strategy or else go with the flow.

4.Testing API with Postman:

Post: http://localhost:9000/kafka/publish?keys=Key1, Key2, Key3&message=TestMessage

String with Filter:

String Filter

String without Filter:

Without Filter

Json with filter:

POST: https://localhost:9000/kafka/publishTransaction?keys=Key1, Key2, Key3

Postman

Sample request:

{

“name”:”TestMessage”,

“type”: “json”

}

Output:

Json Filter

Use these commands to start zookeeper and Kafka server.

Hope you like our explanation. Please look into my GitHub repo for the full implementation of the applications and feel free to contribute to the repo.

 

 

 

 

 

About the Author

More from this Author

Leave a Reply

This site uses Akismet to reduce spam. Learn how your comment data is processed.

Subscribe to the Weekly Blog Digest:

Sign Up