Spring Boot + RabbitMQ + Gradle integration

RabbitMQ with Springboot flow

RabbitMQ is a popular, lightweight message broker, and Spring Boot integrates with it easily: you just need to add a dependency and a few annotations.

Setup RabbitMQ

docker run -d -p 5672:5672 -p 8081:15671 -p 8082:15672 -p 5671:5671 --hostname my-rabbit --name my-rabbit rabbitmq:3-management

Sender

The sender application is responsible for sending messages to RabbitMQ.

build.gradle

plugins {
    id 'org.springframework.boot' version '2.6.3'
    id 'io.spring.dependency-management' version '1.0.11.RELEASE'
    id 'java'
}

group = 'com.poc'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'

repositories {
    mavenCentral()
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-amqp'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.amqp:spring-rabbit-test'
}

tasks.named('test') {
    useJUnitPlatform()
}

application.properties

Set the connection properties here. If you need an SSL connection, which is typical in production, don't forget to set spring.rabbitmq.ssl.enabled=true. Since we only connect to a local broker here, the SSL properties are omitted.

server.port=8085
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#logging.level.root=DEBUG
spring.main.allow-bean-definition-overriding=true
sample.rabbitmq.queue=message-queue
sample.rabbitmq.exchange=message-queue-exchange
sample.rabbitmq.routingkey=message-queue-routingkey

RabbitMQConfig.java

Here we define the Exchange, the Queue, and the routing key that binds the Queue to the Exchange. We also configure a MessageConverter, using the Jackson-based Jackson2JsonMessageConverter to exchange JSON data.

package com.poc.sender.config;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {
    @Value("${sample.rabbitmq.queue}")
    String queueName;
    @Value("${sample.rabbitmq.exchange}")
    String exchange;
    @Value("${sample.rabbitmq.routingkey}")
    private String routingkey;

    // Non-durable queue (second argument is the durable flag).
    @Bean
    Queue queue() {
        return new Queue(queueName, false);
    }

    @Bean
    DirectExchange exchange() {
        return new DirectExchange(exchange);
    }

    // Binds the queue to the exchange using the routing key.
    @Bean
    Binding binding(Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(routingkey);
    }

    // Serializes message payloads as JSON.
    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public AmqpTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jsonMessageConverter());
        return rabbitTemplate;
    }
}
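
To make the role of the binding concrete, here is a minimal plain-Java sketch of how a direct exchange routes messages. It is a toy model only, not Spring AMQP or RabbitMQ code: the class DirectExchangeSketch and its methods are hypothetical names invented for illustration. The idea it shows is that a direct exchange delivers a message only to queues bound with exactly the routing key the message carries.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of a direct exchange (hypothetical class, not part of Spring AMQP).
final class DirectExchangeSketch {
    // Binding key -> names of the queues bound with that key.
    private final Map<String, List<String>> bindings = new HashMap<>();

    // Plays the role of BindingBuilder.bind(queue).to(exchange).with(routingKey).
    void bind(String queueName, String routingKey) {
        bindings.computeIfAbsent(routingKey, key -> new ArrayList<>()).add(queueName);
    }

    // Returns the queues that would receive a message sent with this routing key.
    List<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, List.of());
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        exchange.bind("message-queue", "message-queue-routingkey");

        // Exact match: the message reaches the bound queue.
        System.out.println(exchange.route("message-queue-routingkey"));
        // No binding with this key: the message reaches no queue.
        System.out.println(exchange.route("some-other-key"));
    }
}
```

With the names from application.properties, a message sent with message-queue-routingkey lands in message-queue, while a message sent with any other key is not routed to it.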

RabbitMQSender.java

A service that sends messages to RabbitMQ.

package com.poc.sender.service;

import com.poc.sender.model.Message;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.util.concurrent.atomic.AtomicLong;

@Service
public class RabbitMQSender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    @Value("${sample.rabbitmq.exchange}")
    private String exchange;

    @Value("${sample.rabbitmq.routingkey}")
    private String routingkey;

    // Counts sent messages across all calling threads.
    private final AtomicLong count = new AtomicLong(0L);

    // Converts the message with the configured converter and publishes it to the exchange.
    public void send(Message message) {
        rabbitTemplate.convertAndSend(exchange, routingkey, message);
        System.out.println("( " + count.incrementAndGet() + " ) Send =: " + message);
    }
}

Message.java

Message model used to exchange data.

package com.poc.sender.model;

public class Message {
    private String message;

    public Message() {
    }

    public Message(String message) {
        this.message = message;
    }

    public String getMessage() {
        return message;
    }

    @Override
    public String toString() {
        return "Message{" +
                "message='" + message + '\'' +
                '}';
    }
}

Controller.java

Exposes an endpoint for sending messages through the API.

package com.poc.sender.controller;

import com.poc.sender.model.Message;
import com.poc.sender.service.RabbitMQSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping(value = "/rabbitmq/")
public class Controller {
    @Autowired
    private RabbitMQSender rabbitMQSender;

    // Accepts a JSON body such as {"message": "hello"} and forwards it to RabbitMQ.
    @PostMapping(value = "/producer")
    public String producer(@RequestBody Message message) {
        rabbitMQSender.send(message);
        return "Message sent to the RabbitMQ Successfully";
    }
}
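
To try the endpoint from Java, a small client along these lines may help. It is a sketch under assumptions: the sender is taken to be running on localhost with server.port=8085 as in application.properties above, and ProducerClientSketch and buildRequest are hypothetical names. It uses only the java.net.http client that ships with Java 11.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical client for the /rabbitmq/producer endpoint.
final class ProducerClientSketch {
    // Builds the POST request. Host and port are assumptions (localhost, server.port=8085).
    static HttpRequest buildRequest(String text) {
        // No JSON escaping is done here, so this is only safe for simple text.
        String json = "{\"message\":\"" + text + "\"}";
        return HttpRequest.newBuilder(URI.create("http://localhost:8085/rabbitmq/producer"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    public static void main(String[] args) throws InterruptedException {
        try {
            HttpResponse<String> response = HttpClient.newHttpClient()
                    .send(buildRequest("hello"), HttpResponse.BodyHandlers.ofString());
            System.out.println(response.statusCode() + " " + response.body());
        } catch (IOException e) {
            // Typically means the sender application is not running.
            System.out.println("Could not reach the sender: " + e);
        }
    }
}
```

If the sender is up, the message should then show up in the receiver's console like any other.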

RabbitmqSenderApplication.java

The main Spring Boot class that starts the application. It also starts three threads that keep sending messages, so we can check how sending performs.

package com.poc.sender;

import com.poc.sender.model.Message;
import com.poc.sender.service.RabbitMQSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import java.util.Random;

@SpringBootApplication
public class RabbitmqSenderApplication implements CommandLineRunner {
    @Autowired
    private RabbitMQSender rabbitMQSender;

    public static void main(String[] args) {
        SpringApplication.run(RabbitmqSenderApplication.class, args);
    }

    // Runs once after startup and launches three sender threads.
    @Override
    public void run(String... args) {
        process("Sender-1");
        process("Sender-2");
        process("Sender-3");
    }

    // Starts a named thread that sends a numbered message in an endless loop.
    private void process(String s) {
        new Thread(() -> {
            Random random = new Random();
            for (long i = 0; ; i++) {
                String message = "You have a new message with no " + i;
                rabbitMQSender.send(new Message(message));

                try {
                    // Wait a random 4 to 15 seconds before the next message.
                    Thread.sleep(random.nextInt((15000 - 4000) + 1) + 4000);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop this sender thread.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }, s).start();
    }
}
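
The sleep expression random.nextInt((15000 - 4000) + 1) + 4000 picks a delay between 4000 and 15000 milliseconds, both ends included. As a sketch of the same calculation in a more readable form, the helper below (randomDelayMillis is a hypothetical name, not part of the project) uses ThreadLocalRandom, which takes the lower and upper bounds directly.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helper showing the inclusive random-delay calculation.
final class RandomDelaySketch {
    // Returns a delay in [minMillis, maxMillis], both inclusive.
    static int randomDelayMillis(int minMillis, int maxMillis) {
        // nextInt's upper bound is exclusive, hence the + 1.
        return ThreadLocalRandom.current().nextInt(minMillis, maxMillis + 1);
    }

    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            System.out.println(randomDelayMillis(4000, 15000));
        }
    }
}
```

ThreadLocalRandom also avoids creating a Random instance per thread, though for three sender threads either approach works.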

Receivers

A Spring Boot application that reads data from RabbitMQ.

build.gradle

plugins {
    id 'org.springframework.boot' version '2.6.3'
    id 'io.spring.dependency-management' version '1.0.11.RELEASE'
    id 'java'
}

group = 'com.poc'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'

repositories {
    mavenCentral()
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-amqp'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.amqp:spring-rabbit-test'
}

tasks.named('test') {
    useJUnitPlatform()
}

application.properties

Set the connection properties here. If you need an SSL connection, which is typical in production, don't forget to set spring.rabbitmq.ssl.enabled=true. Since we only connect to a local broker here, the SSL properties are omitted.

server.port=8086
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#logging.level.root=DEBUG
spring.main.allow-bean-definition-overriding=true
sample.rabbitmq.queue=message-queue

RabbitMQConfig.java

RabbitMQ configuration for the message converter.

package com.poc.receiver.config;

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {
    // Deserializes incoming JSON payloads into Message objects.
    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

RabbitMQ.java

Listener that reads messages from the queue.

package com.poc.receiver.listener;

import com.poc.receiver.model.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.concurrent.atomic.AtomicLong;

@Component
public class RabbitMQ {
    // Counts received messages.
    private final AtomicLong count = new AtomicLong(0L);

    // Called for every message that arrives on the configured queue.
    @RabbitListener(queues = "${sample.rabbitmq.queue}")
    public void recievedMessage(Message message) {
        System.out.println("( " + count.incrementAndGet() + " ) Received = : " + message);
    }
}

Message.java

Message model used to exchange data.

package com.poc.receiver.model;

public class Message {
    private String message;

    public Message() {
    }

    public Message(String message) {
        this.message = message;
    }

    public String getMessage() {
        return message;
    }

    @Override
    public String toString() {
        return "Message{" +
                "message='" + message + '\'' +
                '}';
    }
}

RabbitmqReceiverApplication.java

package com.poc.receiver;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RabbitmqReceiverApplication {

    public static void main(String[] args) {
        SpringApplication.run(RabbitmqReceiverApplication.class, args);
    }

}

How to test

When you run RabbitMQ together with the Sender and Receiver applications, you see the output below. The receiver reads the data almost instantaneously.

Sender's Logs

( 1 ) Send =: Message{message='You have a new message with no 0'}
( 3 ) Send =: Message{message='You have a new message with no 0'}
( 2 ) Send =: Message{message='You have a new message with no 0'}
( 4 ) Send =: Message{message='You have a new message with no 1'}
( 5 ) Send =: Message{message='You have a new message with no 1'}
( 6 ) Send =: Message{message='You have a new message with no 1'}
( 7 ) Send =: Message{message='You have a new message with no 2'}
( 8 ) Send =: Message{message='You have a new message with no 2'}
( 9 ) Send =: Message{message='You have a new message with no 2'}
( 10 ) Send =: Message{message='You have a new message with no 3'}
( 11 ) Send =: Message{message='You have a new message with no 3'}
( 12 ) Send =: Message{message='You have a new message with no 3'}
( 13 ) Send =: Message{message='You have a new message with no 4'}
( 14 ) Send =: Message{message='You have a new message with no 4'}
( 15 ) Send =: Message{message='You have a new message with no 4'}
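
In the sender's log, ( 3 ) is printed before ( 2 ). That is expected with three sender threads: incrementAndGet is atomic, but taking the number and printing the line are two separate steps, so lines from different threads can appear out of order. The sketch below is plain Java with hypothetical names (CounterSketch, countWithThreads) and shows the part that does hold: no increment is lost, so the final count is always the total number of sends.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical demo of a shared AtomicLong counter used by several threads.
final class CounterSketch {
    // Starts `threads` threads that each increment a shared counter `perThread` times.
    static long countWithThreads(int threads, int perThread) {
        AtomicLong count = new AtomicLong(0L);
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    count.incrementAndGet(); // atomic, so no update is lost
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join(); // wait until this worker has finished
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
        return count.get();
    }

    public static void main(String[] args) {
        // Three threads, five increments each.
        System.out.println(countWithThreads(3, 5)); // prints 15
    }
}
```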

Receiver's Logs

( 1 ) Received = : Message{message='You have a new message with no 0'}
( 2 ) Received = : Message{message='You have a new message with no 0'}
( 3 ) Received = : Message{message='You have a new message with no 0'}
( 4 ) Received = : Message{message='You have a new message with no 1'}
( 5 ) Received = : Message{message='You have a new message with no 1'}
( 6 ) Received = : Message{message='You have a new message with no 1'}
( 7 ) Received = : Message{message='You have a new message with no 2'}
( 8 ) Received = : Message{message='You have a new message with no 2'}
( 9 ) Received = : Message{message='You have a new message with no 2'}
( 10 ) Received = : Message{message='You have a new message with no 3'}
( 11 ) Received = : Message{message='You have a new message with no 3'}
( 12 ) Received = : Message{message='You have a new message with no 3'}
( 13 ) Received = : Message{message='You have a new message with no 4'}
( 14 ) Received = : Message{message='You have a new message with no 4'}
( 15 ) Received = : Message{message='You have a new message with no 4'}
( 16 ) Received = : Message{message='You have a new message with no 5'}
( 17 ) Received = : Message{message='You have a new message with no 5'}
( 18 ) Received = : Message{message='You have a new message with no 5'}
( 19 ) Received = : Message{message='You have a new message with no 6'}
( 20 ) Received = : Message{message='You have a new message with no 6'}
( 21 ) Received = : Message{message='You have a new message with no 7'}
( 22 ) Received = : Message{message='You have a new message with no 7'}
( 23 ) Received = : Message{message='You have a new message with no 6'}

Check the complete code on GitHub

You can check the complete code at GitHub.
