Introduction:
Spring Cloud Stream is a framework for building message-driven microservice applications. Spring Cloud Stream builds upon Spring Boot and uses Spring Integration to provide connectivity to message brokers. It provides opinionated configuration of middleware, introducing the concepts of publish-subscribe, consumer groups, and partitions.
Example:
To use SCF (Spring Cloud Function) within SCS (Spring Cloud Stream), we need to add two dependencies when generating the project in the Spring Initializr: Spring Cloud Stream itself and the Kafka binder.
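As a sketch, for a Kafka-based setup these would be the two Maven coordinates (versions omitted, since they are managed by the Spring Cloud BOM):

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>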
Now we can create a producer, a consumer, or a processor using the Java functional interfaces: Supplier, Consumer, and Function.
A consumer can be written using the Consumer interface.
@Bean
public Consumer<String> myConsumer() {
    return input -> log.info("Consumed message: " + input);
}
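With the functional model, SCS derives the binding name from the bean name, so the input binding here is myConsumer-in-0. Assuming a hypothetical topic called my-topic, it can be bound like this:

spring.cloud.stream.bindings.myConsumer-in-0.destination=my-topic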
A producer can be written with the Supplier interface. A plain Supplier is polled by the framework on a fixed schedule, so to emit messages on demand you need a bridge such as Reactor's EmitterProcessor.
public static EmitterProcessor<Message<Address>> producer = EmitterProcessor.create();

@Bean
public Supplier<Flux<Message<Address>>> produce() {
    return () -> producer;
}
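Messages can then be pushed into the bridge from anywhere in the application, for example from a REST controller. A minimal sketch, assuming Address is a class from your own domain model:

// org.springframework.messaging.support.MessageBuilder
Address address = new Address();
producer.onNext(MessageBuilder.withPayload(address).build());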
A processor, which receives messages from one topic and writes them to another topic, can be written with the Function interface.
@Bean
public Function<String, String> address() {
    // transform the incoming message; the result is published to the output topic
    return msg -> "Processed: " + msg;
}
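Since several function beans now coexist, we tell SCS which ones to bind and wire each side of the processor to a topic. A hedged properties sketch, with input-topic and output-topic as hypothetical names:

spring.cloud.function.definition=myConsumer;produce;address
spring.cloud.stream.bindings.address-in-0.destination=input-topic
spring.cloud.stream.bindings.address-out-0.destination=output-topic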
By default, SCS connects to a Kafka broker at localhost:9092. We can change this with a property, and we can also list more than one broker address, comma-separated.
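For example, pointing the binder at two hypothetical brokers:

spring.cloud.stream.kafka.binder.brokers=broker1:9092,broker2:9092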
For every consumer, we should define a specific topic name and a group. Kafka delivers a message only once per consumer group: if we start several instances of our service under the same group, a given message will be delivered to only one of those instances.
spring.cloud.stream.bindings.topicName.group=groupName
Error Handling:
SCS performs retries by default, which can lead to duplicate processing of messages. Errors are handled differently for producers and consumers.
For a consumer, we can avoid retries by setting the property below to 1.
spring.cloud.stream.bindings.topicName.consumer.max-attempts=1
Another way is to catch the exception and handle it gracefully.
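A minimal sketch of that approach, assuming a hypothetical process() method holding the business logic; the exception is logged instead of rethrown, so the retry/error flow is never triggered:

@Bean
public Consumer<String> myConsumer() {
    return input -> {
        try {
            process(input); // hypothetical business logic
        } catch (Exception e) {
            // swallow and log instead of rethrowing, so SCS does not retry
            log.error("Failed to process message: " + input, e);
        }
    };
}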
A producer will try to send a message to Kafka; if the send is not successful, it will retry, and if the retries also fail, it will send the error to an error channel.
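To handle those failures in application code, the producer's error channel has to be enabled first; send failures then arrive on a channel whose name, by convention, is the destination name suffixed with .errors. A hedged sketch, assuming the produce binding from earlier and the hypothetical destination output-topic:

spring.cloud.stream.bindings.produce-out-0.producer.error-channel-enabled=true

// org.springframework.integration.annotation.ServiceActivator
// org.springframework.messaging.support.ErrorMessage
@ServiceActivator(inputChannel = "output-topic.errors")
public void handlePublishError(ErrorMessage errorMessage) {
    log.error("Failed to publish message: " + errorMessage.getPayload());
}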
Partitions:
Kafka topics are divided into a number of partitions. Partitions allow you to parallelize a topic by splitting its data across multiple brokers. Imagine a cluster of 3 brokers and 5 partitions for your topic: 2 brokers will hold 2 partitions each and 1 broker will hold 1 partition. A partition guarantees that messages are delivered in the order they were received. Consumers can also be parallelized, so that multiple consumers read from multiple partitions of a topic, allowing for very high message-processing throughput.
Within a consumer group, a partition is read by only one consumer at a time. If a consumer dies or a new consumer joins the group, Kafka rebalances the partitions and reassigns them to the remaining consumers.
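On the producer side, SCS can do the partitioning for us via a key expression. A sketch matching the 5-partition example above, assuming the outgoing payload has an id field to key on:

spring.cloud.stream.bindings.produce-out-0.producer.partition-key-expression=payload.id
spring.cloud.stream.bindings.produce-out-0.producer.partition-count=5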
Conclusion:
Using SCS with Kafka, we can achieve a high level of parallelism and decoupling between data producers and data consumers, making our architecture more flexible and adaptable to change.
This blog walked through creating a producer, a consumer, and a processor using the Java functional interfaces.