|
@@ -9,6 +9,7 @@ import com.kym.service.jobs.DelayService;
|
|
|
import com.kym.service.jobs.DelayedItem;
|
|
import com.kym.service.jobs.DelayedItem;
|
|
|
import com.kym.service.miniapp.ChargeOrderService;
|
|
import com.kym.service.miniapp.ChargeOrderService;
|
|
|
import com.kym.service.miniapp.ChargeService;
|
|
import com.kym.service.miniapp.ChargeService;
|
|
|
|
|
+import jakarta.annotation.PreDestroy;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
|
|
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
|
|
|
import org.springframework.context.annotation.Scope;
|
|
import org.springframework.context.annotation.Scope;
|
|
@@ -20,6 +21,9 @@ import org.springframework.stereotype.Component;
|
|
|
import java.util.concurrent.DelayQueue;
|
|
import java.util.concurrent.DelayQueue;
|
|
|
import java.util.concurrent.ExecutorService;
|
|
import java.util.concurrent.ExecutorService;
|
|
|
import java.util.concurrent.Executors;
|
|
import java.util.concurrent.Executors;
|
|
|
|
|
+import java.util.concurrent.LinkedBlockingQueue;
|
|
|
|
|
+import java.util.concurrent.ThreadPoolExecutor;
|
|
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* @author skyline
|
|
* @author skyline
|
|
@@ -42,7 +46,12 @@ public class StopChargeDelayJob implements DelayService<DelayChargeOrder> {
|
|
|
/**
|
|
/**
|
|
|
* 线程池
|
|
* 线程池
|
|
|
*/
|
|
*/
|
|
|
- private final ExecutorService executor = Executors.newFixedThreadPool(1);
|
|
|
|
|
|
|
+ private final static int MAX_THREAD_COUNT = 3;
|
|
|
|
|
+ /**
|
|
|
|
|
+ * Java 21 虚拟线程池
|
|
|
|
|
+ * 替代传统的ThreadPoolExecutor,提供更好的I/O密集型任务性能
|
|
|
|
|
+ */
|
|
|
|
|
+ private final ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
|
|
|
|
|
|
|
|
public StopChargeDelayJob(ChargeOrderService chargeOrderService, ChargeService chargeService) {
|
|
public StopChargeDelayJob(ChargeOrderService chargeOrderService, ChargeService chargeService) {
|
|
|
this.chargeOrderService = chargeOrderService;
|
|
this.chargeOrderService = chargeOrderService;
|
|
@@ -78,48 +87,44 @@ public class StopChargeDelayJob implements DelayService<DelayChargeOrder> {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
private void processDelayedOrders() {
|
|
private void processDelayedOrders() {
|
|
|
- while (true) {
|
|
|
|
|
|
|
+ // 启动线程池中的所有线程处理延迟队列
|
|
|
|
|
+ for (int i = 0; i < MAX_THREAD_COUNT; i++) {
|
|
|
executor.execute(() -> {
|
|
executor.execute(() -> {
|
|
|
ThreadLocal<String> threadLocal = ThreadLocal.withInitial(() -> null); // 初始化为空值,避免使用new ThreadLocal()
|
|
ThreadLocal<String> threadLocal = ThreadLocal.withInitial(() -> null); // 初始化为空值,避免使用new ThreadLocal()
|
|
|
log.info("预约停止充电处理线程:{}", Thread.currentThread().getName());
|
|
log.info("预约停止充电处理线程:{}", Thread.currentThread().getName());
|
|
|
- DelayedItem<DelayChargeOrder> delayedItem;
|
|
|
|
|
-
|
|
|
|
|
- try {
|
|
|
|
|
- // 线程休眠300ms
|
|
|
|
|
- Thread.sleep(300);
|
|
|
|
|
- delayedItem = STOP_DELAY_QUEUE.take();
|
|
|
|
|
- // 停止充电
|
|
|
|
|
- var order = delayedItem.data;
|
|
|
|
|
- threadLocal.set(order.getStartChargeSeq());
|
|
|
|
|
- // 查询该设备最新订单是否是当前预约订单,是则按计划停止
|
|
|
|
|
- var currentChargeOrder = chargeOrderService.lambdaQuery().eq(ChargeOrder::getConnectorId, order.getConnectorId()).orderByDesc(ChargeOrder::getCreateTime).list().get(0);
|
|
|
|
|
- if (currentChargeOrder.getStartChargeSeq().equals(order.getStartChargeSeq())) {
|
|
|
|
|
- chargeService.queryStopCharge(order.getUserId(), order.getConnectorId());
|
|
|
|
|
- log.info("预约充电停止成功:用户:{},订单号:{},预约停止时间:{}", order.getUserId(), order.getStartChargeSeq(), order.getEndTime());
|
|
|
|
|
-
|
|
|
|
|
- } else {
|
|
|
|
|
- log.error("预约充电停止异常:订单不匹配:原订单:{},当前设备最新订单:{}", order.getStartChargeSeq(), currentChargeOrder.getStartChargeSeq());
|
|
|
|
|
- throw new BusinessException("预约充电停止异常");
|
|
|
|
|
|
|
+ while (true) {
|
|
|
|
|
+ DelayedItem<DelayChargeOrder> delayedItem = null;
|
|
|
|
|
+ try {
|
|
|
|
|
+ // 阻塞等待延迟队列中的任务
|
|
|
|
|
+ delayedItem = STOP_DELAY_QUEUE.take();
|
|
|
|
|
+ log.info("出队预约停止充电订单:{}, 队列剩余:{}", delayedItem.data.getStartChargeSeq(), STOP_DELAY_QUEUE.size());
|
|
|
|
|
+ // 停止充电
|
|
|
|
|
+ var order = delayedItem.data;
|
|
|
|
|
+ threadLocal.set(order.getStartChargeSeq());
|
|
|
|
|
+ // 查询该设备最新订单是否是当前预约订单,是则按计划停止
|
|
|
|
|
+ var currentChargeOrder = chargeOrderService.lambdaQuery().eq(ChargeOrder::getConnectorId, order.getConnectorId()).orderByDesc(ChargeOrder::getCreateTime).list().get(0);
|
|
|
|
|
+ if (currentChargeOrder.getStartChargeSeq().equals(order.getStartChargeSeq())) {
|
|
|
|
|
+ chargeService.queryStopCharge(order.getUserId(), order.getConnectorId());
|
|
|
|
|
+ log.info("预约充电停止成功:用户:{}, 订单号:{}, 预约停止时间:{}", order.getUserId(), order.getStartChargeSeq(), order.getEndTime());
|
|
|
|
|
+
|
|
|
|
|
+ } else {
|
|
|
|
|
+ log.error("预约充电停止异常:订单不匹配:原订单:{}, 当前设备最新订单:{}", order.getStartChargeSeq(), currentChargeOrder.getStartChargeSeq());
|
|
|
|
|
+ throw new BusinessException("预约充电停止异常");
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ if (e instanceof InterruptedException) {
|
|
|
|
|
+ log.error("预约停止充电队列take异常", e);
|
|
|
|
|
+ // 线程被中断,退出循环
|
|
|
|
|
+ break;
|
|
|
|
|
+ } else {
|
|
|
|
|
+ log.info("预约停止充电失败,订单号:{}", threadLocal.get(), e);
|
|
|
|
|
+ }
|
|
|
|
|
+ } finally {
|
|
|
|
|
+ threadLocal.remove();
|
|
|
}
|
|
}
|
|
|
-
|
|
|
|
|
- } catch (Exception e) {
|
|
|
|
|
- if (e instanceof InterruptedException) {
|
|
|
|
|
- log.error("预约停止充电队列take异常", e);
|
|
|
|
|
- } else {
|
|
|
|
|
- log.info("预约停止充电失败,订单号:{}", threadLocal.get(), e);
|
|
|
|
|
- }
|
|
|
|
|
- } finally {
|
|
|
|
|
- threadLocal.remove();
|
|
|
|
|
}
|
|
}
|
|
|
});
|
|
});
|
|
|
- if (!executor.isTerminated()) {
|
|
|
|
|
- try {
|
|
|
|
|
- Thread.sleep(100);
|
|
|
|
|
- } catch (InterruptedException e) {
|
|
|
|
|
- log.error("Delay queue processing interrupted.", e);
|
|
|
|
|
- return;
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -141,4 +146,20 @@ public class StopChargeDelayJob implements DelayService<DelayChargeOrder> {
|
|
|
}
|
|
}
|
|
|
return STOP_DELAY_QUEUE.removeIf(queue -> queue.data.getStartChargeSeq().equals(startChargeSeq));
|
|
return STOP_DELAY_QUEUE.removeIf(queue -> queue.data.getStartChargeSeq().equals(startChargeSeq));
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
|
|
+ @PreDestroy
|
|
|
|
|
+ public void destroy() {
|
|
|
|
|
+ log.info("正在关闭停止充电延迟任务线程池...");
|
|
|
|
|
+ executor.shutdown();
|
|
|
|
|
+ try {
|
|
|
|
|
+ if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
|
|
|
|
|
+ executor.shutdownNow();
|
|
|
|
|
+ }
|
|
|
|
|
+ } catch (InterruptedException e) {
|
|
|
|
|
+ executor.shutdownNow();
|
|
|
|
|
+ }
|
|
|
|
|
+ log.info("停止充电延迟任务线程池已关闭");
|
|
|
|
|
+ // 清理队列
|
|
|
|
|
+ STOP_DELAY_QUEUE.clear();
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|