Batch Processing nowadays is a very common task performed by Organisations for processing large volumes of records for tasks including logging/tracing, transaction management, job processing statistics, and the list goes on. When you work with extensive data, there is no guarantee that inputs will be coming from another java-based application or even database. In fact, it is very likely to have to work with message buses and queues for batch processing. And here is when Spring Batch comes to the rescue!!
Spring Batch provides beneficial readers when working with messaging queues, namely, JmsItemReader and AmqpItemReader.
This article will be concentrating on batch processing and a messaging queue with relatively little effort using Spring Batch with Rabbit MQ.
What is RabbitMQ?
RabbitMQ is a lightweight, reliable, scalable, and portable message broker. But unlike many message brokers familiar to Java developers, it’s not based on JMS. Instead, your applications communicate with it via a platform-neutral, wire-level protocol: the Advanced Message Queuing Protocol (AMQP).
In the case of batch processing, this is especially useful — even if you are sending messages from another Java application, you can avoid the headaches that come from sending POJOs over the wire, requiring a consistent model between multiple applications. That is, sending and receiving strings is much more flexible.
Prerequisites
- RabbitMQ should be installed somewhere that is accessible to this project.
- Spring boot project using Spring Tool Suite.
Dependencies
In your POM, be sure to include among your dependencies the following artifacts:
<dependencies> ... <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch</artifactId> </dependency> ... <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> ... </dependencies>
Configuration
Since this is Spring Boot, you will need an entry point for the application. So, be sure to include a class such as the one below in src/main/java:
@SpringBootApplication public class MyBatchApplication { public static void main(String[] args) { SpringApplication.run(MyBatchApplication.class, args); } }
we need to set up an overall configuration for the project
@Configuration @EnableBatchProcessing public class MyBatchAppConfig { }
Notice the @EnableBatchProcessing annotation. This is mandatory to let Spring Boot know to take care of a lot of the heavy lifting when setting up Spring Batch.
Set up a message queue using Spring AMQP, which will also provide the template necessary to help interact with the queue.
import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; ... public final static String queueName = "my-queue"; @Autowired private ConnectionFactory rabbitConnectionFactory; // this will create a new queue if it doesn't exist; otherwise, it'll use the existing one of the same name // ...the second argument means to make the queue 'durable' @Bean public Queue myQueue() { return new Queue(queueName, true); } // this is necessary for operations with Spring AMQP @Bean public RabbitTemplate getMyQueueTemplate() { RabbitTemplate template = new RabbitTemplate(this.rabbitConnectionFactory); template.setQueue(queueName); return template;
Configuring a batch job’s Step, that is, what happens for each piece of data that’s consumed, assuming we’re using chunk-based processing and not a tasklet.
import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.amqp.AmqpItemReader; ... @Autowired private StepBuilderFactory stepBuilderFactory; ... @Bean public Step getMyJobStep() { return this.stepBuilderFactory.get("myJobStep") .<String, DomainObject>chunk(1) .reader(this.getMyReader()) .processor(this.getMyProcessor()) .writer(this.getMyWriter()) .build(); } @Bean public ItemReader<String> getMyReader() { return new AmqpItemReader<String>(this.getMyQueueTemplate()); } @Bean public ItemProcessor<String, DomainObject> getMyProcessor() { return new MessageProcessor(); } @Bean public ItemWriter<DomainObjecvt> getMyWriter() { return new MessageWriter(); }
The AmqpItemReader, which tells us that it is expecting a String to represent each message on the queue, takes the queue template we created earlier as an argument. The AmqpItemReader is a class that comes with Spring Batch out of the box — no other work is needed to get it to work! That’s the magic of Spring Batch!
Conclusion
We took an introductory look at how your Spring Batch project accepts input from an AMQP-based message queue with minimal setup.
And yes, this is it!! We are ready to roll.
If you enjoyed this and thought it was informative ,do share with your colleagues . Also let me know in the comment section on what should be the topic for my upcoming blog.