|
@@ -40,6 +40,7 @@ import org.springframework.transaction.annotation.Transactional;
|
|
|
|
|
|
|
|
import java.time.LocalTime;
|
|
import java.time.LocalTime;
|
|
|
import java.util.*;
|
|
import java.util.*;
|
|
|
|
|
+import java.util.concurrent.Executors;
|
|
|
import java.util.stream.Collectors;
|
|
import java.util.stream.Collectors;
|
|
|
import java.util.stream.Stream;
|
|
import java.util.stream.Stream;
|
|
|
|
|
|
|
@@ -64,6 +65,9 @@ public class StationServiceImpl extends MyBaseServiceImpl<StationMapper, Station
|
|
|
private final EquipmentRelationService equipmentRelationService;
|
|
private final EquipmentRelationService equipmentRelationService;
|
|
|
private final ActivityStationService activityStationService;
|
|
private final ActivityStationService activityStationService;
|
|
|
private final ActivityService activityService;
|
|
private final ActivityService activityService;
|
|
|
|
|
+
|
|
|
|
|
+ // 虚拟线程执行器(Java 21+)
|
|
|
|
|
+ private final java.util.concurrent.ExecutorService virtualExecutor = Executors.newVirtualThreadPerTaskExecutor();
|
|
|
|
|
|
|
|
public StationServiceImpl(PlatformApiService platformApiService, EquipmentInfoService equipmentInfoService,
|
|
public StationServiceImpl(PlatformApiService platformApiService, EquipmentInfoService equipmentInfoService,
|
|
|
ConnectorInfoService connectorInfoService, EquipmentRelationService equipmentRelationService,
|
|
ConnectorInfoService connectorInfoService, EquipmentRelationService equipmentRelationService,
|
|
@@ -314,39 +318,72 @@ public class StationServiceImpl extends MyBaseServiceImpl<StationMapper, Station
|
|
|
|
|
|
|
|
@Override
|
|
@Override
|
|
|
public List<PlatformStationStatusInfo> stationStatus(String[] stationIds) {
|
|
public List<PlatformStationStatusInfo> stationStatus(String[] stationIds) {
|
|
|
- // 站点ID对应平台名称
|
|
|
|
|
- var stationId2Platform = new HashMap<String, String>();
|
|
|
|
|
- // 先对站点id进行分组,单独去请求各自互联互通平台数据
|
|
|
|
|
- var platforms = new HashSet<String>();
|
|
|
|
|
-
|
|
|
|
|
|
|
+ // 平台名称对应站点 ID 列表(一次遍历完成分组)
|
|
|
|
|
+ Map<String, List<String>> platform2StationIds = new HashMap<>();
|
|
|
for (String stationId : stationIds) {
|
|
for (String stationId : stationIds) {
|
|
|
- platforms.add(PlatformCache.INSTANCE.getPlatformNameByStationId(stationId));
|
|
|
|
|
- stationId2Platform.put(stationId, PlatformCache.INSTANCE.getPlatformNameByStationId(stationId));
|
|
|
|
|
|
|
+ String platform = PlatformCache.INSTANCE.getPlatformNameByStationId(stationId);
|
|
|
|
|
+ platform2StationIds.computeIfAbsent(platform, k -> new ArrayList<>()).add(stationId);
|
|
|
}
|
|
}
|
|
|
-
|
|
|
|
|
- // 平台名称对应站点列表
|
|
|
|
|
- var platform2StationIds = new HashMap<String, List<String>>();
|
|
|
|
|
- platforms.forEach(platformName -> {
|
|
|
|
|
- var stationIdsByPlatform = stationId2Platform.entrySet().stream().filter(entry -> entry.getValue().equals(platformName)).map(Map.Entry::getKey).toList();
|
|
|
|
|
- platform2StationIds.put(platformName, stationIdsByPlatform);
|
|
|
|
|
- });
|
|
|
|
|
-
|
|
|
|
|
- var res = new ArrayList<PlatformStationStatusInfo>();
|
|
|
|
|
- platforms.forEach(platformName -> {
|
|
|
|
|
- var param = """
|
|
|
|
|
- {
|
|
|
|
|
- "StationIDs":[%s]
|
|
|
|
|
- }
|
|
|
|
|
- """.formatted(platform2StationIds.get(platformName).stream().map(id -> "\"" + id + "\"").collect(Collectors.joining(","))
|
|
|
|
|
|
|
+
|
|
|
|
|
+ log.info("开始查询站点状态,站点数:{}, 平台数:{}", stationIds.length, platform2StationIds.size());
|
|
|
|
|
+ long startTime = System.currentTimeMillis();
|
|
|
|
|
+
|
|
|
|
|
+ // 使用虚拟线程并行请求各平台数据
|
|
|
|
|
+ List<java.util.concurrent.Future<List<PlatformStationStatusInfo>>> futures = new ArrayList<>();
|
|
|
|
|
+
|
|
|
|
|
+ platform2StationIds.forEach((platformName, stations) -> {
|
|
|
|
|
+ java.util.concurrent.Future<List<PlatformStationStatusInfo>> future = virtualExecutor.submit(
|
|
|
|
|
+ () -> fetchPlatformStatus(platformName, stations)
|
|
|
);
|
|
);
|
|
|
-
|
|
|
|
|
- var response = platformApiService.platformPost(platformName, PlatformApi.PLATFORM_QUERY_STATION_STATUS.getApi(platformName), buildPlatformParams(platformName, param));
|
|
|
|
|
- var platform = PlatformCache.INSTANCE.getPlatformByName(platformName);
|
|
|
|
|
- var enStationStatus = JSONObject.parseObject(PlatformAesUtil.decrypt(platform.getDataSecret(), platform.getDataSecretIv(), response.getData()));
|
|
|
|
|
- res.addAll(enStationStatus.getJSONArray("StationStatusInfos").toJavaList(PlatformStationStatusInfo.class));
|
|
|
|
|
|
|
+ futures.add(future);
|
|
|
});
|
|
});
|
|
|
|
|
+
|
|
|
|
|
+ // 收集结果
|
|
|
|
|
+ List<PlatformStationStatusInfo> res = new ArrayList<>();
|
|
|
|
|
+ for (var future : futures) {
|
|
|
|
|
+ try {
|
|
|
|
|
+ res.addAll(future.get());
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
|
|
+ log.error("获取平台站点状态失败", e);
|
|
|
|
|
+ throw new BusinessException("查询站点状态失败:" + e.getMessage());
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ long elapsed = System.currentTimeMillis() - startTime;
|
|
|
|
|
+ log.info("站点状态查询完成,耗时:{}ms, 返回数据量:{}", elapsed, res.size());
|
|
|
|
|
+
|
|
|
return res;
|
|
return res;
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 获取单个平台的站点状态(供虚拟线程调用)
|
|
|
|
|
+ *
|
|
|
|
|
+ * @param platformName 平台名称
|
|
|
|
|
+ * @param stationIds 站点 ID 列表
|
|
|
|
|
+ * @return 站点状态列表
|
|
|
|
|
+ */
|
|
|
|
|
+ private List<PlatformStationStatusInfo> fetchPlatformStatus(String platformName, List<String> stationIds) {
|
|
|
|
|
+ var param = """
|
|
|
|
|
+ {
|
|
|
|
|
+ "StationIDs":[%s]
|
|
|
|
|
+ }
|
|
|
|
|
+ """.formatted(stationIds.stream().map(id -> "\"" + id + "\"").collect(Collectors.joining(",")));
|
|
|
|
|
+
|
|
|
|
|
+ var response = platformApiService.platformPost(
|
|
|
|
|
+ platformName,
|
|
|
|
|
+ PlatformApi.PLATFORM_QUERY_STATION_STATUS.getApi(platformName),
|
|
|
|
|
+ buildPlatformParams(platformName, param)
|
|
|
|
|
+ );
|
|
|
|
|
+
|
|
|
|
|
+ var platform = PlatformCache.INSTANCE.getPlatformByName(platformName);
|
|
|
|
|
+ var enStationStatus = JSONObject.parseObject(
|
|
|
|
|
+ PlatformAesUtil.decrypt(platform.getDataSecret(), platform.getDataSecretIv(), response.getData())
|
|
|
|
|
+ );
|
|
|
|
|
+
|
|
|
|
|
+ return enStationStatus.getJSONArray("StationStatusInfos")
|
|
|
|
|
+ .toJavaList(PlatformStationStatusInfo.class);
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* 充电桩统计
|
|
* 充电桩统计
|