Spring Boot + RabbitMQ with retry + Gradle integration

RabbitMQ with Springboot flow

While integrating RabbitMQ with Spring Boot, we often need to retry messages on the receiver side. For example, the receiver gets a message and something goes wrong while processing it. What do you do? It is often a good idea to try the message again after some time. Of course, retries cannot go on indefinitely, so we also need a limit. With Spring Boot, it is just a matter of setting a few properties.

Setup RabbitMQ

docker run -d -p 5672:5672 -p 8081:15671 -p 8082:15672 -p 5671:5671 --hostname my-rabbit --name my-rabbit rabbitmq:3-management

Sender

The sender application is responsible for sending messages to RabbitMQ.

build.gradle

plugins {
    id 'org.springframework.boot' version '2.6.3'
    id 'io.spring.dependency-management' version '1.0.11.RELEASE'
    id 'java'
}

group = 'com.poc'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'

repositories {
    mavenCentral()
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-actuator'
    implementation 'org.springframework.boot:spring-boot-starter-amqp'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.amqp:spring-rabbit-test'
}

tasks.named('test') {
    useJUnitPlatform()
}

application.properties

Here you can set the connection properties. If you need an SSL connection, mainly in production, remember to set spring.rabbitmq.ssl.enabled=true. Since we only connect to a local broker here, the SSL properties are omitted.

server.port=8085
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#logging.level.root=DEBUG
spring.main.allow-bean-definition-overriding=true
sample.rabbitmq.queue=message-queue
sample.rabbitmq.exchange=message-queue-exchange
sample.rabbitmq.routingkey=message-queue-routingkey

RabbitMQConfig.java

Here we define the exchange, the queue, and the routing key that binds the exchange to the queue. The other piece of configuration is the MessageConverter; we use the Jackson implementation, Jackson2JsonMessageConverter, to exchange JSON data.

package com.poc.sender.config;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {
    @Value("${sample.rabbitmq.queue}")
    String queueName;
    @Value("${sample.rabbitmq.exchange}")
    String exchange;
    @Value("${sample.rabbitmq.routingkey}")
    private String routingkey;

    @Bean
    Queue queue() {
        return new Queue(queueName, false);
    }

    @Bean
    DirectExchange exchange() {
        return new DirectExchange(exchange);
    }

    @Bean
    Binding binding(Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(routingkey);
    }

    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public AmqpTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jsonMessageConverter());
        return rabbitTemplate;
    }
}
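
To make the exchange, queue and routing key relationship concrete, here is a toy in-memory model of a direct exchange. It is only a sketch for illustration: DirectExchangeSketch and its methods are hypothetical names, not part of Spring AMQP or the RabbitMQ client, and it assumes one queue per routing key for simplicity.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Toy, in-memory model of a direct exchange. Not the RabbitMQ client API.
public class DirectExchangeSketch {
    // routing key -> queue bound under that key (one queue per key for simplicity)
    private final Map<String, Queue<String>> bindings = new HashMap<>();

    // Similar in spirit to BindingBuilder.bind(queue).to(exchange).with(routingKey)
    public Queue<String> bind(String routingKey) {
        return bindings.computeIfAbsent(routingKey, k -> new ArrayDeque<>());
    }

    // Returns true if a binding matched; in this model unmatched messages are dropped.
    public boolean publish(String routingKey, String body) {
        Queue<String> queue = bindings.get(routingKey);
        if (queue == null) {
            return false;
        }
        queue.add(body);
        return true;
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        Queue<String> messageQueue = exchange.bind("message-queue-routingkey");

        System.out.println(exchange.publish("message-queue-routingkey", "{\"message\":\"Hi\"}")); // true
        System.out.println(exchange.publish("some-other-key", "{\"message\":\"lost\"}"));         // false
        System.out.println(messageQueue.size());                                                  // 1
    }
}
```

The point to take away is that the sender never names the queue. It publishes to the exchange with a routing key, and the binding decides which queue receives the message.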

RabbitMQSender.java

A service that sends messages to RabbitMQ.

package com.poc.sender.service;

import com.poc.sender.model.Message;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.util.concurrent.atomic.AtomicLong;

@Service
public class RabbitMQSender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    @Value("${sample.rabbitmq.exchange}")
    private String exchange;

    @Value("${sample.rabbitmq.routingkey}")
    private String routingkey;

    // Running count of messages sent, used only for logging
    private final AtomicLong count = new AtomicLong(0L);

    // Converts the message to JSON and publishes it to the exchange with the routing key
    public void send(Message message) {
        rabbitTemplate.convertAndSend(exchange, routingkey, message);
        System.out.println("( " + count.incrementAndGet() + " ) Send =: " + message);
    }
}

Message.java

Message Model to exchange data.

package com.poc.sender.model;

public class Message {
    private String message;

    public Message() {
    }

    public Message(String message) {
        this.message = message;
    }

    public String getMessage() {
        return message;
    }

    @Override
    public String toString() {
        return "Message{" +
                "message='" + message + '\'' +
                '}';
    }
}

Controller.java

Expose endpoint to send messages via API.

package com.poc.sender.controller;

import com.poc.sender.model.Message;
import com.poc.sender.service.RabbitMQSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

import java.io.IOException;

@RestController
@RequestMapping(value = "/rabbitmq/")
public class Controller {
    @Autowired
    private RabbitMQSender rabbitMQSender;

    @PostMapping(value = "/producer")
    public String producer(@RequestBody Message message) throws IOException {
        rabbitMQSender.send(message);
        return "Message sent to the RabbitMQ Successfully";
    }
}
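
To try the endpoint, something like the following client could be used. This is a minimal sketch using the java.net.http client from Java 11. ProducerClientSketch and buildRequest are hypothetical names, and the URL assumes the sender runs locally on server.port=8085 and that the two mappings combine to /rabbitmq/producer.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ProducerClientSketch {
    // Assumption: sender running locally with server.port=8085
    static final String ENDPOINT = "http://localhost:8085/rabbitmq/producer";

    // Builds a POST whose JSON body maps onto the Message model's "message" field.
    // The text is embedded as-is, so it must not contain quotes or backslashes.
    static HttpRequest buildRequest(String text) {
        String json = "{\"message\":\"" + text + "\"}";
        return HttpRequest.newBuilder(URI.create(ENDPOINT))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    public static void main(String[] args) throws InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        try {
            HttpResponse<String> response =
                    client.send(buildRequest("Hello"), HttpResponse.BodyHandlers.ofString());
            System.out.println(response.statusCode() + " " + response.body());
        } catch (IOException e) {
            // Typically means the sender application is not running
            System.out.println("Sender not reachable: " + e);
        }
    }
}
```

Sending Hello is deliberate: it is the value that makes the receiver throw, so it is a convenient way to watch the retry behaviour.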

RabbitmqSenderApplication.java

Main Spring Boot class that starts the application.

package com.poc.sender;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RabbitmqSenderApplication {
    public static void main(String[] args) {
        SpringApplication.run(RabbitmqSenderApplication.class, args);
    }

}

Receiver

Spring Boot application that reads messages from RabbitMQ.

build.gradle

plugins {
    id 'org.springframework.boot' version '2.6.3'
    id 'io.spring.dependency-management' version '1.0.11.RELEASE'
    id 'java'
}

group = 'com.poc'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'

repositories {
    mavenCentral()
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-actuator'
    implementation 'org.springframework.boot:spring-boot-starter-amqp'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.amqp:spring-rabbit-test'
}

tasks.named('test') {
    useJUnitPlatform()
}

application.properties

Here you can set the connection properties. If you need an SSL connection, mainly in production, remember to set spring.rabbitmq.ssl.enabled=true. Since we only connect to a local broker here, the SSL properties are omitted.

server.port=8086
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#logging.level.root=DEBUG
spring.main.allow-bean-definition-overriding=true
sample.rabbitmq.queue=message-queue
sample.rabbitmq.exchange=message-queue-exchange
sample.rabbitmq.routingkey=message-queue-routingkey
spring.rabbitmq.listener.simple.missing-queues-fatal=false
spring.rabbitmq.listener.simple.acknowledge-mode=auto
spring.rabbitmq.listener.simple.retry.max-attempts=5
spring.rabbitmq.listener.simple.retry.enabled=true
spring.rabbitmq.listener.simple.retry.max-interval=10000
spring.rabbitmq.listener.simple.retry.initial-interval=5000
spring.rabbitmq.listener.simple.retry.multiplier=2
spring.rabbitmq.listener.simple.concurrency=3

Info

spring.rabbitmq.listener.simple.missing-queues-fatal=false tells Spring Boot not to fail the application if the queue does not exist. In this example that case never occurs, because the queue is declared in the config.

spring.rabbitmq.listener.simple.acknowledge-mode=auto lets the listener container acknowledge messages for us. If the listener processes a message without error, the message is removed from the queue. If the listener throws an exception, as it does here with MessageException when the message is Hello, the retry mechanism kicks in. Note that with these retry properties the retries are stateless by default: they run inside the consumer rather than by requeueing the message on the broker, and once the attempts are exhausted the message is rejected without requeue.

spring.rabbitmq.listener.simple.retry.enabled=true enables retries.

spring.rabbitmq.listener.simple.retry.max-attempts=5 means each message is delivered to the listener at most 5 times, counting the first attempt.

spring.rabbitmq.listener.simple.retry.initial-interval=5000 is the wait, in milliseconds, before the first retry of a failed message.

spring.rabbitmq.listener.simple.retry.multiplier=2 means that if a message fails again, the next wait is the previous wait multiplied by this value, capped at spring.rabbitmq.listener.simple.retry.max-interval=10000.

E.g. if a message fails all five attempts here, the wait before each attempt (including the first read) would be:

  • 0 Seconds
  • 5 Seconds
  • 10 Seconds
  • 10 Seconds (because spring.rabbitmq.listener.simple.retry.max-interval=10000 is set, otherwise it would have been 20 Seconds)
  • 10 Seconds (because spring.rabbitmq.listener.simple.retry.max-interval=10000 is set, otherwise it would have been 40 Seconds)
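
The schedule above can be reproduced with a few lines of plain Java. This is only a sketch of the arithmetic under the property values used here, not Spring's actual back-off implementation. BackoffScheduleSketch and waits are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

public class BackoffScheduleSketch {
    // Returns the wait (in ms) before each attempt; the first attempt has no wait.
    static List<Long> waits(int maxAttempts, long initialInterval, double multiplier, long maxInterval) {
        List<Long> waits = new ArrayList<>();
        long interval = initialInterval;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (attempt == 1) {
                waits.add(0L);
                continue;
            }
            // Cap the wait at maxInterval, then grow the uncapped interval for next time
            waits.add(Math.min(interval, maxInterval));
            interval = (long) (interval * multiplier);
        }
        return waits;
    }

    public static void main(String[] args) {
        // max-attempts=5, initial-interval=5000, multiplier=2, max-interval=10000
        System.out.println(waits(5, 5000, 2.0, 10000));
        // prints [0, 5000, 10000, 10000, 10000]
    }
}
```

Adding up the waits, a message that keeps failing occupies its consumer for roughly 35 seconds before the attempts run out.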

RabbitMQConfig.java

RabbitMQ configuration for the queue, exchange, binding and message converter.

package com.poc.receiver.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {
  @Value("${sample.rabbitmq.queue}")
  String queueName;
  @Value("${sample.rabbitmq.exchange}")
  String exchange;
  @Value("${sample.rabbitmq.routingkey}")
  private String routingkey;

  @Bean
  Queue queue() {
    return new Queue(queueName, false);
  }

  @Bean
  DirectExchange exchange() {
    return new DirectExchange(exchange);
  }

  @Bean
  Binding binding(Queue queue, DirectExchange exchange) {
    return BindingBuilder.bind(queue).to(exchange).with(routingkey);
  }

  @Bean
  public MessageConverter jsonMessageConverter() {
    return new Jackson2JsonMessageConverter();
  }
}

RabbitMQ.java Listener

package com.poc.receiver.listener;

import com.poc.receiver.MessageException;
import com.poc.receiver.model.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.time.Instant;
import java.util.concurrent.atomic.AtomicLong;

@Component
public class RabbitMQ {
  private AtomicLong count = new AtomicLong(0L);

  @RabbitListener(queues = "${sample.rabbitmq.queue}")
  public void receivedMessage(Message message) {
    System.out.println(Instant.now() + " ( " + count.incrementAndGet() + " ) Received = : " + message);
    if (message.getMessage().equalsIgnoreCase("Hello"))
      throw new MessageException("Throwing Message exception");
  }
}
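
What happens around this listener when it throws can be modelled roughly as follows. This is a simplified sketch of a stateless retry loop, not Spring's interceptor code: StatelessRetrySketch, deliver, handler and recoverer are hypothetical names, the back-off sleep is left out, and IllegalStateException stands in for MessageException.

```java
import java.util.function.Consumer;

public class StatelessRetrySketch {
    // Invokes the handler up to maxAttempts times and returns the number of attempts used.
    // When every attempt fails, the recoverer is called once with the message.
    static <T> int deliver(T message, int maxAttempts, Consumer<T> handler, Consumer<T> recoverer) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                handler.accept(message);
                return attempt; // success: the message would be acknowledged
            } catch (RuntimeException e) {
                // a real retry interceptor would wait for the back-off interval here
            }
        }
        recoverer.accept(message); // attempts exhausted: give up on the message
        return maxAttempts;
    }

    public static void main(String[] args) {
        // Mirrors the listener above: fails whenever the text is "Hello"
        Consumer<String> handler = text -> {
            if (text.equalsIgnoreCase("Hello")) {
                throw new IllegalStateException("Throwing Message exception");
            }
        };
        Consumer<String> recoverer = text -> System.out.println("Giving up on: " + text);

        System.out.println(deliver("Hi", 5, handler, recoverer));    // prints 1
        System.out.println(deliver("Hello", 5, handler, recoverer)); // prints "Giving up on: Hello", then 5
    }
}
```

In the real application the "giving up" step is where the message is rejected, so a message that must not be lost would need a dead-letter setup or a custom recoverer.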

Message.java

Message Model to exchange data.

package com.poc.receiver.model;

public class Message {
    private String message;

    public Message() {
    }

    public Message(String message) {
        this.message = message;
    }

    public String getMessage() {
        return message;
    }

    @Override
    public String toString() {
        return "Message{" +
                "message='" + message + '\'' +
                '}';
    }
}

RabbitmqReceiverApplication.java

package com.poc.receiver;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RabbitmqReceiverApplication {

    public static void main(String[] args) {
        SpringApplication.run(RabbitmqReceiverApplication.class, args);
    }

}

Tip

You might have noticed the implementation 'org.springframework.boot:spring-boot-starter-actuator' dependency in build.gradle. Of course, it provides production-ready endpoints for monitoring the application. One more important thing it does here is automatically add RabbitMQ to the application's health status. While doing so, it triggers the beans defined in the config file, which results in the queue, exchange and binding being created automatically on application startup. Without this dependency, the queue, exchange and binding are created only when we call the beans manually, e.g. in a CommandLineRunner, or on first use, e.g. when we call the API to send a message.

Check complete code on github

You can check the complete code at GitHub.
