浏览代码

新增设备上下线状态变更事件处理

阿里云IoT设备状态变化通知之前已订阅但未处理,消息被静默丢弃。
现在通过 topic /as/mqtt/status/ 识别上下线消息,离线时写入 Redis
OFFLINE 标记和 MonitorLog 记录,上线时清除标记并恢复日志。

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
skyline 5 天之前
父节点
当前提交
faff3edf5a

+ 29 - 0
car-wash-entity/src/main/java/com/kym/entity/awoara/DeviceStatusObject.java

@@ -0,0 +1,29 @@
+package com.kym.entity.awoara;
+
+import lombok.Data;
+
+/**
+ * 阿里云IoT设备状态变化通知(上下线)
+ *
+ * @author skyline
+ */
+@Data
+public class DeviceStatusObject {
+
+    /**
+     * online / offline
+     */
+    private String status;
+
+    private String productKey;
+
+    private String deviceName;
+
+    private String time;
+
+    private String utcTime;
+
+    private String lastTime;
+
+    private String clientIp;
+}

+ 2 - 1
car-wash-entity/src/main/java/com/kym/entity/awoara/Event.java

@@ -20,7 +20,8 @@ public enum Event {
     order_update("order_update", OrderInfoObject.class),  // 订单状态更新事件
     order_close("order_close", OrderInfoObject.class),  // 订单关闭事件
     user_login("user_login", UserLoginObject.class),  // ⽤户登录事件
-    card_event("card_event", CardEventObject.class); // ⽤户刷卡事件
+    card_event("card_event", CardEventObject.class), // ⽤户刷卡事件
+    device_status("device_status", DeviceStatusObject.class); // 设备上下线状态变更
 
 
     public String eventName;

+ 12 - 0
car-wash-service/src/main/java/com/kym/service/aliyun/lot/AmqpConsumer.java

@@ -118,6 +118,18 @@ public class AmqpConsumer {
             long generateTime = message.getLongProperty("generateTime");
             log.info("消息内容:{}", content);
             var payload = JSONObject.parseObject(content);
+
+            // 阿里云IoT设备状态变化通知(上下线)topic: /as/mqtt/status/{productKey}/{deviceName}
+            // 原始payload没有event字段,需转换为Payload<T>格式以复用现有路由
+            if (topic != null && topic.contains("/as/mqtt/status/")) {
+                var statusPayload = new JSONObject();
+                statusPayload.put("version", "1.0");
+                statusPayload.put("event", "device_status");
+                statusPayload.put("data", payload);
+                content = statusPayload.toJSONString();
+                payload = statusPayload;
+            }
+
             jsonObject.put("payload", payload);
             jsonObject.put("messagetype", "upload");
             jsonObject.put("topic", topic);

+ 106 - 0
car-wash-service/src/main/java/com/kym/service/awoara/event/handle/DeviceStatusEventHandler.java

@@ -0,0 +1,106 @@
+package com.kym.service.awoara.event.handle;
+
+import com.kym.entity.MonitorLog;
+import com.kym.entity.WashDevice;
+import com.kym.entity.awoara.DeviceStatusObject;
+import com.kym.entity.awoara.MessageBody;
+import com.kym.entity.common.RedisKeys;
+import com.kym.service.MonitorLogService;
+import com.kym.service.WashDeviceService;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.data.redis.core.StringRedisTemplate;
+import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Transactional;
+
+import java.time.Duration;
+import java.time.LocalDateTime;
+
+/**
+ * 设备上下线状态变更处理
+ *
+ * @author skyline
+ */
+@Slf4j
+public class DeviceStatusEventHandler implements AwoaraEventHandler<DeviceStatusObject> {
+
+    private final WashDeviceService washDeviceService;
+    private final MonitorLogService monitorLogService;
+    private final StringRedisTemplate stringRedisTemplate;
+
+    public DeviceStatusEventHandler(WashDeviceService washDeviceService,
+                                    MonitorLogService monitorLogService,
+                                    StringRedisTemplate stringRedisTemplate) {
+        this.washDeviceService = washDeviceService;
+        this.monitorLogService = monitorLogService;
+        this.stringRedisTemplate = stringRedisTemplate;
+    }
+
+    @Override
+    @Transactional
+    public void handle(MessageBody<DeviceStatusObject> message) {
+        log.info("DeviceStatusEventHandler: {}", message);
+
+        var statusData = message.getPayload().getData();
+        var status = statusData.getStatus();
+        var productKey = statusData.getProductKey();
+        var deviceName = statusData.getDeviceName();
+
+        var device = washDeviceService.lambdaQuery()
+                .eq(WashDevice::getProductKey, productKey)
+                .eq(WashDevice::getDeviceName, deviceName)
+                .one();
+
+        if (device == null) {
+            log.warn("设备状态变更:未找到设备 productKey={}, deviceName={}", productKey, deviceName);
+            return;
+        }
+
+        var deviceId = device.getId().toString();
+        var offlineKey = RedisKeys.OFFLINE + deviceId;
+
+        if ("offline".equals(status)) {
+            handleOffline(device, deviceName, offlineKey);
+        } else if ("online".equals(status)) {
+            handleOnline(device, deviceName, offlineKey);
+        }
+    }
+
+    private void handleOffline(WashDevice device, String deviceName, String offlineKey) {
+        log.info("设备离线: deviceId={}, deviceName={}", device.getId(), deviceName);
+
+        stringRedisTemplate.opsForValue().set(offlineKey,
+                String.valueOf(System.currentTimeMillis()),
+                Duration.ofHours(24));
+
+        MonitorLog logEntry = new MonitorLog()
+                .setStationId(device.getStationId())
+                .setType(2)
+                .setSn(deviceName)
+                .setOfflineTime(LocalDateTime.now())
+                .setOfflineStatus(0)
+                .setIsNotice(MonitorLog.IS_RECOVER_未恢复)
+                .setIsRecover(MonitorLog.IS_RECOVER_未恢复);
+        monitorLogService.save(logEntry);
+    }
+
+    private void handleOnline(WashDevice device, String deviceName, String offlineKey) {
+        log.info("设备上线: deviceId={}, deviceName={}", device.getId(), deviceName);
+
+        stringRedisTemplate.delete(offlineKey);
+
+        var unrecoveredLog = monitorLogService.lambdaQuery()
+                .eq(MonitorLog::getSn, deviceName)
+                .eq(MonitorLog::getIsRecover, MonitorLog.IS_RECOVER_未恢复)
+                .orderByDesc(MonitorLog::getOfflineTime)
+                .last("limit 1")
+                .one();
+
+        if (unrecoveredLog != null) {
+            monitorLogService.lambdaUpdate()
+                    .set(MonitorLog::getIsRecover, MonitorLog.IS_RECOVER_已恢复)
+                    .set(MonitorLog::getRecoverTime, LocalDateTime.now())
+                    .eq(MonitorLog::getId, unrecoveredLog.getId())
+                    .update();
+        }
+    }
+}

+ 9 - 0
car-wash-service/src/main/java/com/kym/service/awoara/factory/AwoaraEventHandlerFactory.java

@@ -2,11 +2,13 @@ package com.kym.service.awoara.factory;
 
 import com.kym.common.exception.BusinessException;
 import com.kym.entity.awoara.Event;
+import com.kym.service.MonitorLogService;
 import com.kym.service.OrderSettlementService;
 import com.kym.service.WashDeviceService;
 import com.kym.service.WashOrderService;
 import com.kym.service.awoara.event.handle.*;
 import org.springframework.beans.factory.annotation.Value;
+import org.springframework.data.redis.core.StringRedisTemplate;
 import org.springframework.stereotype.Component;
 
 /**
@@ -22,15 +24,21 @@ public class AwoaraEventHandlerFactory {
     private static WashDeviceService washDeviceService;
     private static WashOrderService washOrderService;
     private static OrderSettlementService orderSettlementService;
+    private static MonitorLogService monitorLogService;
+    private static StringRedisTemplate stringRedisTemplate;
 
 
     public AwoaraEventHandlerFactory(WashDeviceService washDeviceService, WashOrderService washOrderService,
                                      OrderSettlementService orderSettlementService,
+                                     MonitorLogService monitorLogService,
+                                     StringRedisTemplate stringRedisTemplate,
     @Value("${kym.domain}") String domain) {
         DOMAIN = domain;
         AwoaraEventHandlerFactory.washDeviceService = washDeviceService;
         AwoaraEventHandlerFactory.washOrderService = washOrderService;
         AwoaraEventHandlerFactory.orderSettlementService = orderSettlementService;
+        AwoaraEventHandlerFactory.monitorLogService = monitorLogService;
+        AwoaraEventHandlerFactory.stringRedisTemplate = stringRedisTemplate;
     }
 
     public static AwoaraEventHandler getEventHandler(String eventName) {
@@ -45,6 +53,7 @@ public class AwoaraEventHandlerFactory {
                         new OrderCloseEventHandler(washOrderService, orderSettlementService);
                 case user_login -> new UserLoginEventHandler();
                 case card_event -> new CardEventHandler();
+                case device_status -> new DeviceStatusEventHandler(washDeviceService, monitorLogService, stringRedisTemplate);
             };
         } catch (IllegalArgumentException e) {
             throw new BusinessException("无效的事件名称: " + eventName);