项目记录:RabbitMq+Redis配置消息队列
目录
一、思路简述
这里主要配置的是增加操作,生产者部分负责将增加的数据存入redis,消费者部分负责将redis数据存入mysql。
二、配置依赖
配置RabbitMq的Maven:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置Redis的Maven:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
配置RabbitMq的yml:
rabbitmq: host: # 这里换成你的服务器地址 port: 5672 # 确保这里是RabbitMQ 实际用于客户端连接的端口,而不是管理界面端口 username: admin password: admin
配置Redis的yml:
data: redis: database: 0 host: port: 6379 password:
三、RabbitMq配置类
生产者:
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DirectRabbitConfig {
//队列 起名:TestDirectQueue
@Bean
public Queue TestDirectQueue() {
// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
// return new Queue("TestDirectQueue",true,true,false);
//一般设置一下队列的持久化就好,其余两个就是默认false
return new Queue("TestDirectQueue",true);
}
//Direct交换机 起名:TestDirectExchange
@Bean
DirectExchange TestDirectExchange() {
// return new DirectExchange("TestDirectExchange",true,true);
return new DirectExchange("TestDirectExchange",true,false);
}
//绑定
//将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting
@Bean
Binding bindingDirect() {
return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
}
@Bean
DirectExchange lonelyDirectExchange() {
return new DirectExchange("lonelyDirectExchange");
}
}
消费者:
mport Utils.DeferredResultHolder;
import com.example.lab01.Controller.RoomScheduleController;
import com.example.lab01.Mapper.RoomScheduleMapper;
import com.example.lab01.entity.RoomSchedule;
import jakarta.annotation.Resource;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
//监听的队列名称 TestDirectQueue
@RabbitListener(queues = "TestDirectQueue")
public class DirectReceiver {
@Resource
private RoomScheduleMapper roomScheduleMapper;
@RabbitHandler
public void process(Map testMessage) {
System.out.println("DirectReceiver消费者收到消息 : " + testMessage.toString());
}
}
四、增加操作
生产者将前端获取的数据存入Redis:
@Override
public void addRoomSchedule(RoomSchedule roomSchedule) {
HashMap<String, String> map = new HashMap<>();
map.put("week", String.valueOf(roomSchedule.getWeek()));
map.put("weekday", String.valueOf(roomSchedule.getWeekday()));
map.put("period", String.valueOf(roomSchedule.getPeriod()));
map.put("room_id", String.valueOf(roomSchedule.getRoomId()));
map.put("teacher_id", String.valueOf(roomSchedule.getTeacherId()));
map.put("course_name",roomSchedule.getCourseName());
map.put("class_name",roomSchedule.getClassName());
map.put("student_count", String.valueOf(roomSchedule.getStudentCount()));
stringRedisTemplate.opsForHash().putAll("addData",map);
System.out.println("完成存入");
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "test message, hello!";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String,Object> map1=new HashMap<>();
map1.put("messageId",messageId);
map1.put("messageData",messageData);
map1.put("createTime",createTime);
//将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchange
rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", map1);
}
消费者将Redis中的数据存入Mysql:
import Utils.DeferredResultHolder;
import com.example.lab01.Controller.RoomScheduleController;
import com.example.lab01.Mapper.RoomScheduleMapper;
import com.example.lab01.entity.RoomSchedule;
import jakarta.annotation.Resource;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
//监听的队列名称 TestDirectQueue
@RabbitListener(queues = "TestDirectQueue")
public class DirectReceiver {
@Resource
private RoomScheduleMapper roomScheduleMapper;
@Resource
private RoomScheduleMapper userMapper;
@Resource
private StringRedisTemplate stringRedisTemplate;
@RabbitHandler
public void process(Map testMessage) {
System.out.println("DirectReceiver消费者收到消息 : " + testMessage.toString());
RoomSchedule roomSchedule1 = new RoomSchedule();
System.out.println("完成1");
System.out.println(stringRedisTemplate.opsForHash().get("addData","week"));
System.out.println(stringRedisTemplate.opsForHash().get("addData", "week").getClass().toString());
System.out.println(Integer.parseInt(stringRedisTemplate.opsForHash().get("addData","week").toString()));
roomSchedule1.setWeek(Integer.parseInt(stringRedisTemplate.opsForHash().get("addData","week").toString()));
System.out.println("完成1.1");
roomSchedule1.setWeekday(Integer.parseInt(stringRedisTemplate.opsForHash().get("addData","weekday").toString()));
roomSchedule1.setPeriod(Integer.parseInt(stringRedisTemplate.opsForHash().get("addData","period").toString()));
roomSchedule1.setRoomId(Integer.parseInt(stringRedisTemplate.opsForHash().get("addData","room_id").toString()));
roomSchedule1.setTeacherId(Integer.parseInt(stringRedisTemplate.opsForHash().get("addData","teacher_id").toString()));
roomSchedule1.setCourseName((String) stringRedisTemplate.opsForHash().get("addData","course_name"));
roomSchedule1.setClassName((String) stringRedisTemplate.opsForHash().get("addData","class_name"));
roomSchedule1.setStudentCount(Integer.parseInt(stringRedisTemplate.opsForHash().get("addData","student_count").toString()));
System.out.println("完成2");
System.out.println("redis数据"+roomSchedule1);
roomScheduleMapper.addRoomSchedule(roomSchedule1);
}
}