Selaa lähdekoodia

fix bug 延迟任务

skyline 2 vuotta sitten
vanhempi
säilyke
1f719e9784

+ 4 - 7
admin/src/main/java/com/kym/admin/jobs/ActivityDelayJob.java

@@ -75,11 +75,8 @@ public class ActivityDelayJob implements DelayService<DelayActivity> {
                         .setType(DelayActivity.TYPE_结束))
                 .toList();
 
-        var delayStartList = delayStartActivityList.stream().map(delay -> new DelayedItem<>(delay, delay.getExecuteTime())).toList();
-        var delayEndList = delayEndActivityList.stream().map(delay -> new DelayedItem<>(delay, delay.getExecuteTime())).toList();
-
-        DELAY_QUEUE.addAll(delayStartList);
-        DELAY_QUEUE.addAll(delayEndList);
+        delayStartActivityList.stream().map(delay -> new DelayedItem<>(delay, delay.getExecuteTime())).toList().forEach(this::addToDelayQueue);
+        delayEndActivityList.stream().map(delay -> new DelayedItem<>(delay, delay.getExecuteTime())).toList().forEach(this::addToDelayQueue);
 
         // 开启线程处理队列消息
         executor.execute(() -> {
@@ -129,7 +126,7 @@ public class ActivityDelayJob implements DelayService<DelayActivity> {
     }
 
     @Override
-    public boolean addToOrderDelayQueue(DelayedItem<DelayActivity> delayedItem) {
+    public boolean addToDelayQueue(DelayedItem<DelayActivity> delayedItem) {
         return DELAY_QUEUE.add(delayedItem);
     }
 
@@ -141,7 +138,7 @@ public class ActivityDelayJob implements DelayService<DelayActivity> {
 
 
     @Override
-    public boolean removeFromOrderDelayQueue(Object activityId) {
+    public boolean removeFromDelayQueue(Object activityId) {
         if (CommUtil.isEmptyOrNull(activityId)) {
             return false;
         }

+ 2 - 3
miniapp/src/main/java/com/kym/miniapp/jobs/StartChargeDelayJob.java

@@ -14,7 +14,6 @@ import org.springframework.context.annotation.Scope;
 import org.springframework.context.event.ContextRefreshedEvent;
 import org.springframework.context.event.EventListener;
 import org.springframework.stereotype.Component;
-import org.springframework.stereotype.Service;
 
 import java.util.concurrent.DelayQueue;
 import java.util.concurrent.ExecutorService;
@@ -100,7 +99,7 @@ public class StartChargeDelayJob implements DelayService<DelayChargeOrder> {
 
 
     @Override
-    public boolean addToOrderDelayQueue(DelayedItem<DelayChargeOrder> delayedItem) {
+    public boolean addToDelayQueue(DelayedItem<DelayChargeOrder> delayedItem) {
         return START_DELAY_QUEUE.add(delayedItem);
     }
 
@@ -111,7 +110,7 @@ public class StartChargeDelayJob implements DelayService<DelayChargeOrder> {
     }
 
     @Override
-    public boolean removeFromOrderDelayQueue(Object startChargeSeq) {
+    public boolean removeFromDelayQueue(Object startChargeSeq) {
         if (CommUtil.isEmptyOrNull(startChargeSeq)) {
             return false;
         }

+ 4 - 5
miniapp/src/main/java/com/kym/miniapp/jobs/StopChargeDelayJob.java

@@ -5,17 +5,16 @@ import com.kym.common.exception.BusinessException;
 import com.kym.common.utils.CommUtil;
 import com.kym.entity.miniapp.ChargeOrder;
 import com.kym.entity.miniapp.delay.DelayChargeOrder;
-import com.kym.service.miniapp.ChargeOrderService;
-import com.kym.service.miniapp.ChargeService;
 import com.kym.service.jobs.DelayService;
 import com.kym.service.jobs.DelayedItem;
+import com.kym.service.miniapp.ChargeOrderService;
+import com.kym.service.miniapp.ChargeService;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.config.ConfigurableBeanFactory;
 import org.springframework.context.annotation.Scope;
 import org.springframework.context.event.ContextRefreshedEvent;
 import org.springframework.context.event.EventListener;
 import org.springframework.stereotype.Component;
-import org.springframework.stereotype.Service;
 
 import java.util.concurrent.DelayQueue;
 import java.util.concurrent.ExecutorService;
@@ -112,7 +111,7 @@ public class StopChargeDelayJob implements DelayService<DelayChargeOrder> {
 
 
     @Override
-    public boolean addToOrderDelayQueue(DelayedItem<DelayChargeOrder> delayedItem) {
+    public boolean addToDelayQueue(DelayedItem<DelayChargeOrder> delayedItem) {
         return STOP_DELAY_QUEUE.add(delayedItem);
     }
 
@@ -123,7 +122,7 @@ public class StopChargeDelayJob implements DelayService<DelayChargeOrder> {
     }
 
     @Override
-    public boolean removeFromOrderDelayQueue(Object startChargeSeq) {
+    public boolean removeFromDelayQueue(Object startChargeSeq) {
         if (CommUtil.isEmptyOrNull(startChargeSeq)) {
             return false;
         }

+ 2 - 2
service/src/main/java/com/kym/service/jobs/DelayService.java

@@ -12,7 +12,7 @@ public interface DelayService<T> {
      * @param delayedItem 延迟对象
      * @return boolean
      */
-    boolean addToOrderDelayQueue(DelayedItem<T> delayedItem);
+    boolean addToDelayQueue(DelayedItem<T> delayedItem);
 
     /**
      * 根据对象添加到指定延时队列
@@ -27,6 +27,6 @@ public interface DelayService<T> {
      *
      * @param id
      */
-    boolean removeFromOrderDelayQueue(Object id);
+    boolean removeFromDelayQueue(Object id);
 
 }

+ 3 - 3
service/src/main/java/com/kym/service/jobs/DelayedItem.java

@@ -2,8 +2,8 @@ package com.kym.service.jobs;
 
 import jakarta.validation.constraints.NotNull;
 
-import java.time.Duration;
 import java.time.LocalDateTime;
+import java.time.ZoneOffset;
 import java.util.concurrent.Delayed;
 import java.util.concurrent.TimeUnit;
 
@@ -31,11 +31,11 @@ public class DelayedItem<T> implements Delayed {
 
     @Override
     public long getDelay(@NotNull TimeUnit unit) {
-        return unit.convert(Duration.between(LocalDateTime.now(), delayTime));
+        return unit.convert(delayTime.toInstant(ZoneOffset.ofHours(+8)).toEpochMilli() - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
     }
 
     @Override
     public int compareTo(@NotNull Delayed o) {
-        return (int) (getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS));
+        return (int) (getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
     }
 }

+ 6 - 6
service/src/main/java/com/kym/service/miniapp/impl/ChargeServiceImpl.java

@@ -96,9 +96,9 @@ public class ChargeServiceImpl implements ChargeService {
         // 预约充电启动和停止队列删除数据
         var delayOrder = new DelayChargeOrder();
         BeanUtils.copyProperties(chargeOrder, delayOrder);
-        startDelayService.removeFromOrderDelayQueue(delayOrder.getStartChargeSeq());
+        startDelayService.removeFromDelayQueue(delayOrder.getStartChargeSeq());
         if (delayOrder.getEndTime() != null) {
-            stopDelayService.removeFromOrderDelayQueue(delayOrder.getStartChargeSeq());
+            stopDelayService.removeFromDelayQueue(delayOrder.getStartChargeSeq());
         }
 
         chargeOrderService.lambdaUpdate()
@@ -129,8 +129,8 @@ public class ChargeServiceImpl implements ChargeService {
         chargeOrderService.lambdaUpdate().eq(ChargeOrder::getStartChargeSeq, startChargeSeq)
                 .set(ChargeOrder::getStartTime, startTime).set(ChargeOrder::getEndTime, null).update();
         // 删除队列中原来的数据
-        startDelayService.removeFromOrderDelayQueue(startChargeSeq);
-        stopDelayService.removeFromOrderDelayQueue(startChargeSeq);
+        startDelayService.removeFromDelayQueue(startChargeSeq);
+        stopDelayService.removeFromDelayQueue(startChargeSeq);
         // 向队列中添加新的数据
         var delayChargeOrder = new DelayChargeOrder();
         BeanUtils.copyProperties(chargeOrder, delayChargeOrder);
@@ -147,8 +147,8 @@ public class ChargeServiceImpl implements ChargeService {
                 .eq(ChargeOrder::getChargeStatus, ChargeOrder.CHARGE_STATUS_预约中).eq(ChargeOrder::getIsBooking, ChargeOrder.IS_BOOKING_是).one();
         if (chargeOrder != null) {
             // 清除启动/停止队列信息
-            startDelayService.removeFromOrderDelayQueue(chargeOrder.getStartChargeSeq());
-            stopDelayService.removeFromOrderDelayQueue(chargeOrder.getStartChargeSeq());
+            startDelayService.removeFromDelayQueue(chargeOrder.getStartChargeSeq());
+            stopDelayService.removeFromDelayQueue(chargeOrder.getStartChargeSeq());
             // 修改订单状态为取消
             chargeOrderService.lambdaUpdate().set(ChargeOrder::getOrderStatus, ChargeOrder.ORDER_STATUS_取消)
                     .set(ChargeOrder::getChargeStatus, ChargeOrder.CHARGE_STATUS_已结束)