From f118a2a0b6d05a4e2bf46601c02cdc8af1cc63ae Mon Sep 17 00:00:00 2001 From: "DESKTOP-932OMT8\\REN" Date: Mon, 12 May 2025 13:18:10 +0800 Subject: [PATCH] =?UTF-8?q?=E3=80=90=E6=96=B0=E5=A2=9E=E3=80=91=20rabbitmq?= =?UTF-8?q?=E6=89=8B=E5=8A=A8=E8=BF=9E=E6=8E=A5=EF=BC=8C=E6=8E=A8=E9=80=81?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=EF=BC=8C=E6=8E=A5=E6=94=B6=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E5=B9=B6=E4=BF=9D=E5=AD=98=E5=9C=A8=E6=95=B0=E6=8D=AE=E5=BA=93?= =?UTF-8?q?=E5=86=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../CustomLocalDateTimeDeserializer.java | 27 +++++++ .../core/util/SecurityFrameworkUtils.java | 11 +++ exam-module-exam/exam-module-exam-biz/pom.xml | 5 ++ .../admin/rabbitmq/RabbitMQController.java | 71 +++++++++++++++++++ .../admin/rabbitmq/vo/RabbitMQSendInfoVO.java | 15 ++++ .../exam/dal/dataobject/ExamQuestion.java | 8 ++- .../question/ExamQuestionServiceImpl.java | 37 ++++++---- .../question/IExamQuestionService.java | 10 +-- .../exam/utils/rabbitmq/RabbitmqUtils.java | 55 +++++++++----- .../system/convert/tenant/TenantConvert.java | 2 + .../system/mq/message/sms/SmsSendMessage.java | 7 +- 11 files changed, 207 insertions(+), 41 deletions(-) create mode 100644 exam-framework/exam-spring-boot-starter-mybatis/src/main/java/pc/exam/pp/framework/mybatis/core/dataobject/CustomLocalDateTimeDeserializer.java create mode 100644 exam-module-exam/exam-module-exam-biz/src/main/java/pc/exam/pp/module/exam/controller/admin/rabbitmq/RabbitMQController.java create mode 100644 exam-module-exam/exam-module-exam-biz/src/main/java/pc/exam/pp/module/exam/controller/admin/rabbitmq/vo/RabbitMQSendInfoVO.java diff --git a/exam-framework/exam-spring-boot-starter-mybatis/src/main/java/pc/exam/pp/framework/mybatis/core/dataobject/CustomLocalDateTimeDeserializer.java b/exam-framework/exam-spring-boot-starter-mybatis/src/main/java/pc/exam/pp/framework/mybatis/core/dataobject/CustomLocalDateTimeDeserializer.java new file mode 100644 index 00000000..759eaf3c --- /dev/null +++ b/exam-framework/exam-spring-boot-starter-mybatis/src/main/java/pc/exam/pp/framework/mybatis/core/dataobject/CustomLocalDateTimeDeserializer.java @@ -0,0 +1,27 @@ +package pc.exam.pp.framework.mybatis.core.dataobject; + +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; + +import java.io.IOException; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; + +/** + * @author REN + */ +public class CustomLocalDateTimeDeserializer extends JsonDeserializer { + @Override + public LocalDateTime deserialize(JsonParser p, DeserializationContext ctxt) throws IOException, IOException { + // 读取时间戳(毫秒) + long timestamp = p.getLongValue(); + return LocalDateTime.ofInstant( + Instant.ofEpochMilli(timestamp), + // 使用系统默认时区 + ZoneId.systemDefault() + ); + } +} \ No newline at end of file diff --git a/exam-framework/exam-spring-boot-starter-security/src/main/java/pc/exam/pp/framework/security/core/util/SecurityFrameworkUtils.java b/exam-framework/exam-spring-boot-starter-security/src/main/java/pc/exam/pp/framework/security/core/util/SecurityFrameworkUtils.java index 21f65498..65b59d27 100644 --- a/exam-framework/exam-spring-boot-starter-security/src/main/java/pc/exam/pp/framework/security/core/util/SecurityFrameworkUtils.java +++ b/exam-framework/exam-spring-boot-starter-security/src/main/java/pc/exam/pp/framework/security/core/util/SecurityFrameworkUtils.java @@ -90,6 +90,17 @@ public class SecurityFrameworkUtils { return loginUser != null ? loginUser.getId() : null; } + /** + * 获得当前用户的租户ID,从上下文中 + * + * @return 租户ID + */ + @Nullable + public static Long getLoginTenantId() { + LoginUser loginUser = getLoginUser(); + return loginUser != null ? loginUser.getTenantId() : null; + } + /** * 获得当前用户的昵称,从上下文中 * diff --git a/exam-module-exam/exam-module-exam-biz/pom.xml b/exam-module-exam/exam-module-exam-biz/pom.xml index acd1d07e..9aa338e5 100644 --- a/exam-module-exam/exam-module-exam-biz/pom.xml +++ b/exam-module-exam/exam-module-exam-biz/pom.xml @@ -17,6 +17,11 @@ + + org.springframework.boot + spring-boot-starter-amqp + 2.7.18 + pc.exam.gg exam-module-system-api diff --git a/exam-module-exam/exam-module-exam-biz/src/main/java/pc/exam/pp/module/exam/controller/admin/rabbitmq/RabbitMQController.java b/exam-module-exam/exam-module-exam-biz/src/main/java/pc/exam/pp/module/exam/controller/admin/rabbitmq/RabbitMQController.java new file mode 100644 index 00000000..6699be5b --- /dev/null +++ b/exam-module-exam/exam-module-exam-biz/src/main/java/pc/exam/pp/module/exam/controller/admin/rabbitmq/RabbitMQController.java @@ -0,0 +1,71 @@ +package pc.exam.pp.module.exam.controller.admin.rabbitmq; + +import io.swagger.v3.oas.annotations.tags.Tag; +import jakarta.annotation.Resource; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.*; +import pc.exam.pp.module.exam.controller.admin.rabbitmq.vo.RabbitMQSendInfoVO; +import pc.exam.pp.module.exam.dal.dataobject.ExamQuestion; +import pc.exam.pp.module.exam.service.question.ExamQuestionServiceImpl; +import pc.exam.pp.module.exam.utils.rabbitmq.RabbitmqUtils; + +import java.util.List; + +@RestController +@RequestMapping("/rabbitmq") +@Tag( name = "rabbitmq 消息队列") +public class RabbitMQController { + + @Resource + private RabbitmqUtils rabbitMqService; + @Resource + private ExamQuestionServiceImpl examQuestionService; + /** + * 连接Rabbitmq + * @param queueName 队列名称 + * @return 连接状态 + */ + @PostMapping("/connect") + public String connect(@RequestParam String queueName) { + return rabbitMqService.connect(queueName,"name"); + } + + /** + * 判断是否连接 + * @return 连接状态 + */ + @GetMapping("/isConnected") + public boolean isConnected() { + return rabbitMqService.isConnected(); + } + + /** + * 发送消息 + * @param rabbitMQSendInfoVO 对象 + * @return 发送状态 + */ + @PostMapping("/send") + public int sendMessage(@RequestBody RabbitMQSendInfoVO rabbitMQSendInfoVO) { + return examQuestionService.uploadExamQuestionToRabbitMQ(rabbitMQSendInfoVO); + } + + /** + * 全部接收消息 + * @param queueName 队列名称-客户端代码 + * @return 接收结果 + */ + @GetMapping("/receiveAll") + public List receiveMessage(@RequestParam String queueName) { + return examQuestionService.getExamQuestionToRabbitMQ(queueName); + } + + /** + * 关闭 + * @return 关闭结果 + */ + @GetMapping("/disconnect") + public String disconnect() { + rabbitMqService.disconnect(); + return "Disconnected"; + } +} \ No newline at end of file diff --git a/exam-module-exam/exam-module-exam-biz/src/main/java/pc/exam/pp/module/exam/controller/admin/rabbitmq/vo/RabbitMQSendInfoVO.java b/exam-module-exam/exam-module-exam-biz/src/main/java/pc/exam/pp/module/exam/controller/admin/rabbitmq/vo/RabbitMQSendInfoVO.java new file mode 100644 index 00000000..d5d635fc --- /dev/null +++ b/exam-module-exam/exam-module-exam-biz/src/main/java/pc/exam/pp/module/exam/controller/admin/rabbitmq/vo/RabbitMQSendInfoVO.java @@ -0,0 +1,15 @@ +package pc.exam.pp.module.exam.controller.admin.rabbitmq.vo; + +import lombok.Data; + +import java.util.List; + +/** + * @author REN + */ +@Data +public class RabbitMQSendInfoVO { + private String queueName; + private String type; + private List quIds; +} diff --git a/exam-module-exam/exam-module-exam-biz/src/main/java/pc/exam/pp/module/exam/dal/dataobject/ExamQuestion.java b/exam-module-exam/exam-module-exam-biz/src/main/java/pc/exam/pp/module/exam/dal/dataobject/ExamQuestion.java index ec7d144a..6c5dfadd 100644 --- a/exam-module-exam/exam-module-exam-biz/src/main/java/pc/exam/pp/module/exam/dal/dataobject/ExamQuestion.java +++ b/exam-module-exam/exam-module-exam-biz/src/main/java/pc/exam/pp/module/exam/dal/dataobject/ExamQuestion.java @@ -109,10 +109,14 @@ public class ExamQuestion extends TenantBaseDO { // @Excel(name = "试题关键字") @TableField(exist = false) private List questionKeywords; - - + @TableField(exist = false) + private Long source; + @TableField(exist = false) + private String createTeacher; + @TableField(exist = false) + private String type; } diff --git a/exam-module-exam/exam-module-exam-biz/src/main/java/pc/exam/pp/module/exam/service/question/ExamQuestionServiceImpl.java b/exam-module-exam/exam-module-exam-biz/src/main/java/pc/exam/pp/module/exam/service/question/ExamQuestionServiceImpl.java index 0cd8dda0..1a5a5da6 100644 --- a/exam-module-exam/exam-module-exam-biz/src/main/java/pc/exam/pp/module/exam/service/question/ExamQuestionServiceImpl.java +++ b/exam-module-exam/exam-module-exam-biz/src/main/java/pc/exam/pp/module/exam/service/question/ExamQuestionServiceImpl.java @@ -6,6 +6,7 @@ import org.springframework.stereotype.Service; import pc.exam.pp.framework.common.pojo.PageResult; import pc.exam.pp.module.exam.controller.admin.question.dto.ExamQuestionDto; import pc.exam.pp.module.exam.controller.admin.question.vo.QuestionVo; +import pc.exam.pp.module.exam.controller.admin.rabbitmq.vo.RabbitMQSendInfoVO; import pc.exam.pp.module.exam.dal.dataobject.*; import pc.exam.pp.module.exam.dal.mysql.question.*; import pc.exam.pp.module.exam.utils.date.DateUtils; @@ -81,6 +82,7 @@ public class ExamQuestionServiceImpl implements IExamQuestionService * @return 试题(hyc) */ + /** * 新增试题(hyc) * @@ -333,29 +335,38 @@ public class ExamQuestionServiceImpl implements IExamQuestionService /** * 上传试题至Rabbitmq - * @param quId 试题内容 ID - * @param QUEUE_NAME 客户端ID + * @param rabbitMQSendInfoVO 试题内容 * @return 结果 */ @Override - public String uploadExamQuestionToRabbitMQ(String quId, String QUEUE_NAME) { - - // 根据试题ID查找试题详情 - ExamQuestion examQuestion_obj = selectExamQuestionByQuId(quId); - - // TODO 缺失逻辑 - - return rabbitMqService.sendMessage(QUEUE_NAME, examQuestion_obj); + public int uploadExamQuestionToRabbitMQ(RabbitMQSendInfoVO rabbitMQSendInfoVO) { + // 1、判断上传题目数量 + int quCount = rabbitMQSendInfoVO.getQuIds().size(); + for (String quId : rabbitMQSendInfoVO.getQuIds()) { + // 2、根据试题ID查找试题详情 + ExamQuestion examQuestion_obj = selectExamQuestionByQuId(quId); + examQuestion_obj.setType(rabbitMQSendInfoVO.getType()); + // 3、上传至Rabbitmq + rabbitMqService.sendMessage(rabbitMQSendInfoVO.getQueueName(), examQuestion_obj); + } + return quCount; } /** * 拉取数据Rabbitmq - * @param QUEUE_NAME 客户端ID + * @param queueName 客户端ID * @return 试题内容 */ @Override - public List getExamQuestionToRabbitMQ(String QUEUE_NAME) { + public List getExamQuestionToRabbitMQ(String queueName) { + // 最先判断类型 // TODO 1、拉取数据,保存至数据库 2、回调服务器是否拉取成功(中心服务器) - return rabbitMqService.receiveAllMessages(QUEUE_NAME); + // 1、获取上传的rabbit的试题数组 + List examQuestions = rabbitMqService.receiveAllMessages(queueName); + // 2、保存至数据库 + for (ExamQuestion examQuestion : examQuestions) { + insertExamQuestion(examQuestion); + } + return rabbitMqService.receiveAllMessages(queueName); } } diff --git a/exam-module-exam/exam-module-exam-biz/src/main/java/pc/exam/pp/module/exam/service/question/IExamQuestionService.java b/exam-module-exam/exam-module-exam-biz/src/main/java/pc/exam/pp/module/exam/service/question/IExamQuestionService.java index 8561155a..a5166254 100644 --- a/exam-module-exam/exam-module-exam-biz/src/main/java/pc/exam/pp/module/exam/service/question/IExamQuestionService.java +++ b/exam-module-exam/exam-module-exam-biz/src/main/java/pc/exam/pp/module/exam/service/question/IExamQuestionService.java @@ -3,6 +3,7 @@ package pc.exam.pp.module.exam.service.question; import pc.exam.pp.framework.common.pojo.PageResult; import pc.exam.pp.module.exam.controller.admin.question.dto.ExamQuestionDto; import pc.exam.pp.module.exam.controller.admin.question.vo.QuestionVo; +import pc.exam.pp.module.exam.controller.admin.rabbitmq.vo.RabbitMQSendInfoVO; import pc.exam.pp.module.exam.dal.dataobject.EducationPaperTask; import pc.exam.pp.module.exam.dal.dataobject.ExamQuestion; @@ -67,16 +68,15 @@ public interface IExamQuestionService /** * 上传试题至Rabbitmq - * @param quId 试题内容 ID - * @param QUEUE_NAME 客户端ID + * @param rabbitMQSendInfoVO 试题内容 * @return 结果 */ - public String uploadExamQuestionToRabbitMQ(String quId, String QUEUE_NAME); + public int uploadExamQuestionToRabbitMQ(RabbitMQSendInfoVO rabbitMQSendInfoVO); /** * 拉取Rabbitmq内容 - * @param QUEUE_NAME 客户端ID + * @param queueName 客户端ID * @return 结果 */ - public List getExamQuestionToRabbitMQ(String QUEUE_NAME); + public List getExamQuestionToRabbitMQ(String queueName); } diff --git a/exam-module-exam/exam-module-exam-biz/src/main/java/pc/exam/pp/module/exam/utils/rabbitmq/RabbitmqUtils.java b/exam-module-exam/exam-module-exam-biz/src/main/java/pc/exam/pp/module/exam/utils/rabbitmq/RabbitmqUtils.java index 8b75ed74..897166fe 100644 --- a/exam-module-exam/exam-module-exam-biz/src/main/java/pc/exam/pp/module/exam/utils/rabbitmq/RabbitmqUtils.java +++ b/exam-module-exam/exam-module-exam-biz/src/main/java/pc/exam/pp/module/exam/utils/rabbitmq/RabbitmqUtils.java @@ -1,6 +1,7 @@ package pc.exam.pp.module.exam.utils.rabbitmq; +import cn.hutool.json.JSONUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; @@ -8,9 +9,8 @@ import com.rabbitmq.client.GetResponse; import org.springframework.stereotype.Service; -import pc.exam.pp.framework.common.util.json.JsonUtils; +import pc.exam.pp.framework.security.core.util.SecurityFrameworkUtils; import pc.exam.pp.module.exam.dal.dataobject.ExamQuestion; -import pc.exam.pp.module.exam.utils.json.JsonUtil ; import java.io.IOException; import java.util.ArrayList; @@ -44,11 +44,11 @@ public class RabbitmqUtils { /** * - * @param QUEUE_NAME 客户端对应队列 + * @param queueName 客户端对应队列 * @param deptName 客户端名称 * @return 客户端连接状态 */ - public String connect(String QUEUE_NAME, String deptName) { + public String connect(String queueName, String deptName) { if (isConnected()) { return "已经连接"; } @@ -65,7 +65,7 @@ public class RabbitmqUtils { connection = factory.newConnection(); channel = connection.createChannel(); // 声明一个队列,如果已经存在就不会重复创建 - channel.queueDeclare(QUEUE_NAME, true, false, false, null); + channel.queueDeclare(queueName, true, false, false, null); return deptName + "_连接成功"; } catch (Exception e) { e.printStackTrace(); @@ -77,12 +77,17 @@ public class RabbitmqUtils { * 向队列中发送一条消息 */ - public String sendMessage(String QUEUE_NAME, ExamQuestion message) { + public String sendMessage(String queueName, ExamQuestion message) { try { - if (!isConnected()) return "未连接"; + if (!isConnected()) { + return "未连接"; + } + // 获取当前的租户ID + Long tenantId = SecurityFrameworkUtils.getLoginTenantId(); + message.setSource(tenantId); // 发送消息到默认交换机,绑定到队列 - channel.basicPublish("", QUEUE_NAME, null, JsonUtil.toJson(message)); - return "消息发送成功:" + QUEUE_NAME; + channel.basicPublish("", queueName, null, JSONUtil.toJsonStr(message).getBytes()); + return "消息发送成功:" + queueName; } catch (IOException e) { e.printStackTrace(); return "发送失败: " + e.getMessage(); @@ -91,14 +96,16 @@ public class RabbitmqUtils { /** * 只接收一条消息(点击一次,拉取一次) - * @param QUEUE_NAME 客户端对应队列 + * @param queueName 客户端对应队列 * @return 消息 */ - public String receiveMessageOnce(String QUEUE_NAME) { + public String receiveMessageOnce(String queueName) { try { - if (!isConnected()) return "未连接"; + if (!isConnected()) { + return "未连接"; + } // 拉取一条消息(非阻塞),自动确认 - GetResponse response = channel.basicGet(QUEUE_NAME, true); + GetResponse response = channel.basicGet(queueName, true); if (response != null) { return new String(response.getBody()); } else { @@ -113,16 +120,22 @@ public class RabbitmqUtils { /** * 一次性拉取所有消息(直到队列为空) */ - public List receiveAllMessages(String QUEUE_NAME) { + public List receiveAllMessages(String queueName) { List messages = new ArrayList<>(); try { - if (!isConnected()) return messages; + if (!isConnected()) { + return messages; + } GetResponse response; // 循环拉取,直到没有消息为止 - while ((response = channel.basicGet(QUEUE_NAME, true)) != null) { - messages.add(JsonUtil.fromJson(response.getBody(), ExamQuestion.class)); + while ((response = channel.basicGet(queueName, true)) != null) { + byte[] body = response.getBody(); + // 使用系统默认编码(可能因环境不同导致乱码) + String str = new String(body); + messages.add(JSONUtil.toBean(str, ExamQuestion.class)); } + } catch (IOException e) { e.printStackTrace(); } @@ -134,8 +147,12 @@ public class RabbitmqUtils { */ public void disconnect() { try { - if (channel != null) channel.close(); - if (connection != null) connection.close(); + if (channel != null) { + channel.close(); + } + if (connection != null) { + connection.close(); + } } catch (Exception e) { e.printStackTrace(); } diff --git a/exam-module-system/exam-module-system-biz/src/main/java/pc/exam/pp/module/system/convert/tenant/TenantConvert.java b/exam-module-system/exam-module-system-biz/src/main/java/pc/exam/pp/module/system/convert/tenant/TenantConvert.java index 53320c38..84b7da6d 100644 --- a/exam-module-system/exam-module-system-biz/src/main/java/pc/exam/pp/module/system/convert/tenant/TenantConvert.java +++ b/exam-module-system/exam-module-system-biz/src/main/java/pc/exam/pp/module/system/convert/tenant/TenantConvert.java @@ -20,6 +20,8 @@ public interface TenantConvert { reqVO.setUsername(bean.getUsername()); reqVO.setPassword(bean.getPassword()); reqVO.setNickname(bean.getContactName()).setMobile(bean.getContactMobile()); + // 租户创建的用户,默认是管理员用户 + reqVO.setUserType("0"); return reqVO; } diff --git a/exam-module-system/exam-module-system-biz/src/main/java/pc/exam/pp/module/system/mq/message/sms/SmsSendMessage.java b/exam-module-system/exam-module-system-biz/src/main/java/pc/exam/pp/module/system/mq/message/sms/SmsSendMessage.java index 1c37fed6..7f50f4b1 100644 --- a/exam-module-system/exam-module-system-biz/src/main/java/pc/exam/pp/module/system/mq/message/sms/SmsSendMessage.java +++ b/exam-module-system/exam-module-system-biz/src/main/java/pc/exam/pp/module/system/mq/message/sms/SmsSendMessage.java @@ -4,6 +4,8 @@ import pc.exam.pp.framework.common.core.KeyValue; import lombok.Data; import jakarta.validation.constraints.NotNull; + +import java.io.Serializable; import java.util.List; /** @@ -12,9 +14,10 @@ import java.util.List; * @author 芋道源码 */ @Data -public class SmsSendMessage { +public class SmsSendMessage implements Serializable { - public static final String QUEUE = "SEND_MESSAGE_QUEUE"; // 重点:需要增加消息对应的 Queue + // 重点:需要增加消息对应的 Queue + public static final String QUEUE = "SEND_MESSAGE_QUEUE"; /** * 短信日志编号