Spring Batch with RabbitMQ

Batch processing is nowadays a very common task for organisations that need to process large volumes of records, for purposes including logging/tracing, transaction management, job processing statistics, and more. When you work with large volumes of data, there is no guarantee that the input will come from another Java-based application or even a database. In fact, you are quite likely to have to work with message buses and queues for batch processing. This is where Spring Batch comes to the rescue!

Spring Batch provides ready-made readers for working with messaging queues, namely JmsItemReader and AmqpItemReader.

This article concentrates on combining batch processing with a messaging queue, with relatively little effort, using Spring Batch with RabbitMQ.

What is RabbitMQ?

RabbitMQ is a lightweight, reliable, scalable, and portable message broker. But unlike many message brokers familiar to Java developers, it’s not based on JMS. Instead, your applications communicate with it via a platform-neutral, wire-level protocol: the Advanced Message Queuing Protocol (AMQP).

In the case of batch processing, this is especially useful: even if you are sending messages from another Java application, you can avoid the headaches that come from sending POJOs over the wire, which requires a consistent model between multiple applications. Sending and receiving strings is much more flexible.
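To make the point about strings concrete, here is a minimal plain-Java sketch with no messaging library involved. The class names OrderPlaced and Payment and the comma-separated message format are assumptions made up for illustration; they are not part of this article's project. The sketch shows that the sender and the receiver only need to agree on the text format, not on a shared Java class.

```java
import java.math.BigDecimal;

public class StringMessageDemo {

    // Hypothetical sender-side type. The receiver never needs this class.
    static class OrderPlaced {
        final String orderId;
        final BigDecimal amount;

        OrderPlaced(String orderId, BigDecimal amount) {
            this.orderId = orderId;
            this.amount = amount;
        }

        // Encode the order as plain text: "<orderId>,<amount>"
        String toMessage() {
            return orderId + "," + amount.toPlainString();
        }
    }

    // Hypothetical receiver-side type, defined independently of the sender.
    static class Payment {
        final String reference;
        final BigDecimal total;

        Payment(String reference, BigDecimal total) {
            this.reference = reference;
            this.total = total;
        }

        // Decode the plain-text message into the receiver's own model.
        static Payment fromMessage(String message) {
            String[] parts = message.split(",", 2);
            if (parts.length != 2) {
                throw new IllegalArgumentException("Unexpected message format: " + message);
            }
            return new Payment(parts[0].trim(), new BigDecimal(parts[1].trim()));
        }
    }

    public static void main(String[] args) {
        // What the sender would put on the queue
        String wire = new OrderPlaced("A-100", new BigDecimal("19.99")).toMessage();

        // What the receiver does with the string it reads from the queue
        Payment payment = Payment.fromMessage(wire);
        System.out.println(wire);
        System.out.println(payment.reference + " / " + payment.total);
    }
}
```

In a real system a format such as JSON would probably be a more robust choice than a hand-rolled delimiter, but the principle is the same: only the string format is shared.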

Prerequisites

  1. RabbitMQ should be installed somewhere that is accessible to this project.
  2. A Spring Boot project, created using Spring Tool Suite.

Dependencies

In your POM, be sure to include among your dependencies the following artifacts:

 

<dependencies>
    ...
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-batch</artifactId>
    </dependency>
    ...
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    ...
</dependencies>

Configuration

Since this is Spring Boot, you will need an entry point for the application. So, be sure to include a class such as the one below in src/main/java:

@SpringBootApplication
public class MyBatchApplication {

    public static void main(String[] args) {
        SpringApplication.run(MyBatchApplication.class, args);
    }
}

Next, we need to set up an overall configuration class for the project:

@Configuration
@EnableBatchProcessing
public class MyBatchAppConfig {

}

Notice the @EnableBatchProcessing annotation. It is required here: it tells Spring to set up the Spring Batch infrastructure, including the builder factories used later in this article, which takes care of a lot of the heavy lifting.

Next, set up a message queue using Spring AMQP, which also provides the template needed to interact with the queue:

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;

...

public final static String queueName = "my-queue";

@Autowired
private ConnectionFactory rabbitConnectionFactory;

// this will create a new queue if it doesn't exist; otherwise, it'll use the existing one of the same name
// ...the second argument means to make the queue 'durable'
@Bean
public Queue myQueue() {
    return new Queue(queueName, true);
}

// this is necessary for operations with Spring AMQP
@Bean
public RabbitTemplate getMyQueueTemplate() {
    RabbitTemplate template = new RabbitTemplate(this.rabbitConnectionFactory);
    // sets the queue that receive operations read from by default
    // (later Spring AMQP releases provide setDefaultReceiveQueue for the same purpose)
    template.setQueue(queueName);
    return template;
}

Next, configure the batch job's Step, which defines what happens for each piece of data that's consumed. This assumes we're using chunk-based processing and not a tasklet:

import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.amqp.AmqpItemReader;

...

@Autowired
private StepBuilderFactory stepBuilderFactory;

...

// reads String messages, converts each one to a DomainObject, and writes one item per chunk
@Bean
public Step getMyJobStep() {
    return this.stepBuilderFactory.get("myJobStep")
            .<String, DomainObject>chunk(1)
            .reader(this.getMyReader())
            .processor(this.getMyProcessor())
            .writer(this.getMyWriter())
            .build();
}

@Bean
public ItemReader<String> getMyReader() {
    return new AmqpItemReader<String>(this.getMyQueueTemplate());
}

@Bean
public ItemProcessor<String, DomainObject> getMyProcessor() {
    return new MessageProcessor();
}

@Bean
public ItemWriter<DomainObject> getMyWriter() {
    return new MessageWriter();
}

The AmqpItemReader is declared with a String type parameter, which tells us that it expects a String to represent each message on the queue, and it takes the queue template we created earlier as its constructor argument. AmqpItemReader is a class that comes with Spring Batch out of the box, so no other work is needed to get it to work. That's the magic of Spring Batch!
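To give a feel for what the chunk-based step does with the reader, processor and writer, here is a simplified plain-Java sketch. It is not Spring Batch's actual implementation: the Reader, Processor and Writer interfaces are hypothetical stand-ins for ItemReader, ItemProcessor and ItemWriter, an in-memory queue stands in for RabbitMQ, and transactions, restarts and error handling are left out entirely. What it does illustrate is the contract that matters here: the reader is called until it returns null (which, as far as I know, is what AmqpItemReader does when no message is available), and the writer receives the processed items one chunk at a time.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

public class ChunkLoopSketch {

    // Hypothetical stand-ins for Spring Batch's reader, processor and writer contracts
    interface Reader<T> { T read(); }                  // returns null when no input is left
    interface Processor<I, O> { O process(I item); }
    interface Writer<T> { void write(List<T> items); }

    // Runs a simplified chunk-oriented step and returns the number of chunks written.
    static <I, O> int runStep(int chunkSize, Reader<I> reader,
                              Processor<I, O> processor, Writer<O> writer) {
        int chunksWritten = 0;
        boolean exhausted = false;
        while (!exhausted) {
            List<O> chunk = new ArrayList<>();
            // Fill one chunk, stopping early if the reader runs dry
            while (chunk.size() < chunkSize) {
                I item = reader.read();
                if (item == null) {
                    exhausted = true;
                    break;
                }
                chunk.add(processor.process(item));
            }
            // Hand the whole chunk to the writer in one call
            if (!chunk.isEmpty()) {
                writer.write(chunk);
                chunksWritten++;
            }
        }
        return chunksWritten;
    }

    public static void main(String[] args) {
        // In-memory queue standing in for the message queue
        Queue<String> messages = new ArrayDeque<>(Arrays.asList("a", "b", "c"));
        List<List<String>> written = new ArrayList<>();

        // Chunk size 1, as in the step configuration above: one write per message
        int chunks = ChunkLoopSketch.<String, String>runStep(
                1,
                () -> messages.poll(),            // poll() returns null once the queue is empty
                item -> item.toUpperCase(),
                items -> written.add(items));

        System.out.println(chunks + " chunks: " + written);
    }
}
```

With a chunk size of 1, every message results in its own call to the writer. A larger chunk size would group several messages into each write, which usually reduces overhead at the cost of redoing more work if a chunk fails.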

Conclusion

We took an introductory look at how a Spring Batch project can accept input from an AMQP-based message queue with minimal setup.

And yes, this is it! We are ready to roll.

If you enjoyed this and found it informative, do share it with your colleagues. Also, let me know in the comment section what the topic of my next blog post should be.

Sandesha Mhaiskar

Sandesha is a Java backend developer. She is a Certified Scrum Master with 8+ years of experience as a Java software engineer, with a focus on Spring Boot, Hibernate, APIs and database technologies. In her spare time she is an avid long-form reader, a lover of true-crime docuseries and a keen traveller.
