SpringBoot集成RabbitMQ基本使用
1. 简单模式-无交换机(一个生产者、一个队列、一个消费者)
-
加入maven依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
-
在 application.yml 中配置rabbitmq的 连接信息
spring: rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest
-
列配置,启动时创建队列
@Configuration public class RabbitMqConfig { @Bean public Queue createQueue(){ return new Queue("hello-queue"); } }
-
创建生产者(可以卸载接口里面)
@Component public class Sender { @Autowired private AmqpTemplate amqpTemplate; public void send(String msg){ this.amqpTemplate.convertAndSend("hello-queue",msg); } }
-
创建消费者
@Component public class Receiver { @RabbitListener(queues = "hello-queue") public void process(String msg){ System.out.println("receiver: "+msg); } }
结果:控制台打印出消费者的log日志:receiver: rabbit-mq-test
2.发布订阅模式
-
消息发送方:
@RestController public class RabbitMqController { @Autowired private RabbitTemplate rabbit; @RequestMapping("/rabbitmq/sendPublisher") public String sendPublisher(){ for(int i=0;i<20;i++){ rabbit.convertSendAndReceive("exchange_fanout","","测试发布订阅模式:"+i); } return "发送成功...."; } }
-
定义交换机
@Bean public FanoutExchange exchangeFanout(){ return new FanoutExchange("exchange_fanout"); }
-
定义两个消费者,每个消费者都需要绑定这个交换机,定义的两个消费者也需要定义成Bean。并且和exchange_fanout进行绑定
@Bean public Queue queueFanout1(){ return new Queue("queue_fanout1"); } @Bean public Queue queueFanout2(){ return new Queue("queue_fanout2"); } @Bean public Binding bindingExchange1(Queue queueFanout1,FanoutExchange exchangeFanout){ return BindingBuilder.bind(queueFanout1).to(exchangeFanout); } @Bean public Binding bindingExchange2(Queue queueFanout2,FanoutExchange exchangeFanout){ return BindingBuilder.bind(queueFanout2).to(exchangeFanout); }
-
通过RabbitListener的Rabbitmq的监听器来获取消息。
@RabbitListener(queues="queue_fanout1") public void receiverMsg1(String msg){ System.out.println("队列1接收到的信息:"+msg); } @RabbitListener(queues="queue_fanout2") public void receiverMsg2(String msg){ System.out.println("队列2接收到的信息:"+msg); }
整体监听器的代码如下:
-
package com.example.demo.listener; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import org.springframework.context.annotation.Bean; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.annotation.RabbitListener; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.TopicExchange; @Slf4j @Component public class RabbitMqConsumer { @Bean public Queue queueFanout1(){ return new Queue("queue_fanout1"); } @Bean public Queue queueFanout2(){ return new Queue("queue_fanout2"); } @Bean public FanoutExchange exchangeFanout(){ return new FanoutExchange("exchange_fanout"); } @Bean public Binding bindingExchange1(Queue queueFanout1,FanoutExchange exchangeFanout){ return BindingBuilder.bind(queueFanout1).to(exchangeFanout); } @Bean public Binding bindingExchange2(Queue queueFanout2,FanoutExchange exchangeFanout){ return BindingBuilder.bind(queueFanout2).to(exchangeFanout); } @RabbitListener(queues="queue_fanout1") public void receiverMsg1(String msg){ System.out.println("队列1接收到的信息:"+msg); } @RabbitListener(queues="queue_fanout2") public void receiverMsg2(String msg){ System.out.println("队列2接收到的信息:"+msg); } }
启动服务后,当浏览器访问“/rabbitmq/sendPublisher”后,在浏览器中显示文字“发送成功”,在控制台输出队列1和队列2的相关信息。浏览器显示如下图:
可在控制台看到结果。
3. routing(direct)模式
-
首先是配置类,在配置类中我们需要声明交换机,队列和绑定关系。
-
@Configuration public class DirectExchangeConfig { public static final String DIRECT_QUEUE = "directQueue"; public static final String DIRECT_QUEUE2 = "directQueue2"; public static final String DIRECT_EXCHANGE = "directExchange"; public static final String DIRECT_ROUTING_KEY = "direct"; @Bean public Queue directQueue() { return new Queue(DIRECT_QUEUE, true); } @Bean public Queue directQueue2() { return new Queue(DIRECT_QUEUE2, true); } @Bean public DirectExchange directExchange() { return new DirectExchange(DIRECT_EXCHANGE, true, false); } @Bean public Binding bindingDirectExchange(Queue directQueue, DirectExchange directExchange) { return BindingBuilder.bind(directQueue).to(directExchange).with(DIRECT_ROUTING_KEY); } @Bean public Binding bindingDirectExchange2(Queue directQueue2, DirectExchange directExchange) { return BindingBuilder.bind(directQueue2).to(directExchange).with(DIRECT_ROUTING_KEY); } }
-
生产者
@RestController @Slf4j @RequestMapping("/direct") public class DirectController { private final RabbitTemplate rabbitTemplate; public DirectController(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; } /** * direct交换机为直连模式交换机 * 根据消息携带的路由键将消息投递给对应队列 * * * @return */ @GetMapping("send") public Object sendMsg() { rabbitTemplate.convertAndSend(DirectExchangeConfig.DIRECT_EXCHANGE, DirectExchangeConfig.DIRECT_ROUTING_KEY, "发送一条测试消息:direct"); return "direct消息发送成功!!"; } }
-
消费者
package com.lsqingfeng.action.rabbitmq.direct; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * @className: DirectQueueListener * @description: 直连交换机的监听器 * @author: sh.Liu * @date: 2021-08-23 16:03 */ @Slf4j @Component public class DirectQueueListener { /** * 尽管设置了两个消费者,但是只有一个能够消费成功 * 多次发送则轮训消费: * DirectReceiver消费者收到消息1 : 发送一条测试消息:direct * DirectReceiver消费者收到消息2 : 发送一条测试消息:direct * DirectReceiver消费者收到消息1 : 发送一条测试消息:direct * DirectReceiver消费者收到消息2 : 发送一条测试消息:direct * * 一个交换机可以绑定多个队列。如果通过路由key可以匹配到多个队列,消费的时候也只能有一个进行消费 * @param testMessage */ @RabbitHandler @RabbitListener(queues = DirectExchangeConfig.DIRECT_QUEUE) public void process(String testMessage) { System.out.println("DirectReceiver消费者收到消息1 : " + testMessage); } @RabbitHandler @RabbitListener(queues = DirectExchangeConfig.DIRECT_QUEUE) public void process2(String testMessage) { System.out.println("DirectReceiver消费者收到消息2 : " + testMessage); } @RabbitHandler @RabbitListener(queues = DirectExchangeConfig.DIRECT_QUEUE2) public void process3(String testMessage) { System.out.println("DirectReceiver消费者收到消息3 : " + testMessage); } }
4. topic模式
如上图,主题模式不能具有任意的 routingKey,必须由一个英文句点“.”分隔的字符串(分割符)
其中***表示任意多个单词,#**表示一个单词
package com.rabbitmq.util;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class rabbitMqUtil {
@Bean(name = "okong")
public Queue queue() {
//创建一个消息队列
return new Queue("topic.okong");
}
@Bean(name = "qune")
public Queue qune() {
//创建一个消息队列
return new Queue("topic.qune");
}
@Bean
TopicExchange topicExchange() {
return new TopicExchange("topic");//配置路由器为Topic模式
}
@Bean
Binding bindingExchangeA(@Qualifier("okong") Queue queue, TopicExchange topicExchange) {
// 配置该消息队列的 routingKey
//topic.* 匹配 第一个.后面的单词 代表 一个 单词
//比如 topic.asd 会被该消息队列接受 topic.asd.dsf不会被该消息队列接受
return BindingBuilder.bind(queue).to(topicExchange).with("topic.*");
}
@Bean
Binding bindingExchangeB(@Qualifier("qune") Queue qune, TopicExchange topicExchange) {
// 配置该消息队列的 routingKey
//topic.# 匹配 所有.后面的单词 代表 任意 个 单词
//比如 topic.asd 会被该消息队列接受 topic.asd.dsf也会被该消息队列接受
return BindingBuilder.bind(qune).to(topicExchange).with("topic.#");
}
}
-
监听类:
package com.rabbitmq.util; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class ListenterQune { @RabbitListener(queues = "topic.qune") @RabbitHandler public void process(String mess){ System.out.println("我是topic.qune的消费者接收到的消息为 :"+mess); } @RabbitListener(queues = "topic.okong") @RabbitHandler public void okong(String mess){ System.out.println("我是topic.okong的消费者接收到的消息为:"+mess); } }
-
发送方:
@RequestMapping("sendTwo") public void sendTwo(String mes){ amqpTemplate.convertAndSend("topic","topic.name",mes); amqpTemplate.convertAndSend("topic","topic.a",mes); }
这个样发送两个队列都可以收到: