diff --git a/README.md b/README.md index cbf2da39..49162574 100644 --- a/README.md +++ b/README.md @@ -33,6 +33,7 @@ | 序列化框架 | Jackson | [Jackson官网](https://github.com/FasterXML/jackson) | 统一使用 jackson 高效可靠 | | Redis客户端 | Redisson | [Redisson文档](https://github.com/redisson/redisson/wiki/%E7%9B%AE%E5%BD%95) | 支持单机、集群配置 | | 分布式限流 | Redisson | [Redisson文档](https://github.com/redisson/redisson/wiki/%E7%9B%AE%E5%BD%95) | 全局、请求IP、集群ID 多种限流 | +| 分布式队列 | Redisson | [Redisson文档](https://github.com/redisson/redisson/wiki/%E7%9B%AE%E5%BD%95) | 普通队列、延迟队列、优先队列 等 | | 分布式锁 | Lock4j | [Lock4j官网](https://gitee.com/baomidou/lock4j) | 注解锁、工具锁 多种多样 | | 分布式幂等 | Redisson | [Lock4j文档](https://gitee.com/baomidou/lock4j) | 拦截重复提交 | | 分布式日志 | TLog | [TLog文档](https://yomahub.com/tlog/docs) | 支持跟踪链路日志记录、性能分析、链路排查 | diff --git a/ruoyi-common/src/main/java/com/ruoyi/common/utils/redis/QueueUtils.java b/ruoyi-common/src/main/java/com/ruoyi/common/utils/redis/QueueUtils.java new file mode 100644 index 00000000..c70acbad --- /dev/null +++ b/ruoyi-common/src/main/java/com/ruoyi/common/utils/redis/QueueUtils.java @@ -0,0 +1,215 @@ +package com.ruoyi.common.utils.redis; + +import com.ruoyi.common.utils.spring.SpringUtils; +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import org.redisson.api.*; + +import java.util.Comparator; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +/** + * 分布式队列工具 + * 轻量级队列 重量级数据量 请使用 MQ + * 要求 redis 5.X 以上 + * + * @author Lion Li + * @version 3.6.0 新增 + */ +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class QueueUtils { + + private static final RedissonClient CLIENT = SpringUtils.getBean(RedissonClient.class); + + + /** + * 获取客户端实例 + */ + public static RedissonClient getClient() { + return CLIENT; + } + + /** + * 添加延迟队列数据 默认毫秒 + * + * @param queueName 队列名 + * @param data 数据 + * @param time 延迟时间 + */ + public static void addDelayedQueueObject(String queueName, T data, long time) { + addDelayedQueueObject(queueName, data, time, TimeUnit.MILLISECONDS); + } + + /** + * 添加延迟队列数据 + * + * @param queueName 队列名 + * @param data 数据 + * @param time 延迟时间 + * @param timeUnit 单位 + */ + public static void addDelayedQueueObject(String queueName, T data, long time, TimeUnit timeUnit) { + RBlockingQueue queue = CLIENT.getBlockingQueue(queueName); + RDelayedQueue delayedQueue = CLIENT.getDelayedQueue(queue); + // 已存在则无视 + if (delayedQueue.contains(data)) { + return; + } + delayedQueue.offer(data, time, timeUnit); + } + + /** + * 删除延迟队列数据 + */ + public static boolean removeDelayedQueueObject(String queueName, T data) { + RBlockingQueue queue = CLIENT.getBlockingQueue(queueName); + RDelayedQueue delayedQueue = CLIENT.getDelayedQueue(queue); + return delayedQueue.remove(data); + } + + /** + * 销毁延迟队列 所有阻塞监听 报错 + */ + public static void destroyDelayedQueue(String queueName) { + RBlockingQueue queue = CLIENT.getBlockingQueue(queueName); + RDelayedQueue delayedQueue = CLIENT.getDelayedQueue(queue); + delayedQueue.destroy(); + } + + /** + * 尝试设置 优先队列比较器 用于排序优先级 + * + * @param queueName 队列名 + * @param comparator 比较器 + */ + public static boolean trySetPriorityQueueComparator(String queueName, Comparator comparator) { + RPriorityBlockingQueue priorityBlockingQueue = CLIENT.getPriorityBlockingQueue(queueName); + return priorityBlockingQueue.trySetComparator(comparator); + } + + /** + * 尝试设置 优先队列比较器 用于排序优先级 + * + * @param queueName 队列名 + * @param comparator 比较器 + * @param destroy 已存在是否销毁 + */ + public static boolean trySetPriorityQueueComparator(String queueName, Comparator comparator, boolean destroy) { + RPriorityBlockingQueue priorityBlockingQueue = CLIENT.getPriorityBlockingQueue(queueName); + if (priorityBlockingQueue.isExists() && destroy) { + destroyPriorityQueueObject(queueName); + } + return priorityBlockingQueue.trySetComparator(comparator); + } + + /** + * 添加优先队列数据 + * + * @param queueName 队列名 + * @param data 数据 + */ + public static boolean addPriorityQueueObject(String queueName, T data) { + RPriorityBlockingQueue priorityBlockingQueue = CLIENT.getPriorityBlockingQueue(queueName); + return priorityBlockingQueue.offer(data); + } + + /** + * 获取一个优先队列数据 没有数据返回 null + * + * @param queueName 队列名 + */ + public static T getPriorityQueueObject(String queueName) { + RPriorityBlockingQueue priorityBlockingQueue = CLIENT.getPriorityBlockingQueue(queueName); + return priorityBlockingQueue.poll(); + } + + /** + * 删除优先队列数据 + */ + public static boolean removePriorityQueueObject(String queueName, T data) { + RPriorityBlockingQueue priorityBlockingQueue = CLIENT.getPriorityBlockingQueue(queueName); + return priorityBlockingQueue.remove(data); + } + + /** + * 销毁优先队列 + */ + public static boolean destroyPriorityQueueObject(String queueName) { + RPriorityBlockingQueue priorityBlockingQueue = CLIENT.getPriorityBlockingQueue(queueName); + return priorityBlockingQueue.delete(); + } + + /** + * 尝试设置 有界队列 容量 用于限制数量 + * + * @param queueName 队列名 + * @param capacity 容量 + */ + public static boolean trySetBoundedQueueCapacity(String queueName, int capacity) { + RBoundedBlockingQueue boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName); + return boundedBlockingQueue.trySetCapacity(capacity); + } + + /** + * 尝试设置 有界队列 容量 用于限制数量 + * + * @param queueName 队列名 + * @param capacity 容量 + * @param destroy 已存在是否销毁 + */ + public static boolean trySetBoundedQueueCapacity(String queueName, int capacity, boolean destroy) { + RBoundedBlockingQueue boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName); + if (boundedBlockingQueue.isExists() && destroy) { + destroyBoundedQueueObject(queueName); + } + return boundedBlockingQueue.trySetCapacity(capacity); + } + + /** + * 添加有界队列数据 + * + * @param queueName 队列名 + * @param data 数据 + * @return 添加成功 true 已达到界限 false + */ + public static boolean addBoundedQueueObject(String queueName, T data) { + RBoundedBlockingQueue boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName); + return boundedBlockingQueue.offer(data); + } + + /** + * 获取一个有界队列数据 没有数据返回 null + * + * @param queueName 队列名 + */ + public static T getBoundedQueueObject(String queueName) { + RBoundedBlockingQueue boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName); + return boundedBlockingQueue.poll(); + } + + /** + * 删除有界队列数据 + */ + public static boolean removeBoundedQueueObject(String queueName, T data) { + RBoundedBlockingQueue boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName); + return boundedBlockingQueue.remove(data); + } + + /** + * 销毁有界队列 + */ + public static boolean destroyBoundedQueueObject(String queueName) { + RBoundedBlockingQueue boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName); + return boundedBlockingQueue.delete(); + } + + /** + * 订阅阻塞队列(可订阅所有实现类 例如: 延迟 优先 有界 等) + */ + public static void subscribeBlockingQueue(String queueName, Consumer consumer) { + RBlockingQueue queue = CLIENT.getBlockingQueue(queueName); + queue.subscribeOnElements(consumer); + } + +} diff --git a/ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/BoundedQueueController.java b/ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/BoundedQueueController.java new file mode 100644 index 00000000..9be47a1d --- /dev/null +++ b/ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/BoundedQueueController.java @@ -0,0 +1,83 @@ +package com.ruoyi.demo.controller.queue; + +import com.ruoyi.common.core.domain.AjaxResult; +import com.ruoyi.common.utils.redis.QueueUtils; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiParam; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +/** + * 有界队列 演示案例 + *

+ * 轻量级队列 重量级数据量 请使用 MQ + *

+ * 集群测试通过 同一个数据只会被消费一次 做好事务补偿 + * 集群测试流程 在其中一台发送数据 两端分别调用获取接口 一次获取一条 + * + * @author Lion Li + * @version 3.6.0 + */ +@Slf4j +@Api(value = "有界队列 演示案例", tags = {"有界队列"}) +@RequiredArgsConstructor(onConstructor_ = @Autowired) +@RestController +@RequestMapping("/demo/queue/bounded") +public class BoundedQueueController { + + + @ApiOperation("添加队列数据") + @GetMapping("/add") + public AjaxResult add(@ApiParam("队列名") String queueName, + @ApiParam("容量") int capacity) { + // 用完了一定要销毁 否则会一直存在 + boolean b = QueueUtils.destroyBoundedQueueObject(queueName); + log.info("通道: {} , 删除: {}", queueName, b); + // 初始化设置一次即可 + if (QueueUtils.trySetBoundedQueueCapacity(queueName, capacity)) { + log.info("通道: {} , 设置容量: {}", queueName, capacity); + } else { + log.info("通道: {} , 设置容量失败", queueName); + return AjaxResult.error("操作失败"); + } + for (int i = 0; i < 11; i++) { + String data = "data-" + i; + boolean flag = QueueUtils.addBoundedQueueObject(queueName, data); + if (flag == false) { + log.info("通道: {} , 发送数据: {} 失败, 通道已满", queueName, data); + } else { + log.info("通道: {} , 发送数据: {}", queueName, data); + } + } + return AjaxResult.success("操作成功"); + } + + @ApiOperation("删除队列数据") + @GetMapping("/remove") + public AjaxResult remove(@ApiParam("队列名") String queueName) { + String data = "data-" + 5; + if (QueueUtils.removeBoundedQueueObject(queueName, data)) { + log.info("通道: {} , 删除数据: {}", queueName, data); + } else { + return AjaxResult.error("操作失败"); + } + return AjaxResult.success("操作成功"); + } + + @ApiOperation("获取队列数据") + @GetMapping("/get") + public AjaxResult get(@ApiParam("队列名") String queueName) { + String data; + do { + data = QueueUtils.getBoundedQueueObject(queueName); + log.info("通道: {} , 获取数据: {}", queueName, data); + } while (data != null); + return AjaxResult.success("操作成功"); + } + +} diff --git a/ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/DelayedQueueController.java b/ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/DelayedQueueController.java new file mode 100644 index 00000000..f81765b0 --- /dev/null +++ b/ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/DelayedQueueController.java @@ -0,0 +1,79 @@ +package com.ruoyi.demo.controller.queue; + +import com.ruoyi.common.core.domain.AjaxResult; +import com.ruoyi.common.utils.redis.QueueUtils; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiParam; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.concurrent.TimeUnit; + +/** + * 延迟队列 演示案例 + *

+ * 轻量级队列 重量级数据量 请使用 MQ + * 例如: 创建订单30分钟后过期处理 + *

+ * 集群测试通过 同一个数据只会被消费一次 做好事务补偿 + * 集群测试流程 两台集群分别开启订阅 在其中一台发送数据 观察接收消息的规律 + * + * @author Lion Li + * @version 3.6.0 + */ +@Slf4j +@Api(value = "延迟队列 演示案例", tags = {"延迟队列"}) +@RequiredArgsConstructor(onConstructor_ = @Autowired) +@RestController +@RequestMapping("/demo/queue/delayed") +public class DelayedQueueController { + + @ApiOperation("订阅队列") + @GetMapping("/subscribe") + public AjaxResult subscribe(@ApiParam("队列名") String queueName) { + log.info("通道: {} 监听中......", queueName); + // 项目初始化设置一次即可 + QueueUtils.subscribeBlockingQueue(queueName, (String orderNum) -> { + // 观察接收时间 + log.info("通道: {}, 收到数据: {}", queueName, orderNum); + }); + return AjaxResult.success("操作成功"); + } + + @ApiOperation("添加队列数据") + @GetMapping("/add") + public AjaxResult add(@ApiParam("队列名") String queueName, + @ApiParam("订单号") String orderNum, + @ApiParam("延迟时间(秒)") Long time) { + QueueUtils.addDelayedQueueObject(queueName, orderNum, time, TimeUnit.SECONDS); + // 观察发送时间 + log.info("通道: {} , 发送数据: {}", queueName, orderNum); + return AjaxResult.success("操作成功"); + } + + @ApiOperation("删除队列数据") + @GetMapping("/remove") + public AjaxResult remove(@ApiParam("队列名") String queueName, + @ApiParam("订单号") String orderNum) { + if (QueueUtils.removeDelayedQueueObject(queueName, orderNum)) { + log.info("通道: {} , 删除数据: {}", queueName, orderNum); + } else { + return AjaxResult.error("操作失败"); + } + return AjaxResult.success("操作成功"); + } + + @ApiOperation("销毁队列") + @GetMapping("/destroy") + public AjaxResult destroy(@ApiParam("队列名") String queueName) { + // 用完了一定要销毁 否则会一直存在 + QueueUtils.destroyDelayedQueue(queueName); + return AjaxResult.success("操作成功"); + } + +} diff --git a/ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/PriorityDemo.java b/ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/PriorityDemo.java new file mode 100644 index 00000000..08576904 --- /dev/null +++ b/ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/PriorityDemo.java @@ -0,0 +1,19 @@ +package com.ruoyi.demo.controller.queue; + +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.Accessors; + +/** + * 实体类 注意不允许使用内部类 否则会找不到类 + * + * @author Lion Li + * @version 3.6.0 + */ +@Data +@Accessors(chain = true) +@NoArgsConstructor +public class PriorityDemo { + private String name; + private Integer orderNum; +} diff --git a/ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/PriorityDemoComparator.java b/ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/PriorityDemoComparator.java new file mode 100644 index 00000000..f72e6950 --- /dev/null +++ b/ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/PriorityDemoComparator.java @@ -0,0 +1,16 @@ +package com.ruoyi.demo.controller.queue; + +import java.util.Comparator; + +/** + * 比较器 注意不允许使用 内部类或匿名类或lambda表达式 会找不到类 + * + * @author Lion Li + * @version 3.6.0 + */ +public class PriorityDemoComparator implements Comparator { + @Override + public int compare(PriorityDemo pd1, PriorityDemo pd2) { + return Integer.compare(pd1.getOrderNum(), pd2.getOrderNum()); + } +} diff --git a/ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/PriorityQueueController.java b/ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/PriorityQueueController.java new file mode 100644 index 00000000..130c1f04 --- /dev/null +++ b/ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/PriorityQueueController.java @@ -0,0 +1,85 @@ +package com.ruoyi.demo.controller.queue; + +import cn.hutool.core.util.RandomUtil; +import com.ruoyi.common.core.domain.AjaxResult; +import com.ruoyi.common.utils.redis.QueueUtils; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiParam; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +/** + * 优先队列 演示案例 + *

+ * 轻量级队列 重量级数据量 请使用 MQ + *

+ * 集群测试通过 同一个消息只会被消费一次 做好事务补偿 + * 集群测试流程 在其中一台发送数据 两端分别调用获取接口 一次获取一条 + * + * @author Lion Li + * @version 3.6.0 + */ +@Slf4j +@Api(value = "优先队列 演示案例", tags = {"优先队列"}) +@RequiredArgsConstructor(onConstructor_ = @Autowired) +@RestController +@RequestMapping("/demo/queue/priority") +public class PriorityQueueController { + + @ApiOperation("添加队列数据") + @GetMapping("/add") + public AjaxResult add(@ApiParam("队列名") String queueName) { + // 用完了一定要销毁 否则会一直存在 + boolean b = QueueUtils.destroyPriorityQueueObject(queueName); + log.info("通道: {} , 删除: {}", queueName, b); + // 初始化设置一次即可 此处注意 不允许用内部类或匿名类 + boolean flag = QueueUtils.trySetPriorityQueueComparator(queueName, new PriorityDemoComparator()); + if (flag) { + log.info("通道: {} , 设置比较器成功", queueName); + } else { + log.info("通道: {} , 设置比较器失败", queueName); + return AjaxResult.error("操作失败"); + } + for (int i = 0; i < 10; i++) { + int randomNum = RandomUtil.randomInt(10); + PriorityDemo data = new PriorityDemo().setName("data-" + i).setOrderNum(randomNum); + if (QueueUtils.addPriorityQueueObject(queueName, data)) { + log.info("通道: {} , 发送数据: {}", queueName, data); + } else { + log.info("通道: {} , 发送数据: {}, 发送失败", queueName, data); + } + } + return AjaxResult.success("操作成功"); + } + + @ApiOperation("删除队列数据") + @GetMapping("/remove") + public AjaxResult remove(@ApiParam("队列名") String queueName, + @ApiParam("对象名") String name, + @ApiParam("排序号") Integer orderNum) { + PriorityDemo data = new PriorityDemo().setName(name).setOrderNum(orderNum); + if (QueueUtils.removePriorityQueueObject(queueName, data)) { + log.info("通道: {} , 删除数据: {}", queueName, data); + } else { + return AjaxResult.error("操作失败"); + } + return AjaxResult.success("操作成功"); + } + + @ApiOperation("获取队列数据") + @GetMapping("/get") + public AjaxResult get(@ApiParam("队列名") String queueName) { + PriorityDemo data; + do { + data = QueueUtils.getPriorityQueueObject(queueName); + log.info("通道: {} , 获取数据: {}", queueName, data); + } while (data != null); + return AjaxResult.success("操作成功"); + } + +}