skyline %!s(int64=2) %!d(string=hai) anos
pai
achega
40522d94aa

+ 1 - 1
admin/src/main/java/com/kym/admin/AdminApplication.java

@@ -14,7 +14,7 @@ import org.springframework.web.bind.annotation.RequestMapping;
 @EnableScheduling
 @SpringBootApplication
 @ComponentScan(value = {"com.kym"}, excludeFilters = {@ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE)
-        , @ComponentScan.Filter(type = FilterType.REGEX, pattern = {"com.kym.service.miniapp.impl.DelayServiceImpl"})})
+        , @ComponentScan.Filter(type = FilterType.REGEX, pattern = {"com.kym.service.miniapp.impl.StartDelayServiceImpl","com.kym.service.miniapp.impl.StopDelayServiceImpl"})})
 @MapperScan(basePackages = {"com.kym.mapper"})
 public class AdminApplication {
 

+ 53 - 0
entity/src/main/java/com/kym/entity/miniapp/delay/DelayChargeOrder.java

@@ -0,0 +1,53 @@
+package com.kym.entity.miniapp.delay;
+
+import com.fasterxml.jackson.annotation.JsonFormat;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.experimental.Accessors;
+
+import java.io.Serializable;
+import java.time.LocalDateTime;
+
+/**
+ * 延迟队列使用充电订单对象
+ *
+ * @author skyline
+ * @since 2023-08-08
+ */
+@Getter
+@Setter
+@Accessors(chain = true)
+public class DelayChargeOrder implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private Long userId;
+
+    /**
+     * 充电订单号(EN+)
+     */
+    private String startChargeSeq;
+
+    /**
+     * 充电设备接口编码(EN+)
+     */
+    private String connectorId;
+
+    /**
+     * 充电开始时间
+     */
+    @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
+    private LocalDateTime startTime;
+
+    /**
+     * 充电结束时间
+     */
+    @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
+    private LocalDateTime endTime;
+
+    /**
+     * 充电状态:1:启动中 2:充电中 3:停止中 4:已结束 5:未知
+     */
+    private Integer chargeStatus;
+
+}

+ 1 - 1
service/src/main/java/com/kym/service/miniapp/DelayService.java

@@ -29,6 +29,6 @@ public interface DelayService<T> {
      *
      * @param data
      */
-    boolean removeToOrderDelayQueue(T data);
+    boolean removeFromOrderDelayQueue(T data);
 
 }

+ 41 - 11
service/src/main/java/com/kym/service/miniapp/impl/ChargeServiceImpl.java

@@ -7,10 +7,12 @@ import com.baomidou.dynamic.datasource.toolkit.DynamicDataSourceContextHolder;
 import com.kym.common.config.EnPlusConfig;
 import com.kym.common.constant.ResponseEnum;
 import com.kym.common.exception.BusinessException;
+import com.kym.common.utils.CommUtil;
 import com.kym.common.utils.OrderUtils;
 import com.kym.entity.admin.EquipmentInfo;
 import com.kym.entity.enplus.response.EnBusinessPolicy;
 import com.kym.entity.miniapp.ChargeOrder;
+import com.kym.entity.miniapp.delay.DelayChargeOrder;
 import com.kym.service.admin.EquipmentInfoService;
 import com.kym.service.admin.EquipmentRelationService;
 import com.kym.service.enplus.EnPlusService;
@@ -20,6 +22,8 @@ import com.kym.service.miniapp.ChargeService;
 import com.kym.service.miniapp.DelayService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.BeanUtils;
+import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.context.annotation.Lazy;
 import org.springframework.stereotype.Service;
 
@@ -49,18 +53,22 @@ public class ChargeServiceImpl implements ChargeService {
 
     private final EnPlusConfig enPlusConfig;
 
-    private final DelayService<ChargeOrder> delayService;
+    private final DelayService<DelayChargeOrder> startDelayService;
+    private final DelayService<DelayChargeOrder> stopDelayService;
 
     public ChargeServiceImpl(EquipmentRelationService equipmentRelationService, EquipmentInfoService equipmentInfoService,
                              ChargeOrderService chargeOrderService, AccountService accountService,
-                             EnPlusService enPlusService, EnPlusConfig enPlusConfig, @Lazy DelayService<ChargeOrder> delayService) {
+                             EnPlusService enPlusService, EnPlusConfig enPlusConfig,
+                             @Qualifier("startDelayServiceImpl") @Lazy DelayService<DelayChargeOrder> startDelayService,
+                             @Qualifier("stopDelayServiceImpl") @Lazy DelayService<DelayChargeOrder> stopDelayService) {
         this.equipmentRelationService = equipmentRelationService;
         this.equipmentInfoService = equipmentInfoService;
         this.chargeOrderService = chargeOrderService;
         this.accountService = accountService;
         this.enPlusService = enPlusService;
         this.enPlusConfig = enPlusConfig;
-        this.delayService = delayService;
+        this.startDelayService = startDelayService;
+        this.stopDelayService = stopDelayService;
     }
 
 
@@ -74,11 +82,18 @@ public class ChargeServiceImpl implements ChargeService {
     public Map<String, String> immediatelyCharge(String connectorId) {
         var userId = StpUtil.getLoginIdAsLong();
         // 修改订单状态,取消预约
-        chargeOrderService.lambdaUpdate()
-                .set(ChargeOrder::getChargeStatus, ChargeOrder.CHARGE_STATUS_未知)
+        var chargeOrder = chargeOrderService.lambdaQuery()
+//                .set(ChargeOrder::getChargeStatus, ChargeOrder.CHARGE_STATUS_未知)
                 .eq(ChargeOrder::getConnectorId, connectorId)
                 .eq(ChargeOrder::getChargeStatus, ChargeOrder.CHARGE_STATUS_预约中)
-                .update();
+                .one();
+        // TODO 预约充电启动和停止队列删除数据
+        var delayOrder = new DelayChargeOrder();
+        BeanUtils.copyProperties(chargeOrder, delayOrder);
+        startDelayService.removeFromOrderDelayQueue(delayOrder);
+        if (delayOrder.getEndTime() != null) {
+            stopDelayService.removeFromOrderDelayQueue(delayOrder);
+        }
         return queryStartCharge(userId, connectorId, false, null);
     }
 
@@ -92,8 +107,12 @@ public class ChargeServiceImpl implements ChargeService {
     @Override
     public void modifyBookingTime(String startChargeSeq, LocalDateTime startTime) {
         // 预约充电队列更新
-        var chargeOrder = chargeOrderService.getChargingOrderByStartChargeSeq(startChargeSeq);
-        boolean success = delayService.removeToOrderDelayQueue(chargeOrder);
+        var chargeOrder = chargeOrderService.lambdaQuery().eq(ChargeOrder::getStartChargeSeq, startChargeSeq)
+                .eq(ChargeOrder::getChargeStatus, ChargeOrder.CHARGE_STATUS_预约中).one();
+        var delayChargeOrder = new DelayChargeOrder();
+        BeanUtils.copyProperties(chargeOrder, delayChargeOrder);
+        boolean success = CommUtil.isNotEmptyAndNull(chargeOrder.getEndTime()) ?
+                startDelayService.removeFromOrderDelayQueue(delayChargeOrder) : stopDelayService.removeFromOrderDelayQueue(delayChargeOrder);
         if (!success) {
             throw new BusinessException("修改预约时间失败");
         }
@@ -170,15 +189,26 @@ public class ChargeServiceImpl implements ChargeService {
         // 如果是预约订单,则将订单放入预约充电延迟队列
         if (isBooking) {
             order = order.setStartTime(startTime).setChargeStatus(ChargeOrder.CHARGE_STATUS_预约中);
+
+            // TODO: 2023-10-21 如果有设置结束时间,取消时记得删
+            var endTime = LocalDateTime.now();
+            order.setEndTime(endTime);
+
             chargeOrderService.lambdaUpdate()
                     .set(ChargeOrder::getChargeStatus, ChargeOrder.CHARGE_STATUS_预约中)
                     .set(ChargeOrder::getStartTime, startTime)
+                    .set(CommUtil.isNotEmptyAndNull(endTime), ChargeOrder::getEndTime, endTime)
                     .eq(ChargeOrder::getStartChargeSeq, order.getStartChargeSeq())
                     .update();
-            var flag = delayService.addToDelayQueue(order.setStartTime(startTime));
+
+            var delayChargeOrder = new DelayChargeOrder();
+            BeanUtils.copyProperties(order, delayChargeOrder);
+
+            var flag = startDelayService.addToDelayQueue(delayChargeOrder.setStartTime(startTime));
             if (flag) {
-                // 修改设备状态为预约中
-                // 切换数据源
+                // 如果有结束时间,将订单放入结束充电延迟队列中
+                stopDelayService.addToDelayQueue(delayChargeOrder.setEndTime(endTime));
+                // 切换数据源,修改设备状态为预约中
                 updateEquipmentStatus(connectorId);
                 LOGGER.info("预约充电成功,用户:{},订单号:{}", userId, order.getStartChargeSeq());
                 return Map.of("startChargeSeq", order.getStartChargeSeq());

+ 27 - 18
service/src/main/java/com/kym/service/miniapp/impl/DelayServiceImpl.java → service/src/main/java/com/kym/service/miniapp/impl/StartDelayServiceImpl.java

@@ -1,8 +1,8 @@
 package com.kym.service.miniapp.impl;
 
 import com.baomidou.dynamic.datasource.annotation.DS;
-import com.kym.common.exception.BusinessException;
 import com.kym.entity.miniapp.ChargeOrder;
+import com.kym.entity.miniapp.delay.DelayChargeOrder;
 import com.kym.service.miniapp.ChargeOrderService;
 import com.kym.service.miniapp.ChargeService;
 import com.kym.service.miniapp.DelayService;
@@ -27,13 +27,13 @@ import java.util.concurrent.Executors;
 @Slf4j
 @DS("db-miniapp")
 @Scope(ConfigurableBeanFactory.SCOPE_SINGLETON) // 设置成单例
-public class DelayServiceImpl implements DelayService<ChargeOrder> {
+public class StartDelayServiceImpl implements DelayService<DelayChargeOrder> {
 
 
     /**
      * 预约订单队列
      */
-    private final static DelayQueue<DelayedItem<ChargeOrder>> DELAY_QUEUE = new DelayQueue<>();
+    private final static DelayQueue<DelayedItem<DelayChargeOrder>> START_DELAY_QUEUE = new DelayQueue<>();
     private final ChargeOrderService chargeOrderService;
     private final ChargeService chargeService;
     /**
@@ -41,7 +41,7 @@ public class DelayServiceImpl implements DelayService<ChargeOrder> {
      */
     private final ExecutorService executor = Executors.newFixedThreadPool(2);
 
-    public DelayServiceImpl(ChargeOrderService chargeOrderService, ChargeService chargeService) {
+    public StartDelayServiceImpl(ChargeOrderService chargeOrderService, ChargeService chargeService) {
         this.chargeOrderService = chargeOrderService;
         this.chargeService = chargeService;
     }
@@ -55,28 +55,37 @@ public class DelayServiceImpl implements DelayService<ChargeOrder> {
                 .eq(ChargeOrder::getChargeStatus, ChargeOrder.CHARGE_STATUS_预约中)
                 .orderByAsc(ChargeOrder::getStartTime)
                 .list();
-        DELAY_QUEUE.addAll(orderList.stream().map(order -> new DelayedItem<>(order, order.getStartTime())).toList());
+        var delayChargeOrderList = orderList.stream().map(o -> new DelayChargeOrder()
+                        .setStartChargeSeq(o.getStartChargeSeq())
+                        .setUserId(o.getUserId())
+                        .setConnectorId(o.getConnectorId())
+                        .setStartTime(o.getStartTime())
+                        .setEndTime(o.getEndTime())
+                        .setChargeStatus(o.getChargeStatus()))
+                .toList();
+        var delayList = delayChargeOrderList.stream().map(delay -> new DelayedItem<>(delay, delay.getStartTime())).toList();
+        START_DELAY_QUEUE.addAll(delayList);
 
         // 开启线程处理队列消息
         executor.execute(() -> {
             ThreadLocal<String> threadLocal = new ThreadLocal<>();
-            log.info("启动预约充电订单处理线程:{}", Thread.currentThread().getName());
-            DelayedItem<ChargeOrder> delayedItem;
+            log.info("预约充电订单处理线程:{}", Thread.currentThread().getName());
+            DelayedItem<DelayChargeOrder> delayedItem;
             while (true) {
                 try {
-                    delayedItem = DELAY_QUEUE.take();
+                    delayedItem = START_DELAY_QUEUE.take();
                     // 启动充电
                     var order = delayedItem.data;
                     threadLocal.set(order.getStartChargeSeq());
                     chargeService.queryStartCharge(order.getUserId(), order.getConnectorId(), false, null);
-                    log.info("预约充电启动成:用户:{},订单号:{},预约启动时间:{}", order.getUserId(), order.getStartChargeSeq(), order.getStartTime());
+                    log.info("预约充电启动成:用户:{},订单号:{},预约启动时间:{}", order.getUserId(), order.getStartChargeSeq(), order.getStartTime());
                     // 线程休眠100ms
                     Thread.sleep(100);
                 } catch (Exception e) {
                     if (e instanceof InterruptedException) {
                         log.error("预约充电队列take异常", e);
                     } else {
-                        log.info("预约充电启动失败,订单号:{}", threadLocal.get(), e);
+                        log.info("预约启动充电失败,订单号:{}", threadLocal.get(), e);
                     }
                 } finally {
                     threadLocal.remove();
@@ -88,21 +97,21 @@ public class DelayServiceImpl implements DelayService<ChargeOrder> {
 
 
     @Override
-    public boolean addToOrderDelayQueue(DelayedItem<ChargeOrder> delayedItem) {
-        return DELAY_QUEUE.add(delayedItem);
+    public boolean addToOrderDelayQueue(DelayedItem<DelayChargeOrder> delayedItem) {
+        return START_DELAY_QUEUE.add(delayedItem);
     }
 
     @Override
-    public boolean addToDelayQueue(ChargeOrder chargeOrder) {
-        DelayedItem<ChargeOrder> orderDelayed = new DelayedItem<>(chargeOrder, chargeOrder.getStartTime());
-        return DELAY_QUEUE.add(orderDelayed);
+    public boolean addToDelayQueue(DelayChargeOrder delayChargeOrder) {
+        DelayedItem<DelayChargeOrder> orderDelayed = new DelayedItem<>(delayChargeOrder, delayChargeOrder.getStartTime());
+        return START_DELAY_QUEUE.add(orderDelayed);
     }
 
     @Override
-    public boolean removeToOrderDelayQueue(ChargeOrder chargeOrder) {
-        if (chargeOrder == null) {
+    public boolean removeFromOrderDelayQueue(DelayChargeOrder delayChargeOrder) {
+        if (delayChargeOrder == null) {
             return false;
         }
-        return DELAY_QUEUE.removeIf(queue -> queue.data.getStartChargeSeq().equals(chargeOrder.getStartChargeSeq()));
+        return START_DELAY_QUEUE.removeIf(queue -> queue.data.getStartChargeSeq().equals(delayChargeOrder.getStartChargeSeq()));
     }
 }

+ 119 - 0
service/src/main/java/com/kym/service/miniapp/impl/StopDelayServiceImpl.java

@@ -0,0 +1,119 @@
+package com.kym.service.miniapp.impl;
+
+import com.baomidou.dynamic.datasource.annotation.DS;
+import com.kym.entity.miniapp.ChargeOrder;
+import com.kym.entity.miniapp.delay.DelayChargeOrder;
+import com.kym.service.miniapp.ChargeOrderService;
+import com.kym.service.miniapp.ChargeService;
+import com.kym.service.miniapp.DelayService;
+import com.kym.service.queue.DelayedItem;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.config.ConfigurableBeanFactory;
+import org.springframework.context.annotation.Scope;
+import org.springframework.context.event.ContextRefreshedEvent;
+import org.springframework.context.event.EventListener;
+import org.springframework.stereotype.Service;
+
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * @author skyline
+ * @description
+ * @date 2023-10-08 22:11
+ */
+@Service
+@Slf4j
+@DS("db-miniapp")
+@Scope(ConfigurableBeanFactory.SCOPE_SINGLETON) // 设置成单例
+public class StopDelayServiceImpl implements DelayService<DelayChargeOrder> {
+
+
+    /**
+     * 预约订单队列
+     */
+    private final static DelayQueue<DelayedItem<DelayChargeOrder>> STOP_DELAY_QUEUE = new DelayQueue<>();
+    private final ChargeOrderService chargeOrderService;
+    private final ChargeService chargeService;
+    /**
+     * 线程池
+     */
+    private final ExecutorService executor = Executors.newFixedThreadPool(2);
+
+    public StopDelayServiceImpl(ChargeOrderService chargeOrderService, ChargeService chargeService) {
+        this.chargeOrderService = chargeOrderService;
+        this.chargeService = chargeService;
+    }
+
+    @DS("db-miniapp")
+    // 这里不能使用@PostConstruct,在初始化完成后, bean 进入增强阶段, 所以这个阶段的任何AOP都是无效的,https://www.cnblogs.com/eternityz/p/15330069.html
+    @EventListener
+    public void init(ContextRefreshedEvent event) {
+        // 队列加载所有充电状态为预约中且有结束时间的订单,按照开始时间排序
+        var orderList = chargeOrderService.lambdaQuery()
+                .eq(ChargeOrder::getChargeStatus, ChargeOrder.CHARGE_STATUS_预约中)
+                .isNotNull(ChargeOrder::getEndTime)
+                .orderByAsc(ChargeOrder::getEndTime)
+                .list();
+
+        var delayChargeOrderList = orderList.stream().map(o -> new DelayChargeOrder()
+                        .setStartChargeSeq(o.getStartChargeSeq())
+                        .setUserId(o.getUserId())
+                        .setConnectorId(o.getConnectorId())
+                        .setStartTime(o.getStartTime())
+                        .setEndTime(o.getEndTime())
+                        .setChargeStatus(o.getChargeStatus()))
+                .toList();
+        var delayList = delayChargeOrderList.stream().map(delay -> new DelayedItem<>(delay, delay.getEndTime())).toList();
+        STOP_DELAY_QUEUE.addAll(delayList);
+
+        // 开启线程处理队列消息
+        executor.execute(() -> {
+            ThreadLocal<String> threadLocal = new ThreadLocal<>();
+            log.info("预约停止充电订单处理线程:{}", Thread.currentThread().getName());
+            DelayedItem<DelayChargeOrder> delayedItem;
+            while (true) {
+                try {
+                    delayedItem = STOP_DELAY_QUEUE.take();
+                    // 停止充电
+                    var order = delayedItem.data;
+                    threadLocal.set(order.getStartChargeSeq());
+                    chargeService.queryStopCharge(order.getConnectorId());
+                    log.info("预约充电停止成功:用户:{},订单号:{},预约停止时间:{}", order.getUserId(), order.getStartChargeSeq(), order.getEndTime());
+                    // 线程休眠100ms
+                    Thread.sleep(100);
+                } catch (Exception e) {
+                    if (e instanceof InterruptedException) {
+                        log.error("预约停止充电队列take异常", e);
+                    } else {
+                        log.info("预约停止充电失败,订单号:{}", threadLocal.get(), e);
+                    }
+                } finally {
+                    threadLocal.remove();
+                }
+            }
+        });
+
+    }
+
+
+    @Override
+    public boolean addToOrderDelayQueue(DelayedItem<DelayChargeOrder> delayedItem) {
+        return STOP_DELAY_QUEUE.add(delayedItem);
+    }
+
+    @Override
+    public boolean addToDelayQueue(DelayChargeOrder delayChargeOrder) {
+        DelayedItem<DelayChargeOrder> orderDelayed = new DelayedItem<>(delayChargeOrder, delayChargeOrder.getEndTime());
+        return STOP_DELAY_QUEUE.add(orderDelayed);
+    }
+
+    @Override
+    public boolean removeFromOrderDelayQueue(DelayChargeOrder delayChargeOrder) {
+        if (delayChargeOrder == null) {
+            return false;
+        }
+        return STOP_DELAY_QUEUE.removeIf(queue -> queue.data.getStartChargeSeq().equals(delayChargeOrder.getStartChargeSeq()));
+    }
+}

+ 5 - 5
service/src/main/java/com/kym/service/queue/DelayedItem.java

@@ -19,19 +19,19 @@ public class DelayedItem<T> implements Delayed {
     public final T data;
 
     /**
-     * 预约启动时间
+     * 定时时间
      */
-    public LocalDateTime startTime = LocalDateTime.MIN;
+    public LocalDateTime delayTime = LocalDateTime.MIN;
 
-    public DelayedItem(T data, LocalDateTime startTime) {
+    public DelayedItem(T data, LocalDateTime delayTime) {
         this.data = data;
-        this.startTime = startTime;
+        this.delayTime = delayTime;
     }
 
 
     @Override
     public long getDelay(@NotNull TimeUnit unit) {
-        return unit.convert(Duration.between(LocalDateTime.now(), startTime));
+        return unit.convert(Duration.between(LocalDateTime.now(), delayTime));
     }
 
     @Override