skyline 4 месяцев назад
Родитель
Сommit
d1ce018283
1 измененных файлов с 42 добавлено и 47 удалено
  1. 42 47
      miniapp/src/main/java/com/kym/miniapp/jobs/StartChargeDelayJob.java

+ 42 - 47
miniapp/src/main/java/com/kym/miniapp/jobs/StartChargeDelayJob.java

@@ -41,10 +41,12 @@ public class StartChargeDelayJob implements DelayService<DelayChargeOrder> {
     private final static DelayQueue<DelayedItem<DelayChargeOrder>> START_DELAY_QUEUE = new DelayQueue<>();
     private final static DelayQueue<DelayedItem<DelayChargeOrder>> START_DELAY_QUEUE = new DelayQueue<>();
     private final ChargeOrderService chargeOrderService;
     private final ChargeOrderService chargeOrderService;
     private final ChargeService chargeService;
     private final ChargeService chargeService;
+
+    private final static int MAX_THREAD_COUNT = 5;
     /**
     /**
      * 线程池
      * 线程池
      */
      */
-    private final ExecutorService executor = Executors.newFixedThreadPool(1);
+    private final ExecutorService executor = Executors.newFixedThreadPool(MAX_THREAD_COUNT);
 
 
     /**
     /**
      * 重试列表
      * 重试列表
@@ -82,60 +84,53 @@ public class StartChargeDelayJob implements DelayService<DelayChargeOrder> {
     }
     }
 
 
     private void processDelayedOrders() {
     private void processDelayedOrders() {
-        while (true) {
+        // 启动线程池中的所有线程处理延迟队列
+        for (int i = 0; i < MAX_THREAD_COUNT; i++) {
             executor.execute(() -> {
             executor.execute(() -> {
-                ThreadLocal<String> threadLocal = ThreadLocal.withInitial(() -> null); // 初始化为空值,避免使用new ThreadLocal()
+                ThreadLocal<String> threadLocal = new ThreadLocal<>();
                 log.info("预约启动充电处理线程:{}", Thread.currentThread().getName());
                 log.info("预约启动充电处理线程:{}", Thread.currentThread().getName());
-                DelayedItem<DelayChargeOrder> delayedItem = null;
-                try {
-                    // 线程休眠300ms
-                    Thread.sleep(300);
-                    delayedItem = START_DELAY_QUEUE.take();
-                    log.info("出队预约充电订单:{},队列剩余:{}", delayedItem.data.getStartChargeSeq(), START_DELAY_QUEUE.size());
-                    // 启动充电
-                    var order = delayedItem.data;
-                    threadLocal.set(order.getStartChargeSeq());
-                    chargeService.queryStartCharge(order.getUserId(), order.getConnectorId(), null, null,false, null, null);
-                    log.info("预约充电启动成功:用户:{},订单号:{},预约启动时间:{}", order.getUserId(), order.getStartChargeSeq(), order.getStartTime());
-                } catch (Exception e) {
-                    if (e instanceof InterruptedException) {
-                        log.error("预约充电队列take异常", e);
-                    } else {
-                        log.info("预约启动充电失败,订单号:{}", threadLocal.get());
-                        if (e instanceof BusinessException && (ResponseEnum.PLATFORM_QUERY_TOKEN_EXCEPTION.getCode().equals(((BusinessException) e).getCode()))) {
-                            if (retryList.contains(threadLocal.get())) {
-                                log.info("PlatformToken异常,预约订单:{}已重试忽略", threadLocal.get());
-                                log.error(e.getMessage());
+                while (true) {
+                    DelayedItem<DelayChargeOrder> delayedItem = null;
+                    try {
+                        // 阻塞等待延迟队列中的任务
+                        delayedItem = START_DELAY_QUEUE.take();
+                        log.info("出队预约充电订单:{},队列剩余:{}", delayedItem.data.getStartChargeSeq(), START_DELAY_QUEUE.size());
+                        // 启动充电
+                        var order = delayedItem.data;
+                        threadLocal.set(order.getStartChargeSeq());
+                        chargeService.queryStartCharge(order.getUserId(), order.getConnectorId(), null, null, false, null, null);
+                        log.info("预约充电启动成功:用户:{},订单号:{},预约启动时间:{}", order.getUserId(), order.getStartChargeSeq(), order.getStartTime());
+                    } catch (Exception e) {
+                        if (e instanceof InterruptedException) {
+                            log.error("预约充电队列take异常", e);
+                            // 线程被中断,退出循环
+                            break;
+                        } else {
+                            log.error("预约启动充电失败,订单号:{},异常信息:{}", threadLocal.get(), e.getMessage(), e);
+                            if (e instanceof BusinessException && (ResponseEnum.PLATFORM_QUERY_TOKEN_EXCEPTION.getCode().equals(((BusinessException) e).getCode()))) {
+                                if (retryList.contains(threadLocal.get())) {
+                                    log.info("PlatformToken异常,预约订单:{}已重试忽略", threadLocal.get());
+                                    // 启动失败将订单状态修改为充电状态已结束,订单状态已确认,结束原因:预约启动失败
+                                    updateOrderStatus(threadLocal.get(), ChargeOrder.CHARGE_STATUS_已结束, ChargeOrder.ORDER_STATUS_失败, ChargeOrder.STOP_REASON_预约启动充电失败);
+                                    retryList.remove(threadLocal.get());
+                                } else {
+                                    log.info("PlatformToken异常,预约订单:{},重试", threadLocal.get());
+                                    // token异常就重新放入队列重试
+                                    var success = addToDelayQueue(delayedItem);
+                                    if (success) {
+                                        retryList.add(threadLocal.get());
+                                    }
+                                }
+                            } else {
                                 // 启动失败将订单状态修改为充电状态已结束,订单状态已确认,结束原因:预约启动失败
                                 // 启动失败将订单状态修改为充电状态已结束,订单状态已确认,结束原因:预约启动失败
                                 updateOrderStatus(threadLocal.get(), ChargeOrder.CHARGE_STATUS_已结束, ChargeOrder.ORDER_STATUS_失败, ChargeOrder.STOP_REASON_预约启动充电失败);
                                 updateOrderStatus(threadLocal.get(), ChargeOrder.CHARGE_STATUS_已结束, ChargeOrder.ORDER_STATUS_失败, ChargeOrder.STOP_REASON_预约启动充电失败);
-                                retryList.remove(threadLocal.get());
-                                return;
-                            }
-                            log.info("PlatformToken异常,预约订单:{},重试", threadLocal.get());
-                            // token异常就重新放入队列重试
-                            var success = addToDelayQueue(delayedItem);
-                            if (success) {
-                                retryList.add(threadLocal.get());
                             }
                             }
-                            // 跳出本次循环
-                            return;
                         }
                         }
-                        log.error(e.getMessage());
-                        // 启动失败将订单状态修改为充电状态已结束,订单状态已确认,结束原因:预约启动失败
-                        updateOrderStatus(threadLocal.get(), ChargeOrder.CHARGE_STATUS_已结束, ChargeOrder.ORDER_STATUS_失败, ChargeOrder.STOP_REASON_预约启动充电失败);
+                    } finally {
+                        threadLocal.remove();
                     }
                     }
-                } finally {
-                    threadLocal.remove();
                 }
                 }
             });
             });
-            if (!executor.isTerminated()) {
-                try {
-                    Thread.sleep(100);
-                } catch (InterruptedException e) {
-                    log.error("Delay queue processing interrupted.", e);
-                    return;
-                }
-            }
         }
         }
     }
     }
 
 
@@ -170,4 +165,4 @@ public class StartChargeDelayJob implements DelayService<DelayChargeOrder> {
         }
         }
         return START_DELAY_QUEUE.removeIf(queue -> queue.data.getStartChargeSeq().equals(startChargeSeq));
         return START_DELAY_QUEUE.removeIf(queue -> queue.data.getStartChargeSeq().equals(startChargeSeq));
     }
     }
-}
+}