skyline 1 год назад
Родитель
Сommit
8a027130e7
1 измененных файлов с 21 добавлено и 18 удалено
  1. 21 18
      car-wash-service/src/main/java/com/kym/service/mq/MnsHandler.java

+ 21 - 18
car-wash-service/src/main/java/com/kym/service/mq/MnsHandler.java

@@ -6,12 +6,11 @@ import com.alibaba.fastjson2.TypeReference;
 import com.aliyun.mns.client.CloudAccount;
 import com.aliyun.mns.client.CloudQueue;
 import com.aliyun.mns.client.MNSClient;
-import com.aliyun.mns.common.ClientException;
-import com.aliyun.mns.common.ServiceException;
 import com.aliyun.mns.model.Message;
 import com.kym.entity.awoara.Event;
 import com.kym.entity.awoara.MessageBody;
 import com.kym.service.awoara.factory.AwoaraEventHandlerFactory;
+import jakarta.annotation.PreDestroy;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.text.StringEscapeUtils;
 import org.springframework.context.event.ContextRefreshedEvent;
@@ -38,9 +37,16 @@ public class MnsHandler {
     /**
      * 线程池
      */
+//    private final static ExecutorService executorService = new ThreadPoolExecutor(
+//            2, 4, 60, TimeUnit.SECONDS,
+//            new LinkedBlockingQueue<>(10000));
+
+    // 增加饱和策略(建议用CallerRunsPolicy)
     private final static ExecutorService executorService = new ThreadPoolExecutor(
-            2, 4, 60, TimeUnit.SECONDS,
-            new LinkedBlockingQueue<>(10000));
+            4, 8, 60, TimeUnit.SECONDS,
+            new LinkedBlockingQueue<>(10000),
+            new ThreadPoolExecutor.CallerRunsPolicy()); // 关键修改点
+
 
     /**
      * 是否做 base64 编码
@@ -60,23 +66,13 @@ public class MnsHandler {
 
     private static void batchReceive(CloudQueue queue) {
         executorService.execute(() -> {
-            while (true) {
+            while (!Thread.currentThread().isInterrupted()) {
                 try {
                     longPollingBatchReceive(queue);
-                } catch (ClientException ce) {
-                    System.out.println("Something wrong with the network connection between client and MNS service."
-                            + "Please check your network and DNS availablity.");
-                    ce.printStackTrace();
-                } catch (ServiceException se) {
-                    if (se.getErrorCode().equals("QueueNotExist")) {
-                        System.out.println("Queue is not exist.Please create queue before use");
-                    } else if (se.getErrorCode().equals("TimeExpired")) {
-                        System.out.println("The request is time expired. Please check your local machine timeclock");
-                    }
-                    se.printStackTrace();
                 } catch (Exception e) {
-                    System.out.println("Unknown exception happened!");
-                    e.printStackTrace();
+                    log.error("消息处理异常", e);
+                    // 增加恢复逻辑(如休眠后重试)
+                    try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException ex) { break; }
                 }
             }
         });
@@ -175,4 +171,11 @@ public class MnsHandler {
         batchReceive(queue);
     }
 
+    @PreDestroy
+    public void shutdown() {
+        client.close(); // 关闭MNS客户端
+        executorService.shutdown();
+    }
+
+
 }