|
@@ -1,7 +1,6 @@
|
|
|
package com.kym.miniapp.jobs;
|
|
package com.kym.miniapp.jobs;
|
|
|
|
|
|
|
|
import com.baomidou.dynamic.datasource.annotation.DS;
|
|
import com.baomidou.dynamic.datasource.annotation.DS;
|
|
|
-import com.google.common.util.concurrent.RateLimiter;
|
|
|
|
|
import com.kym.common.constant.ResponseEnum;
|
|
import com.kym.common.constant.ResponseEnum;
|
|
|
import com.kym.common.exception.BusinessException;
|
|
import com.kym.common.exception.BusinessException;
|
|
|
import com.kym.common.utils.CommUtil;
|
|
import com.kym.common.utils.CommUtil;
|
|
@@ -19,6 +18,7 @@ import org.springframework.context.event.EventListener;
|
|
|
import org.springframework.scheduling.annotation.Async;
|
|
import org.springframework.scheduling.annotation.Async;
|
|
|
import org.springframework.stereotype.Component;
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
|
|
|
|
|
+import java.util.ArrayList;
|
|
|
import java.util.concurrent.DelayQueue;
|
|
import java.util.concurrent.DelayQueue;
|
|
|
import java.util.concurrent.ExecutorService;
|
|
import java.util.concurrent.ExecutorService;
|
|
|
import java.util.concurrent.Executors;
|
|
import java.util.concurrent.Executors;
|
|
@@ -34,8 +34,6 @@ import java.util.concurrent.Executors;
|
|
|
@Scope(ConfigurableBeanFactory.SCOPE_SINGLETON) // 设置成单例
|
|
@Scope(ConfigurableBeanFactory.SCOPE_SINGLETON) // 设置成单例
|
|
|
public class StartChargeDelayJob implements DelayService<DelayChargeOrder> {
|
|
public class StartChargeDelayJob implements DelayService<DelayChargeOrder> {
|
|
|
|
|
|
|
|
- private final static RateLimiter rateLimiter = RateLimiter.create(4);
|
|
|
|
|
-
|
|
|
|
|
/**
|
|
/**
|
|
|
* 预约订单队列
|
|
* 预约订单队列
|
|
|
*/
|
|
*/
|
|
@@ -47,6 +45,11 @@ public class StartChargeDelayJob implements DelayService<DelayChargeOrder> {
|
|
|
*/
|
|
*/
|
|
|
private final ExecutorService executor = Executors.newFixedThreadPool(1);
|
|
private final ExecutorService executor = Executors.newFixedThreadPool(1);
|
|
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 重试列表
|
|
|
|
|
+ */
|
|
|
|
|
+ private ArrayList<String> retryList = new ArrayList<>();
|
|
|
|
|
+
|
|
|
public StartChargeDelayJob(ChargeOrderService chargeOrderService, ChargeService chargeService) {
|
|
public StartChargeDelayJob(ChargeOrderService chargeOrderService, ChargeService chargeService) {
|
|
|
this.chargeOrderService = chargeOrderService;
|
|
this.chargeOrderService = chargeOrderService;
|
|
|
this.chargeService = chargeService;
|
|
this.chargeService = chargeService;
|
|
@@ -73,7 +76,6 @@ public class StartChargeDelayJob implements DelayService<DelayChargeOrder> {
|
|
|
.toList();
|
|
.toList();
|
|
|
var delayList = delayChargeOrderList.stream().map(delay -> new DelayedItem<>(delay, delay.getStartTime())).toList();
|
|
var delayList = delayChargeOrderList.stream().map(delay -> new DelayedItem<>(delay, delay.getStartTime())).toList();
|
|
|
START_DELAY_QUEUE.addAll(delayList);
|
|
START_DELAY_QUEUE.addAll(delayList);
|
|
|
-
|
|
|
|
|
// 开启线程处理队列消息
|
|
// 开启线程处理队列消息
|
|
|
processDelayedOrders();
|
|
processDelayedOrders();
|
|
|
}
|
|
}
|
|
@@ -84,13 +86,11 @@ public class StartChargeDelayJob implements DelayService<DelayChargeOrder> {
|
|
|
ThreadLocal<String> threadLocal = ThreadLocal.withInitial(() -> null); // 初始化为空值,避免使用new ThreadLocal()
|
|
ThreadLocal<String> threadLocal = ThreadLocal.withInitial(() -> null); // 初始化为空值,避免使用new ThreadLocal()
|
|
|
log.info("预约启动充电处理线程:{}", Thread.currentThread().getName());
|
|
log.info("预约启动充电处理线程:{}", Thread.currentThread().getName());
|
|
|
DelayedItem<DelayChargeOrder> delayedItem = null;
|
|
DelayedItem<DelayChargeOrder> delayedItem = null;
|
|
|
-
|
|
|
|
|
try {
|
|
try {
|
|
|
delayedItem = START_DELAY_QUEUE.take();
|
|
delayedItem = START_DELAY_QUEUE.take();
|
|
|
log.info("出队预约充电订单:{},队列剩余:{}", delayedItem.data.getStartChargeSeq(), START_DELAY_QUEUE.size());
|
|
log.info("出队预约充电订单:{},队列剩余:{}", delayedItem.data.getStartChargeSeq(), START_DELAY_QUEUE.size());
|
|
|
// 启动充电
|
|
// 启动充电
|
|
|
var order = delayedItem.data;
|
|
var order = delayedItem.data;
|
|
|
- threadLocal.set(order.getStartChargeSeq());
|
|
|
|
|
chargeService.queryStartCharge(order.getUserId(), order.getConnectorId(), null, false, null, null);
|
|
chargeService.queryStartCharge(order.getUserId(), order.getConnectorId(), null, false, null, null);
|
|
|
log.info("预约充电启动成功:用户:{},订单号:{},预约启动时间:{}", order.getUserId(), order.getStartChargeSeq(), order.getStartTime());
|
|
log.info("预约充电启动成功:用户:{},订单号:{},预约启动时间:{}", order.getUserId(), order.getStartChargeSeq(), order.getStartTime());
|
|
|
// 线程休眠250ms
|
|
// 线程休眠250ms
|
|
@@ -101,18 +101,26 @@ public class StartChargeDelayJob implements DelayService<DelayChargeOrder> {
|
|
|
} else {
|
|
} else {
|
|
|
log.info("预约启动充电失败,订单号:{}", threadLocal.get());
|
|
log.info("预约启动充电失败,订单号:{}", threadLocal.get());
|
|
|
if (e instanceof BusinessException && (ResponseEnum.EN_PLUS_TOKEN_EXCEPTION.getCode().equals(((BusinessException) e).getCode()))) {
|
|
if (e instanceof BusinessException && (ResponseEnum.EN_PLUS_TOKEN_EXCEPTION.getCode().equals(((BusinessException) e).getCode()))) {
|
|
|
- log.info("EN+ token异常,预约订单重试");
|
|
|
|
|
|
|
+ if (retryList.contains(threadLocal.get())) {
|
|
|
|
|
+ log.info("EN+ token异常,预约订单:{}已重试忽略", threadLocal.get());
|
|
|
|
|
+ log.error(e.getMessage());
|
|
|
|
|
+ // 启动失败将订单状态修改为充电状态已结束,订单状态已确认,结束原因:预约启动失败
|
|
|
|
|
+ updateOrderStatus(threadLocal.get(), ChargeOrder.CHARGE_STATUS_已结束, ChargeOrder.ORDER_STATUS_失败, ChargeOrder.STOP_REASON_预约启动充电失败);
|
|
|
|
|
+ retryList.remove(threadLocal.get());
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+ log.info("EN+ token异常,预约订单:{},重试", threadLocal.get());
|
|
|
// token异常就重新放入队列重试
|
|
// token异常就重新放入队列重试
|
|
|
- addToDelayQueue(delayedItem);
|
|
|
|
|
|
|
+ var success = addToDelayQueue(delayedItem);
|
|
|
|
|
+ if (success) {
|
|
|
|
|
+ retryList.add(threadLocal.get());
|
|
|
|
|
+ }
|
|
|
|
|
+ // 跳出本次循环
|
|
|
|
|
+ return;
|
|
|
}
|
|
}
|
|
|
log.error(e.getMessage());
|
|
log.error(e.getMessage());
|
|
|
// 启动失败将订单状态修改为充电状态已结束,订单状态已确认,结束原因:预约启动失败
|
|
// 启动失败将订单状态修改为充电状态已结束,订单状态已确认,结束原因:预约启动失败
|
|
|
- chargeOrderService.lambdaUpdate()
|
|
|
|
|
- .eq(ChargeOrder::getStartChargeSeq, threadLocal.get())
|
|
|
|
|
- .set(ChargeOrder::getChargeStatus, ChargeOrder.CHARGE_STATUS_已结束)
|
|
|
|
|
- .set(ChargeOrder::getOrderStatus, ChargeOrder.ORDER_STATUS_失败)
|
|
|
|
|
- .set(ChargeOrder::getStopReason, ChargeOrder.STOP_REASON_预约启动充电失败)
|
|
|
|
|
- .update();
|
|
|
|
|
|
|
+ updateOrderStatus(threadLocal.get(), ChargeOrder.CHARGE_STATUS_已结束, ChargeOrder.ORDER_STATUS_失败, ChargeOrder.STOP_REASON_预约启动充电失败);
|
|
|
}
|
|
}
|
|
|
} finally {
|
|
} finally {
|
|
|
threadLocal.remove();
|
|
threadLocal.remove();
|
|
@@ -129,6 +137,14 @@ public class StartChargeDelayJob implements DelayService<DelayChargeOrder> {
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ private void updateOrderStatus(String startChargeSeq, int chargeStatus, int orderStatus, int stopReason) {
|
|
|
|
|
+ chargeOrderService.lambdaUpdate()
|
|
|
|
|
+ .eq(ChargeOrder::getStartChargeSeq, startChargeSeq)
|
|
|
|
|
+ .set(ChargeOrder::getChargeStatus, chargeStatus)
|
|
|
|
|
+ .set(ChargeOrder::getOrderStatus, orderStatus)
|
|
|
|
|
+ .set(ChargeOrder::getStopReason, stopReason)
|
|
|
|
|
+ .update();
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
@Override
|
|
@Override
|
|
|
public boolean addToDelayQueue(DelayedItem<DelayChargeOrder> delayedItem) {
|
|
public boolean addToDelayQueue(DelayedItem<DelayChargeOrder> delayedItem) {
|