|
|
@@ -1,196 +0,0 @@
|
|
|
-package com.kym.service.aliyun.lot;
|
|
|
-
|
|
|
-import cn.hutool.core.util.CharsetUtil;
|
|
|
-import com.alibaba.fastjson2.JSONObject;
|
|
|
-import com.alibaba.fastjson2.TypeReference;
|
|
|
-import com.aliyun.mns.client.CloudAccount;
|
|
|
-import com.aliyun.mns.client.CloudQueue;
|
|
|
-import com.aliyun.mns.client.MNSClient;
|
|
|
-import com.aliyun.mns.model.Message;
|
|
|
-import com.kym.entity.awoara.Event;
|
|
|
-import com.kym.entity.awoara.MessageBody;
|
|
|
-import com.kym.service.awoara.factory.AwoaraEventHandlerFactory;
|
|
|
-import jakarta.annotation.PreDestroy;
|
|
|
-import lombok.extern.slf4j.Slf4j;
|
|
|
-import org.apache.commons.text.StringEscapeUtils;
|
|
|
-import org.springframework.context.event.ContextRefreshedEvent;
|
|
|
-import org.springframework.context.event.EventListener;
|
|
|
-import org.springframework.scheduling.annotation.Async;
|
|
|
-import org.springframework.stereotype.Component;
|
|
|
-
|
|
|
-import java.util.Base64;
|
|
|
-import java.util.List;
|
|
|
-import java.util.concurrent.ExecutorService;
|
|
|
-import java.util.concurrent.Executors;
|
|
|
-import java.util.concurrent.TimeUnit;
|
|
|
-
|
|
|
-/**
|
|
|
- * MNS消息处理
|
|
|
- *
|
|
|
- * @author skyline
|
|
|
- */
|
|
|
-//@Component 备注:弃用MNS改用AMQP
|
|
|
-@Slf4j
|
|
|
-public class MnsHandler {
|
|
|
-
|
|
|
- /**
|
|
|
- * 使用 Java 21 虚拟线程执行器(推荐)
|
|
|
- * 虚拟线程非常轻量,适合I/O密集型的消息处理
|
|
|
- */
|
|
|
- private final static ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor();
|
|
|
-
|
|
|
- /**
|
|
|
- * 传统线程池(备用)
|
|
|
- */
|
|
|
-// private final static ExecutorService executorService = new ThreadPoolExecutor(
|
|
|
-// 2, 4, 60, TimeUnit.SECONDS,
|
|
|
-// new LinkedBlockingQueue<>(10000));
|
|
|
-
|
|
|
- // 增加饱和策略(建议用CallerRunsPolicy)
|
|
|
-// private final static ThreadPoolExecutor executorService = new ThreadPoolExecutor(
|
|
|
-// 4, 8, 60, TimeUnit.SECONDS,
|
|
|
-// new LinkedBlockingQueue<>(10000),
|
|
|
-// new ThreadPoolExecutor.CallerRunsPolicy()); // 关键修改点
|
|
|
-
|
|
|
-
|
|
|
- /**
|
|
|
- * 是否做 base64 编码
|
|
|
- */
|
|
|
- private static final Boolean IS_BASE64 = false;
|
|
|
- /**
|
|
|
- * BASE64解码器
|
|
|
- */
|
|
|
- private static Base64.Decoder decoder = Base64.getDecoder();
|
|
|
- String queueName = "aliyun-iot-k1olfsszoYB";
|
|
|
- // 遵循阿里云规范,env设置ak、sk。
|
|
|
- CloudAccount account = new CloudAccount("LTAI5tNhD2KFuLUN1hMEukmS", "dtVF6na8Hp9W8DmAoWI9k24VXwjNyM", "http://1757940634296846.mns.cn-shanghai.aliyuncs.com");
|
|
|
- //this client need only initialize once
|
|
|
- MNSClient client = account.getMNSClient();
|
|
|
- CloudQueue queue = client.getQueueRef(queueName);
|
|
|
-
|
|
|
-
|
|
|
- private static void batchReceive(CloudQueue queue) {
|
|
|
- executorService.execute(() -> {
|
|
|
- while (!Thread.currentThread().isInterrupted()) {
|
|
|
- try {
|
|
|
- longPollingBatchReceive(queue);
|
|
|
- } catch (Exception e) {
|
|
|
- log.error("消息处理异常", e);
|
|
|
- // 增加恢复逻辑(如休眠后重试)
|
|
|
- try {
|
|
|
- TimeUnit.SECONDS.sleep(5);
|
|
|
- } catch (InterruptedException ex) {
|
|
|
- break;
|
|
|
- }
|
|
|
- } finally {
|
|
|
- // 虚拟线程执行器不支持传统线程池的监控方法
|
|
|
- // 虚拟线程是轻量级的,无需关注池大小和队列
|
|
|
- log.debug("虚拟线程执行器正在处理消息...");
|
|
|
- }
|
|
|
- }
|
|
|
- });
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 长轮询批量获取消息
|
|
|
- *
|
|
|
- * @param queue
|
|
|
- */
|
|
|
- private static void longPollingBatchReceive(CloudQueue queue) {
|
|
|
- log.info("=============开始 长轮询批量获取消息=============");
|
|
|
-
|
|
|
- // 一次性拉取最多xx条消息
|
|
|
- int batchSize = 15;
|
|
|
- // 长轮询时间为 xx s
|
|
|
- int waitSeconds = 15;
|
|
|
-
|
|
|
- List<Message> messages = queue.batchPopMessage(batchSize, waitSeconds);
|
|
|
- if (messages != null && !messages.isEmpty()) {
|
|
|
- for (Message message : messages) {
|
|
|
- printMsgAndDelete(queue, message);
|
|
|
- }
|
|
|
- }
|
|
|
- log.info("=============结束 长轮询批量获取消息=============");
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 获取单条消息
|
|
|
- *
|
|
|
- * @param queue
|
|
|
- */
|
|
|
- private static void singleReceive(CloudQueue queue) {
|
|
|
- log.info("=============start singleReceive=============");
|
|
|
-
|
|
|
- Message popMsg = queue.popMessage();
|
|
|
- printMsgAndDelete(queue, popMsg);
|
|
|
-
|
|
|
- log.info("=============end singleReceive=============");
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 删除队列消息
|
|
|
- *
|
|
|
- * @param queue
|
|
|
- * @param popMsg
|
|
|
- */
|
|
|
- private static void printMsgAndDelete(CloudQueue queue, Message popMsg) {
|
|
|
- if (popMsg != null) {
|
|
|
- log.info("message handle: " + popMsg.getReceiptHandle());
|
|
|
- log.info("message body: " + (IS_BASE64 ? popMsg.getMessageBody() : popMsg.getMessageBodyAsRawString()));
|
|
|
- log.info("message id: " + popMsg.getMessageId());
|
|
|
- log.info("message dequeue count:" + popMsg.getDequeueCount());
|
|
|
-
|
|
|
- try {
|
|
|
- handleMessage(popMsg);
|
|
|
- queue.deleteMessage(popMsg.getReceiptHandle());
|
|
|
- } catch (Exception e) {
|
|
|
- log.error("Failed to process message: " + popMsg.getMessageId(), e);
|
|
|
- }
|
|
|
- log.info("delete message successfully.");
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- static void handleMessage(Message popMsg) {
|
|
|
- var base64 = popMsg.getMessageBodyAsRawString();
|
|
|
- // 对messageBody进行base64解码
|
|
|
- var messageBodyStr = new String(decoder.decode(base64), CharsetUtil.CHARSET_UTF_8);
|
|
|
-
|
|
|
- // 对messageBodyStr中的payload进行base64解码后转换成MessageBody对象
|
|
|
- var json = JSONObject.parseObject(messageBodyStr);
|
|
|
-
|
|
|
- json.put("payload", JSONObject.parseObject(new String(decoder.decode(json.get("payload").toString()), CharsetUtil.CHARSET_UTF_8)));
|
|
|
- log.info("消息内容:{}", json.toJSONString());
|
|
|
- if ("upload".equals(json.getString("messagetype"))) {
|
|
|
- var event = json.getJSONObject("payload").getString("event");
|
|
|
- var eventHandler = AwoaraEventHandlerFactory.getEventHandler(event);
|
|
|
- var message = parseMessageBody(json.toJSONString(), Event.getClazz(event));
|
|
|
- eventHandler.handle(message);
|
|
|
- } else {
|
|
|
- log.info("message type is not upload");
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private static <T> MessageBody<T> parseMessageBody(String jsonStr, Class<T> clazz) {
|
|
|
- // 去掉json字符串中的转义符
|
|
|
- var json = StringEscapeUtils.unescapeJava(jsonStr);
|
|
|
- return JSONObject.parseObject(json, new TypeReference<>(clazz) {
|
|
|
-
|
|
|
- });
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- @EventListener(classes = {ContextRefreshedEvent.class})
|
|
|
- @Async
|
|
|
- public void init() {
|
|
|
- batchReceive(queue);
|
|
|
- }
|
|
|
-
|
|
|
- @PreDestroy
|
|
|
- public void shutdown() {
|
|
|
- client.close(); // 关闭MNS客户端
|
|
|
- executorService.shutdown();
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
-}
|