Sfoglia il codice sorgente

消息处理优化

skyline 10 mesi fa
parent
commit
bf4b01f131

+ 2 - 0
car-wash-service/src/main/java/com/kym/service/awoara/event/handle/BootEventHandler.java

@@ -8,6 +8,7 @@ import com.kym.service.WashDeviceService;
 import com.kym.service.WashOrderService;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Transactional;
 
 /**
  * 设备启动事件处理
@@ -28,6 +29,7 @@ public class BootEventHandler implements AwoaraEventHandler<DeviceStateObject> {
     }
 
     @Override
+    @Transactional
     public void handle(MessageBody<DeviceStateObject> message) {
         log.info(message.toString());
         log.info("BootEventHandler");

+ 3 - 1
car-wash-service/src/main/java/com/kym/service/awoara/event/handle/DeviceStateEventHandler.java

@@ -1,11 +1,12 @@
 package com.kym.service.awoara.event.handle;
 
-import com.kym.entity.awoara.MessageBody;
 import com.kym.entity.WashDevice;
 import com.kym.entity.awoara.DeviceStateObject;
+import com.kym.entity.awoara.MessageBody;
 import com.kym.service.WashDeviceService;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Transactional;
 
 /**
  * 设备状态更新事件事件处理
@@ -23,6 +24,7 @@ public class DeviceStateEventHandler implements AwoaraEventHandler<DeviceStateOb
     }
 
     @Override
+    @Transactional
     public void handle(MessageBody<DeviceStateObject> message) {
         log.info(message.toString());
         log.info("DeviceStateEventHandler");

+ 6 - 3
car-wash-service/src/main/java/com/kym/service/awoara/event/handle/OrderCloseEventHandler.java

@@ -48,7 +48,7 @@ public class OrderCloseEventHandler implements AwoaraEventHandler<OrderInfoObjec
 
 
     @Override
-    @Transactional(rollbackFor = Exception.class)
+    @Transactional
     public void handle(MessageBody<OrderInfoObject> message) {
         log.info("收到订单关闭事件");
 
@@ -152,6 +152,7 @@ public class OrderCloseEventHandler implements AwoaraEventHandler<OrderInfoObjec
      * @param washOrder
      * @param account
      */
+    @Transactional
     protected void deductions(WashOrder washOrder, Account account) {
         // 账户扣费
         account.setBalance(account.getBalance() - washOrder.getAmountReceived());
@@ -175,7 +176,8 @@ public class OrderCloseEventHandler implements AwoaraEventHandler<OrderInfoObjec
      *
      * @param washOrder
      */
-    private void doLocalSplit(WashOrder washOrder) {
+    @Transactional
+    protected void doLocalSplit(WashOrder washOrder) {
         log.info("订单:{},执行(本店)分账", washOrder.getOrderId());
         int amount = washOrder.getAmount();
         BigDecimal platformRate = BigDecimal.valueOf(0.1);
@@ -265,7 +267,8 @@ public class OrderCloseEventHandler implements AwoaraEventHandler<OrderInfoObjec
      * @param washOrder
      * @param userStationId 用户归属的站点Id
      */
-    private void doCrossSplit(WashOrder washOrder, String userStationId) {
+    @Transactional
+    protected void doCrossSplit(WashOrder washOrder, String userStationId) {
         log.info("订单:{},执行(跨店)分账", washOrder.getOrderId());
         int amount = washOrder.getAmount();
         BigDecimal platformRate = BigDecimal.valueOf(0.1);

+ 2 - 0
car-wash-service/src/main/java/com/kym/service/awoara/event/handle/OrderCreateEventHandler.java

@@ -7,6 +7,7 @@ import com.kym.entity.awoara.OrderInfoObject;
 import com.kym.service.WashOrderService;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Transactional;
 
 /**
  * 收到订单事件
@@ -27,6 +28,7 @@ public class OrderCreateEventHandler implements AwoaraEventHandler<OrderInfoObje
     }
 
     @Override
+    @Transactional
     public void handle(MessageBody<OrderInfoObject> message) {
         log.info(message.toString());
         log.info("OrderCreateEventHandler");

+ 2 - 0
car-wash-service/src/main/java/com/kym/service/awoara/event/handle/OrderUpdateEventHandler.java

@@ -7,6 +7,7 @@ import com.kym.entity.WashOrder;
 import com.kym.service.WashOrderService;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Transactional;
 
 /**
  * 订单状态更新事件
@@ -33,6 +34,7 @@ public class OrderUpdateEventHandler implements AwoaraEventHandler<OrderInfoObje
     }
 
     @Override
+    @Transactional
     public void handle(MessageBody<OrderInfoObject> message) {
         log.info(message.toString());
         log.info("收到订单更新事件");

+ 20 - 13
car-wash-service/src/main/java/com/kym/service/impl/MpMsgTemplateServiceImpl.java

@@ -72,19 +72,26 @@ public class MpMsgTemplateServiceImpl extends ServiceImpl<MpMsgTemplateMapper, M
     @Override
     @Async
     public void sendTemplateMessage(MsgTemplateType templateType, Long userId, Object... args) {
-        var mpMsgTemplate = lambdaQuery().eq(MpMsgTemplate::getBizType, templateType.getBizType()).one();
-        if (mpMsgTemplate == null) return;
+        try {
+            var mpMsgTemplate = lambdaQuery().eq(MpMsgTemplate::getBizType, templateType.getBizType()).one();
+            if (mpMsgTemplate == null) {
+                return;
+            }
+
+            mpRelationService.lambdaQuery().eq(MpRelation::getUserId, userId).oneOpt().ifPresent(mpRelation -> {
+                TemplateParamBuilder builder = paramBuilders.get(templateType);
+                Map<String, String> params = builder.buildParams(args);
+                WxPbUtil.sendPublicTemplateMessage(
+                        mpRelation.getMpOpenid(),
+                        mpMsgTemplate.getTemplateId(),
+                        params,
+                        templateType.equals(MsgTemplateType.PARKING_COUPON) ? args[1].toString() : "",
+                        ""
+                );
+            });
+        } catch (Exception e) {
+            log.error("发送模板消息失败", e);
+        }
 
-        mpRelationService.lambdaQuery().eq(MpRelation::getUserId, userId).oneOpt().ifPresent(mpRelation -> {
-            TemplateParamBuilder builder = paramBuilders.get(templateType);
-            Map<String, String> params = builder.buildParams(args);
-            WxPbUtil.sendPublicTemplateMessage(
-                    mpRelation.getMpOpenid(),
-                    mpMsgTemplate.getTemplateId(),
-                    params,
-                    templateType.equals(MsgTemplateType.PARKING_COUPON) ? args[1].toString() : "",
-                    ""
-            );
-        });
     }
 }

+ 19 - 14
car-wash-service/src/main/java/com/kym/service/mq/MnsHandler.java

@@ -20,7 +20,6 @@ import org.springframework.stereotype.Component;
 
 import java.util.Base64;
 import java.util.List;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -72,10 +71,14 @@ public class MnsHandler {
                 } catch (Exception e) {
                     log.error("消息处理异常", e);
                     // 增加恢复逻辑(如休眠后重试)
-                    try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException ex) { break; }
-                }finally {
+                    try {
+                        TimeUnit.SECONDS.sleep(5);
+                    } catch (InterruptedException ex) {
+                        break;
+                    }
+                } finally {
                     //打印线程池状态
-                    log.info("Pool Size:{} ,Active Thread Count:{},Task Queue Size:{},Completed Task Count: {}" ,executorService.getPoolSize(),executorService.getActiveCount(),executorService.getQueue().size(), executorService.getCompletedTaskCount());
+                    log.info("Pool Size:{} ,Active Thread Count:{},Task Queue Size:{},Completed Task Count: {}", executorService.getPoolSize(), executorService.getActiveCount(), executorService.getQueue().size(), executorService.getCompletedTaskCount());
                 }
             }
         });
@@ -129,17 +132,20 @@ public class MnsHandler {
             log.info("message body: " + (IS_BASE64 ? popMsg.getMessageBody() : popMsg.getMessageBodyAsRawString()));
             log.info("message id: " + popMsg.getMessageId());
             log.info("message dequeue count:" + popMsg.getDequeueCount());
-            //<<to add your special logic.>>
 
-            handleMessage(popMsg.getMessageBodyAsRawString());
-
-            //remember to  delete message when consume message successfully.
-            queue.deleteMessage(popMsg.getReceiptHandle());
-            log.info("delete message successfully.\n");
+            try {
+                handleMessage(queue, popMsg);
+                queue.deleteMessage(popMsg.getReceiptHandle());
+            } catch (Exception e) {
+                log.error("Failed to process message: " + popMsg.getMessageId(), e);
+            }
+            log.info("delete message successfully.");
         }
     }
 
-    static void handleMessage(String base64) {
+
+    static void handleMessage(CloudQueue queue, Message popMsg) {
+        var base64 = popMsg.getMessageBodyAsRawString();
         // 对messageBody进行base64解码
         var messageBodyStr = new String(decoder.decode(base64), CharsetUtil.CHARSET_UTF_8);
 
@@ -150,13 +156,12 @@ public class MnsHandler {
         log.info("消息内容:{}", json.toJSONString());
         if ("upload".equals(json.getString("messagetype"))) {
             var event = json.getJSONObject("payload").getString("event");
-            var handler = AwoaraEventHandlerFactory.getEventHandler(event);
+            var eventHandler = AwoaraEventHandlerFactory.getEventHandler(event);
             var message = parseMessageBody(json.toJSONString(), Event.getClazz(event));
-            handler.handle(message);
+            eventHandler.handle(message);
         } else {
             log.info("message type is not upload");
         }
-
     }
 
     private static <T> MessageBody<T> parseMessageBody(String jsonStr, Class<T> clazz) {