|
@@ -2,14 +2,16 @@ package com.kym.miniapp.mq;
|
|
|
|
|
|
|
|
import cn.hutool.core.util.CharsetUtil;
|
|
import cn.hutool.core.util.CharsetUtil;
|
|
|
import com.alibaba.fastjson2.JSONObject;
|
|
import com.alibaba.fastjson2.JSONObject;
|
|
|
|
|
+import com.alibaba.fastjson2.TypeReference;
|
|
|
import com.aliyun.mns.client.CloudAccount;
|
|
import com.aliyun.mns.client.CloudAccount;
|
|
|
import com.aliyun.mns.client.CloudQueue;
|
|
import com.aliyun.mns.client.CloudQueue;
|
|
|
import com.aliyun.mns.client.MNSClient;
|
|
import com.aliyun.mns.client.MNSClient;
|
|
|
import com.aliyun.mns.model.Message;
|
|
import com.aliyun.mns.model.Message;
|
|
|
import com.kym.service.awoara.factory.AwoaraEventHandlerFactory;
|
|
import com.kym.service.awoara.factory.AwoaraEventHandlerFactory;
|
|
|
-import com.kym.service.awoara.handle.MessageBody;
|
|
|
|
|
-import com.kym.service.awoara.handle.Report;
|
|
|
|
|
|
|
+import com.kym.service.awoara.entity.event.Event;
|
|
|
|
|
+import com.kym.service.awoara.entity.event.MessageBody;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
|
|
+import org.apache.commons.text.StringEscapeUtils;
|
|
|
import org.springframework.context.event.ContextRefreshedEvent;
|
|
import org.springframework.context.event.ContextRefreshedEvent;
|
|
|
import org.springframework.context.event.EventListener;
|
|
import org.springframework.context.event.EventListener;
|
|
|
import org.springframework.scheduling.annotation.Async;
|
|
import org.springframework.scheduling.annotation.Async;
|
|
@@ -60,25 +62,25 @@ public class MnsHandler {
|
|
|
*/
|
|
*/
|
|
|
@SuppressWarnings("InfiniteLoopStatement")
|
|
@SuppressWarnings("InfiniteLoopStatement")
|
|
|
private static void longPollingBatchReceive(CloudQueue queue) {
|
|
private static void longPollingBatchReceive(CloudQueue queue) {
|
|
|
- executorService.execute(() -> {
|
|
|
|
|
- while (true){
|
|
|
|
|
- log.info("=============start longPollingBatchReceive=============");
|
|
|
|
|
-
|
|
|
|
|
- // 一次性拉取最多xx条消息
|
|
|
|
|
- int batchSize = 15;
|
|
|
|
|
- // 长轮询时间为 xx s
|
|
|
|
|
- int waitSeconds = 15;
|
|
|
|
|
-
|
|
|
|
|
- List<Message> messages = queue.batchPopMessage(batchSize, waitSeconds);
|
|
|
|
|
- if (messages != null && !messages.isEmpty()) {
|
|
|
|
|
- for (Message message : messages) {
|
|
|
|
|
- printMsgAndDelete(queue, message);
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ executorService.execute(() -> {
|
|
|
|
|
+ while (true) {
|
|
|
|
|
+ log.info("=============start longPollingBatchReceive=============");
|
|
|
|
|
+
|
|
|
|
|
+ // 一次性拉取最多xx条消息
|
|
|
|
|
+ int batchSize = 15;
|
|
|
|
|
+ // 长轮询时间为 xx s
|
|
|
|
|
+ int waitSeconds = 15;
|
|
|
|
|
+
|
|
|
|
|
+ List<Message> messages = queue.batchPopMessage(batchSize, waitSeconds);
|
|
|
|
|
+ if (messages != null && !messages.isEmpty()) {
|
|
|
|
|
+ for (Message message : messages) {
|
|
|
|
|
+ printMsgAndDelete(queue, message);
|
|
|
}
|
|
}
|
|
|
- log.info("=============end longPollingBatchReceive=============");
|
|
|
|
|
}
|
|
}
|
|
|
- });
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ log.info("=============end longPollingBatchReceive=============");
|
|
|
|
|
+ }
|
|
|
|
|
+ });
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* 获取单条消息
|
|
* 获取单条消息
|
|
@@ -116,22 +118,43 @@ public class MnsHandler {
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- static void handleMessage(String messageBody) {
|
|
|
|
|
- // todo 对messageBody进行base64解码
|
|
|
|
|
- var message = new String(decoder.decode(messageBody), CharsetUtil.CHARSET_UTF_8);
|
|
|
|
|
- log.info("message: {}", message);
|
|
|
|
|
- // 根据event类型组装成对应对象
|
|
|
|
|
- var body = JSONObject.parseObject(message, MessageBody.class);
|
|
|
|
|
- var report = JSONObject.parseObject(new String(decoder.decode(body.getPayload()), CharsetUtil.CHARSET_UTF_8), Report.class);
|
|
|
|
|
- var handler = AwoaraEventHandlerFactory.getEventHandler(report.getEvent());
|
|
|
|
|
- handler.handle(report);
|
|
|
|
|
|
|
+ static void handleMessage(String base64) {
|
|
|
|
|
+ // 对messageBody进行base64解码
|
|
|
|
|
+ var messageBodyStr = new String(decoder.decode(base64), CharsetUtil.CHARSET_UTF_8);
|
|
|
|
|
+
|
|
|
|
|
+ // 对messageBodyStr中的payload进行base64解码后转换成MessageBody对象
|
|
|
|
|
+ var json = JSONObject.parseObject(messageBodyStr);
|
|
|
|
|
+
|
|
|
|
|
+ json.put("payload", JSONObject.parseObject(new String(decoder.decode(json.get("payload").toString()), CharsetUtil.CHARSET_UTF_8)));
|
|
|
|
|
+ log.info("消息内容:{}", json.toJSONString());
|
|
|
|
|
+ if ("upload".equals(json.getString("messagetype"))) {
|
|
|
|
|
+ var event = json.getJSONObject("payload").getString("event");
|
|
|
|
|
+ if ("device_state".equals(event)) {
|
|
|
|
|
+ var handler = AwoaraEventHandlerFactory.getEventHandler(event);
|
|
|
|
|
+ var message = parseMessageBody(json.toJSONString(), Event.getClazz(event));
|
|
|
|
|
+ handler.handle(message);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ } else {
|
|
|
|
|
+ log.info("message type is not upload");
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ private static <T> MessageBody<T> parseMessageBody(String jsonStr, Class<T> clazz) {
|
|
|
|
|
+ // 去掉json字符串中的转义符
|
|
|
|
|
+ var json = StringEscapeUtils.unescapeJava(jsonStr);
|
|
|
|
|
+ // 获取到泛型类型
|
|
|
|
|
|
|
|
|
|
+ return JSONObject.parseObject(json, new TypeReference<>(clazz) {
|
|
|
|
|
+
|
|
|
|
|
+ });
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+
|
|
|
@EventListener(classes = {ContextRefreshedEvent.class})
|
|
@EventListener(classes = {ContextRefreshedEvent.class})
|
|
|
@Async
|
|
@Async
|
|
|
- public void init() throws InterruptedException {
|
|
|
|
|
- // 开启线程处理队列消息
|
|
|
|
|
|
|
+ public void init() {
|
|
|
longPollingBatchReceive(queue);
|
|
longPollingBatchReceive(queue);
|
|
|
}
|
|
}
|
|
|
|
|
|