【新增】 rabbitmq手动连接,推送数据,接收数据并保存在数据库内

This commit is contained in:
DESKTOP-932OMT8\REN
2025-05-12 13:18:10 +08:00
parent 6f4368c8f7
commit f118a2a0b6
11 changed files with 207 additions and 41 deletions

View File

@@ -0,0 +1,27 @@
package pc.exam.pp.framework.mybatis.core.dataobject;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import java.io.IOException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
/**
* @author REN
*/
public class CustomLocalDateTimeDeserializer extends JsonDeserializer<LocalDateTime> {
@Override
public LocalDateTime deserialize(JsonParser p, DeserializationContext ctxt) throws IOException, IOException {
// 读取时间戳(毫秒)
long timestamp = p.getLongValue();
return LocalDateTime.ofInstant(
Instant.ofEpochMilli(timestamp),
// 使用系统默认时区
ZoneId.systemDefault()
);
}
}

View File

@@ -90,6 +90,17 @@ public class SecurityFrameworkUtils {
return loginUser != null ? loginUser.getId() : null; return loginUser != null ? loginUser.getId() : null;
} }
/**
* 获得当前用户的租户ID从上下文中
*
* @return 租户ID
*/
@Nullable
public static Long getLoginTenantId() {
LoginUser loginUser = getLoginUser();
return loginUser != null ? loginUser.getTenantId() : null;
}
/** /**
* 获得当前用户的昵称,从上下文中 * 获得当前用户的昵称,从上下文中
* *

View File

@@ -17,6 +17,11 @@
</description> </description>
<dependencies> <dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.7.18</version>
</dependency>
<dependency> <dependency>
<groupId>pc.exam.gg</groupId> <groupId>pc.exam.gg</groupId>
<artifactId>exam-module-system-api</artifactId> <artifactId>exam-module-system-api</artifactId>

View File

@@ -0,0 +1,71 @@
package pc.exam.pp.module.exam.controller.admin.rabbitmq;
import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.annotation.Resource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import pc.exam.pp.module.exam.controller.admin.rabbitmq.vo.RabbitMQSendInfoVO;
import pc.exam.pp.module.exam.dal.dataobject.ExamQuestion;
import pc.exam.pp.module.exam.service.question.ExamQuestionServiceImpl;
import pc.exam.pp.module.exam.utils.rabbitmq.RabbitmqUtils;
import java.util.List;
@RestController
@RequestMapping("/rabbitmq")
@Tag( name = "rabbitmq 消息队列")
public class RabbitMQController {
@Resource
private RabbitmqUtils rabbitMqService;
@Resource
private ExamQuestionServiceImpl examQuestionService;
/**
* 连接Rabbitmq
* @param queueName 队列名称
* @return 连接状态
*/
@PostMapping("/connect")
public String connect(@RequestParam String queueName) {
return rabbitMqService.connect(queueName,"name");
}
/**
* 判断是否连接
* @return 连接状态
*/
@GetMapping("/isConnected")
public boolean isConnected() {
return rabbitMqService.isConnected();
}
/**
* 发送消息
* @param rabbitMQSendInfoVO 对象
* @return 发送状态
*/
@PostMapping("/send")
public int sendMessage(@RequestBody RabbitMQSendInfoVO rabbitMQSendInfoVO) {
return examQuestionService.uploadExamQuestionToRabbitMQ(rabbitMQSendInfoVO);
}
/**
* 全部接收消息
* @param queueName 队列名称-客户端代码
* @return 接收结果
*/
@GetMapping("/receiveAll")
public List<ExamQuestion> receiveMessage(@RequestParam String queueName) {
return examQuestionService.getExamQuestionToRabbitMQ(queueName);
}
/**
* 关闭
* @return 关闭结果
*/
@GetMapping("/disconnect")
public String disconnect() {
rabbitMqService.disconnect();
return "Disconnected";
}
}

View File

@@ -0,0 +1,15 @@
package pc.exam.pp.module.exam.controller.admin.rabbitmq.vo;
import lombok.Data;
import java.util.List;
/**
* @author REN
*/
@Data
public class RabbitMQSendInfoVO {
private String queueName;
private String type;
private List<String> quIds;
}

View File

@@ -109,10 +109,14 @@ public class ExamQuestion extends TenantBaseDO {
// @Excel(name = "试题关键字") // @Excel(name = "试题关键字")
@TableField(exist = false) @TableField(exist = false)
private List<ExamQuestionKeyword> questionKeywords; private List<ExamQuestionKeyword> questionKeywords;
@TableField(exist = false)
private Long source;
@TableField(exist = false)
private String createTeacher;
@TableField(exist = false)
private String type;
} }

View File

@@ -6,6 +6,7 @@ import org.springframework.stereotype.Service;
import pc.exam.pp.framework.common.pojo.PageResult; import pc.exam.pp.framework.common.pojo.PageResult;
import pc.exam.pp.module.exam.controller.admin.question.dto.ExamQuestionDto; import pc.exam.pp.module.exam.controller.admin.question.dto.ExamQuestionDto;
import pc.exam.pp.module.exam.controller.admin.question.vo.QuestionVo; import pc.exam.pp.module.exam.controller.admin.question.vo.QuestionVo;
import pc.exam.pp.module.exam.controller.admin.rabbitmq.vo.RabbitMQSendInfoVO;
import pc.exam.pp.module.exam.dal.dataobject.*; import pc.exam.pp.module.exam.dal.dataobject.*;
import pc.exam.pp.module.exam.dal.mysql.question.*; import pc.exam.pp.module.exam.dal.mysql.question.*;
import pc.exam.pp.module.exam.utils.date.DateUtils; import pc.exam.pp.module.exam.utils.date.DateUtils;
@@ -81,6 +82,7 @@ public class ExamQuestionServiceImpl implements IExamQuestionService
* @return 试题(hyc) * @return 试题(hyc)
*/ */
/** /**
* 新增试题(hyc) * 新增试题(hyc)
* *
@@ -333,29 +335,38 @@ public class ExamQuestionServiceImpl implements IExamQuestionService
/** /**
* 上传试题至Rabbitmq * 上传试题至Rabbitmq
* @param quId 试题内容 ID * @param rabbitMQSendInfoVO 试题内容
* @param QUEUE_NAME 客户端ID
* @return 结果 * @return 结果
*/ */
@Override @Override
public String uploadExamQuestionToRabbitMQ(String quId, String QUEUE_NAME) { public int uploadExamQuestionToRabbitMQ(RabbitMQSendInfoVO rabbitMQSendInfoVO) {
// 1、判断上传题目数量
// 根据试题ID查找试题详情 int quCount = rabbitMQSendInfoVO.getQuIds().size();
ExamQuestion examQuestion_obj = selectExamQuestionByQuId(quId); for (String quId : rabbitMQSendInfoVO.getQuIds()) {
// 2、根据试题ID查找试题详情
// TODO 缺失逻辑 ExamQuestion examQuestion_obj = selectExamQuestionByQuId(quId);
examQuestion_obj.setType(rabbitMQSendInfoVO.getType());
return rabbitMqService.sendMessage(QUEUE_NAME, examQuestion_obj); // 3、上传至Rabbitmq
rabbitMqService.sendMessage(rabbitMQSendInfoVO.getQueueName(), examQuestion_obj);
}
return quCount;
} }
/** /**
* 拉取数据Rabbitmq * 拉取数据Rabbitmq
* @param QUEUE_NAME 客户端ID * @param queueName 客户端ID
* @return 试题内容 * @return 试题内容
*/ */
@Override @Override
public List<ExamQuestion> getExamQuestionToRabbitMQ(String QUEUE_NAME) { public List<ExamQuestion> getExamQuestionToRabbitMQ(String queueName) {
// 最先判断类型
// TODO 1、拉取数据保存至数据库 2、回调服务器是否拉取成功中心服务器 // TODO 1、拉取数据保存至数据库 2、回调服务器是否拉取成功中心服务器
return rabbitMqService.receiveAllMessages(QUEUE_NAME); // 1、获取上传的rabbit的试题数组
List<ExamQuestion> examQuestions = rabbitMqService.receiveAllMessages(queueName);
// 2、保存至数据库
for (ExamQuestion examQuestion : examQuestions) {
insertExamQuestion(examQuestion);
}
return rabbitMqService.receiveAllMessages(queueName);
} }
} }

View File

@@ -3,6 +3,7 @@ package pc.exam.pp.module.exam.service.question;
import pc.exam.pp.framework.common.pojo.PageResult; import pc.exam.pp.framework.common.pojo.PageResult;
import pc.exam.pp.module.exam.controller.admin.question.dto.ExamQuestionDto; import pc.exam.pp.module.exam.controller.admin.question.dto.ExamQuestionDto;
import pc.exam.pp.module.exam.controller.admin.question.vo.QuestionVo; import pc.exam.pp.module.exam.controller.admin.question.vo.QuestionVo;
import pc.exam.pp.module.exam.controller.admin.rabbitmq.vo.RabbitMQSendInfoVO;
import pc.exam.pp.module.exam.dal.dataobject.EducationPaperTask; import pc.exam.pp.module.exam.dal.dataobject.EducationPaperTask;
import pc.exam.pp.module.exam.dal.dataobject.ExamQuestion; import pc.exam.pp.module.exam.dal.dataobject.ExamQuestion;
@@ -67,16 +68,15 @@ public interface IExamQuestionService
/** /**
* 上传试题至Rabbitmq * 上传试题至Rabbitmq
* @param quId 试题内容 ID * @param rabbitMQSendInfoVO 试题内容
* @param QUEUE_NAME 客户端ID
* @return 结果 * @return 结果
*/ */
public String uploadExamQuestionToRabbitMQ(String quId, String QUEUE_NAME); public int uploadExamQuestionToRabbitMQ(RabbitMQSendInfoVO rabbitMQSendInfoVO);
/** /**
* 拉取Rabbitmq内容 * 拉取Rabbitmq内容
* @param QUEUE_NAME 客户端ID * @param queueName 客户端ID
* @return 结果 * @return 结果
*/ */
public List<ExamQuestion> getExamQuestionToRabbitMQ(String QUEUE_NAME); public List<ExamQuestion> getExamQuestionToRabbitMQ(String queueName);
} }

View File

@@ -1,6 +1,7 @@
package pc.exam.pp.module.exam.utils.rabbitmq; package pc.exam.pp.module.exam.utils.rabbitmq;
import cn.hutool.json.JSONUtil;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection; import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.ConnectionFactory;
@@ -8,9 +9,8 @@ import com.rabbitmq.client.GetResponse;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import pc.exam.pp.framework.common.util.json.JsonUtils; import pc.exam.pp.framework.security.core.util.SecurityFrameworkUtils;
import pc.exam.pp.module.exam.dal.dataobject.ExamQuestion; import pc.exam.pp.module.exam.dal.dataobject.ExamQuestion;
import pc.exam.pp.module.exam.utils.json.JsonUtil ;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
@@ -44,11 +44,11 @@ public class RabbitmqUtils {
/** /**
* *
* @param QUEUE_NAME 客户端对应队列 * @param queueName 客户端对应队列
* @param deptName 客户端名称 * @param deptName 客户端名称
* @return 客户端连接状态 * @return 客户端连接状态
*/ */
public String connect(String QUEUE_NAME, String deptName) { public String connect(String queueName, String deptName) {
if (isConnected()) { if (isConnected()) {
return "已经连接"; return "已经连接";
} }
@@ -65,7 +65,7 @@ public class RabbitmqUtils {
connection = factory.newConnection(); connection = factory.newConnection();
channel = connection.createChannel(); channel = connection.createChannel();
// 声明一个队列,如果已经存在就不会重复创建 // 声明一个队列,如果已经存在就不会重复创建
channel.queueDeclare(QUEUE_NAME, true, false, false, null); channel.queueDeclare(queueName, true, false, false, null);
return deptName + "_连接成功"; return deptName + "_连接成功";
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
@@ -77,12 +77,17 @@ public class RabbitmqUtils {
* 向队列中发送一条消息 * 向队列中发送一条消息
*/ */
public String sendMessage(String QUEUE_NAME, ExamQuestion message) { public String sendMessage(String queueName, ExamQuestion message) {
try { try {
if (!isConnected()) return "未连接"; if (!isConnected()) {
return "未连接";
}
// 获取当前的租户ID
Long tenantId = SecurityFrameworkUtils.getLoginTenantId();
message.setSource(tenantId);
// 发送消息到默认交换机,绑定到队列 // 发送消息到默认交换机,绑定到队列
channel.basicPublish("", QUEUE_NAME, null, JsonUtil.toJson(message)); channel.basicPublish("", queueName, null, JSONUtil.toJsonStr(message).getBytes());
return "消息发送成功:" + QUEUE_NAME; return "消息发送成功:" + queueName;
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); e.printStackTrace();
return "发送失败: " + e.getMessage(); return "发送失败: " + e.getMessage();
@@ -91,14 +96,16 @@ public class RabbitmqUtils {
/** /**
* 只接收一条消息(点击一次,拉取一次) * 只接收一条消息(点击一次,拉取一次)
* @param QUEUE_NAME 客户端对应队列 * @param queueName 客户端对应队列
* @return 消息 * @return 消息
*/ */
public String receiveMessageOnce(String QUEUE_NAME) { public String receiveMessageOnce(String queueName) {
try { try {
if (!isConnected()) return "未连接"; if (!isConnected()) {
return "未连接";
}
// 拉取一条消息(非阻塞),自动确认 // 拉取一条消息(非阻塞),自动确认
GetResponse response = channel.basicGet(QUEUE_NAME, true); GetResponse response = channel.basicGet(queueName, true);
if (response != null) { if (response != null) {
return new String(response.getBody()); return new String(response.getBody());
} else { } else {
@@ -113,16 +120,22 @@ public class RabbitmqUtils {
/** /**
* 一次性拉取所有消息(直到队列为空) * 一次性拉取所有消息(直到队列为空)
*/ */
public List<ExamQuestion> receiveAllMessages(String QUEUE_NAME) { public List<ExamQuestion> receiveAllMessages(String queueName) {
List<ExamQuestion> messages = new ArrayList<>(); List<ExamQuestion> messages = new ArrayList<>();
try { try {
if (!isConnected()) return messages; if (!isConnected()) {
return messages;
}
GetResponse response; GetResponse response;
// 循环拉取,直到没有消息为止 // 循环拉取,直到没有消息为止
while ((response = channel.basicGet(QUEUE_NAME, true)) != null) { while ((response = channel.basicGet(queueName, true)) != null) {
messages.add(JsonUtil.fromJson(response.getBody(), ExamQuestion.class)); byte[] body = response.getBody();
// 使用系统默认编码(可能因环境不同导致乱码)
String str = new String(body);
messages.add(JSONUtil.toBean(str, ExamQuestion.class));
} }
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); e.printStackTrace();
} }
@@ -134,8 +147,12 @@ public class RabbitmqUtils {
*/ */
public void disconnect() { public void disconnect() {
try { try {
if (channel != null) channel.close(); if (channel != null) {
if (connection != null) connection.close(); channel.close();
}
if (connection != null) {
connection.close();
}
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} }

View File

@@ -20,6 +20,8 @@ public interface TenantConvert {
reqVO.setUsername(bean.getUsername()); reqVO.setUsername(bean.getUsername());
reqVO.setPassword(bean.getPassword()); reqVO.setPassword(bean.getPassword());
reqVO.setNickname(bean.getContactName()).setMobile(bean.getContactMobile()); reqVO.setNickname(bean.getContactName()).setMobile(bean.getContactMobile());
// 租户创建的用户,默认是管理员用户
reqVO.setUserType("0");
return reqVO; return reqVO;
} }

View File

@@ -4,6 +4,8 @@ import pc.exam.pp.framework.common.core.KeyValue;
import lombok.Data; import lombok.Data;
import jakarta.validation.constraints.NotNull; import jakarta.validation.constraints.NotNull;
import java.io.Serializable;
import java.util.List; import java.util.List;
/** /**
@@ -12,9 +14,10 @@ import java.util.List;
* @author 芋道源码 * @author 芋道源码
*/ */
@Data @Data
public class SmsSendMessage { public class SmsSendMessage implements Serializable {
public static final String QUEUE = "SEND_MESSAGE_QUEUE"; // 重点:需要增加消息对应的 Queue // 重点:需要增加消息对应的 Queue
public static final String QUEUE = "SEND_MESSAGE_QUEUE";
/** /**
* 短信日志编号 * 短信日志编号