Pārlūkot izejas kodu

fix: AMQP 消费改为 CLIENT_ACKNOWLEDGE 模式,确保消息处理成功后才确认

- Session 从 AUTO_ACKNOWLEDGE 改为 CLIENT_ACKNOWLEDGE
- onMessage 中在 processMessage 成功后显式调用 message.acknowledge()
- 异常处理从 processMessage 内部移至 submit 外层,处理失败时不确认消息

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
skyline 1 dienu atpakaļ
vecāks
revīzija
38dfeab8b1

+ 39 - 36
car-wash-service/src/main/java/com/kym/service/aliyun/lot/AmqpConsumer.java

@@ -50,7 +50,14 @@ public class AmqpConsumer {
         @Override
         public void onMessage(final Message message) {
             try {
-                executorService.submit(() -> processMessage(message));
+                executorService.submit(() -> {
+                    try {
+                        processMessage(message);
+                        message.acknowledge();
+                    } catch (Exception e) {
+                        log.error("processMessage occurs error ", e);
+                    }
+                });
             } catch (Exception e) {
                 log.error("submit task occurs exception ", e);
             }
@@ -108,41 +115,37 @@ public class AmqpConsumer {
     private String host;
     private int connectionCount = 4;
 
-    private static void processMessage(Message message) {
-        try {
-            var jsonObject = new JSONObject();
-            byte[] body = message.getBody(byte[].class);
-            String content = new String(body);
-            String topic = message.getStringProperty("topic");
-            String messageId = message.getStringProperty("messageId");
-            long generateTime = message.getLongProperty("generateTime");
-            log.info("消息内容:{}", content);
-            var payload = JSONObject.parseObject(content);
-
-            // 阿里云IoT设备状态变化通知(上下线)topic: /as/mqtt/status/{productKey}/{deviceName}
-            // 原始payload没有event字段,需转换为Payload<T>格式以复用现有路由
-            if (topic != null && topic.contains("/as/mqtt/status/")) {
-                var statusPayload = new JSONObject();
-                statusPayload.put("version", "1.0");
-                statusPayload.put("event", "device_status");
-                statusPayload.put("data", payload);
-                content = statusPayload.toJSONString();
-                payload = statusPayload;
-            }
-
-            jsonObject.put("payload", payload);
-            jsonObject.put("messagetype", "upload");
-            jsonObject.put("topic", topic);
-            jsonObject.put("messageid", messageId);
-            jsonObject.put("timestamp", generateTime);
-
-            var event = payload.getString("event");
-            var eventHandler = AwoaraEventHandlerFactory.getEventHandler(event);
-            var msg = parseMessageBody(jsonObject.toJSONString(), Event.getClazz(event));
-            eventHandler.handle(msg);
-        } catch (Exception e) {
-            log.error("processMessage occurs error ", e);
+    private static void processMessage(Message message) throws Exception {
+        var jsonObject = new JSONObject();
+        byte[] body = message.getBody(byte[].class);
+        String content = new String(body);
+        String topic = message.getStringProperty("topic");
+        String messageId = message.getStringProperty("messageId");
+        long generateTime = message.getLongProperty("generateTime");
+        log.info("消息内容:{}", content);
+        var payload = JSONObject.parseObject(content);
+
+        // 阿里云IoT设备状态变化通知(上下线)topic: /as/mqtt/status/{productKey}/{deviceName}
+        // 原始payload没有event字段,需转换为Payload<T>格式以复用现有路由
+        if (topic != null && topic.contains("/as/mqtt/status/")) {
+            var statusPayload = new JSONObject();
+            statusPayload.put("version", "1.0");
+            statusPayload.put("event", "device_status");
+            statusPayload.put("data", payload);
+            content = statusPayload.toJSONString();
+            payload = statusPayload;
         }
+
+        jsonObject.put("payload", payload);
+        jsonObject.put("messagetype", "upload");
+        jsonObject.put("topic", topic);
+        jsonObject.put("messageid", messageId);
+        jsonObject.put("timestamp", generateTime);
+
+        var event = payload.getString("event");
+        var eventHandler = AwoaraEventHandlerFactory.getEventHandler(event);
+        var msg = parseMessageBody(jsonObject.toJSONString(), Event.getClazz(event));
+        eventHandler.handle(msg);
     }
 
     private static <T> MessageBody<T> parseMessageBody(String jsonStr, Class<T> clazz) {
@@ -188,7 +191,7 @@ public class AmqpConsumer {
             connections.add(connection);
 
             ((JmsConnection) connection).addConnectionListener(myJmsConnectionListener);
-            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
             connection.start();
 
             MessageConsumer consumer = session.createConsumer(queue);