package com.kym.service.enplus.impl;

import cn.hutool.extra.mail.MailUtil;
import com.alibaba.fastjson2.JSONObject;
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.kym.entity.admin.ConnectorInfo;
import com.kym.entity.admin.EquipmentInfo;
import com.kym.entity.admin.MonitorLog;
import com.kym.entity.common.RedisKeys;
import com.kym.entity.enplus.EnConnectorStatusInfo;
import com.kym.entity.miniapp.Account;
import com.kym.entity.miniapp.ChargeOrder;
import com.kym.entity.miniapp.WalletDetail;
import com.kym.service.admin.ConnectorInfoService;
import com.kym.service.admin.EquipmentInfoService;
import com.kym.service.admin.MonitorLogService;
import com.kym.service.cache.KymCache;
import com.kym.service.enplus.EnNotifyService;
import com.kym.service.enplus.EnPlusService;
import com.kym.service.factory.DiscountStrategyFactory;
import com.kym.service.miniapp.*;
import jakarta.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

/**
 * Handles the notifications pushed by the EN+ platform: connector status changes,
 * start/stop charge results, periodic charge status and final order settlement.
 *
 * <p>Every handler first passes the raw push through
 * {@link EnPlusService#signValidation(JSONObject)} and works on the string it returns.
 *
 * @author skyline
 * @date 2023-08-04 14:26
 */
@Service
public class EnNotifyServiceImpl implements EnNotifyService {

    private static final Logger LOGGER = LoggerFactory.getLogger(EnNotifyServiceImpl.class);

    /** Timestamp layout used by the time fields of the pushes; formatter is thread-safe, so it is shared. */
    private static final DateTimeFormatter PUSH_TIME_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Added to "now" to build the ZSET score of an offline connector (60 minutes). */
    private static final long OFFLINE_GRACE_MILLIS = 60L * 60 * 1000;

    // NOTE(review): the only non-private field; kept public because outside code may read it — confirm and narrow.
    public final StringRedisTemplate redisTemplate;
    private final EnPlusService enPlusService;
    private final ChargeOrderService chargeOrderService;
    private final ChargeService chargeService;
    private final AccountService accountService;
    private final WalletDetailService walletDetailService;
    private final MonitorLogService monitorLogService;
    private final EquipmentInfoService equipmentInfoService;
    private final ConnectorInfoService connectorInfoService;
    private final UserStationService userStationService;

    @Value("${kym.notify-email}")
    private String notifyEmail;

    public EnNotifyServiceImpl(EnPlusService enPlusService,
                               ChargeOrderService chargeOrderService,
                               ChargeService chargeService,
                               AccountService accountService,
                               WalletDetailService walletDetailService,
                               MonitorLogService monitorLogService,
                               EquipmentInfoService equipmentInfoService,
                               ConnectorInfoService connectorInfoService,
                               StringRedisTemplate redisTemplate,
                               UserStationService userStationService) {
        this.enPlusService = enPlusService;
        this.chargeOrderService = chargeOrderService;
        this.chargeService = chargeService;
        this.accountService = accountService;
        this.walletDetailService = walletDetailService;
        this.monitorLogService = monitorLogService;
        this.equipmentInfoService = equipmentInfoService;
        this.connectorInfoService = connectorInfoService;
        this.redisTemplate = redisTemplate;
        this.userStationService = userStationService;
    }

    /**
     * Seeds the in-memory connector status cache from the connector table at startup.
     */
    @PostConstruct
    void init() {
        // NOTE(review): keys are stored as connectorId + "1" here, while the status handler below
        // reads/writes the cache with the connector id taken from the push — confirm both forms match.
        KymCache.INSTANCE.putConnectorId2Status(connectorInfoService.list().stream()
                .collect(Collectors.toMap(item -> item.getConnectorId().concat("1"), ConnectorInfo::getStatus)));
    }

    /**
     * EN+ push: a connector changed status.
     *
     * <p>Persists the new status on the equipment and connector rows, maintains the offline
     * bookkeeping (monitor log plus Redis collections), cancels bookings when the connector
     * becomes idle, and finally refreshes the in-memory status cache.
     *
     * @param json raw push body
     * @return JSON acknowledgement with {@code Status} 0
     */
    @Override
    public String handleNotificationStationStatus(JSONObject json) {
        var data = enPlusService.signValidation(json);
        LOGGER.info("【EN+推送】收到充电桩设备状态变化推送:{},解密数据:{}", json, data);

        var connectorStatusInfo = JSONObject.parseObject(data)
                .getJSONObject("ConnectorStatusInfo")
                .toJavaObject(EnConnectorStatusInfo.class);
        var connectorId = connectorStatusInfo.getConnectorId();
        var connectorStatus = connectorStatusInfo.getStatus();
        // The equipment id is taken as the first 16 characters of the connector id
        // (assumes the id is at least that long — TODO confirm against the EN+ id format).
        var equipmentId = connectorId.substring(0, 16);

        equipmentInfoService.lambdaUpdate()
                .eq(EquipmentInfo::getEquipmentId, equipmentId)
                .set(EquipmentInfo::getServiceStatus, connectorStatus)
                .update();
        connectorInfoService.lambdaUpdate()
                .eq(ConnectorInfo::getConnectorId, connectorId)
                .set(ConnectorInfo::getStatus, connectorStatus)
                .update();

        if (connectorStatus == 0) {
            recordOffline(connectorStatusInfo);
        } else {
            recordOnline(connectorId);
        }

        // When the connector turns idle from a different cached status, drop any booking data for it.
        if (connectorStatus == EquipmentInfo.SERVICE_STATUS_空闲
                && KymCache.INSTANCE.getConnectorStatus(connectorId) != EquipmentInfo.SERVICE_STATUS_空闲) {
            LOGGER.info("设备:{}状态转为空闲,清除设备预约数据...", connectorId);
            chargeService.cancelBookingByConnector(connectorId);
        }

        KymCache.INSTANCE.putConnectorId2Status(Map.of(connectorId, connectorStatus));
        return """
                {
                    "Status":%d
                }
                """.formatted(0);
    }

    /**
     * Saves a monitor log row for a connector that went offline and adds it to the offline ZSET.
     *
     * @param connectorStatusInfo pushed status payload of the offline connector
     */
    private void recordOffline(EnConnectorStatusInfo connectorStatusInfo) {
        var connectorId = connectorStatusInfo.getConnectorId();
        LOGGER.info("充电桩设备离线:{}", connectorId);

        var monitorLog = new MonitorLog()
                .setStationId(KymCache.INSTANCE.getStationIdByEquipmentIdOrConnectorId(connectorId))
                .setSn(connectorId)
                .setOfflineTime(LocalDateTime.now())
                .setType(2)
                .setOfflineStatus(connectorStatusInfo.getStatus());
        monitorLogService.save(monitorLog);

        // Queue the connector with a score 60 minutes ahead; per the original design note, a connector
        // still offline by then is moved to the long-offline set and a reminder is sent (done elsewhere).
        redisTemplate.opsForZSet().add(RedisKeys.OFFLINE, connectorId,
                System.currentTimeMillis() + OFFLINE_GRACE_MILLIS);
    }

    /**
     * Clears the offline bookkeeping of a connector that reports a non-offline status and, if it
     * had been in the long-offline set, sends the "back online" mail.
     *
     * @param connectorId id of the connector that is online again
     */
    private void recordOnline(String connectorId) {
        // Remove from the offline queue first, then from the long-offline set.
        var removedFromQueue = redisTemplate.opsForZSet().remove(RedisKeys.OFFLINE, connectorId);
        var removedFromExpired = redisTemplate.opsForSet().remove(RedisKeys.OFFLINE_EXPIRED, connectorId);
        boolean wasQueued = removedFromQueue != null && removedFromQueue > 0;
        boolean wasExpired = removedFromExpired != null && removedFromExpired > 0;

        if (wasQueued || wasExpired) {
            // Flag the still-open monitor log rows of this connector as recovered.
            monitorLogService.lambdaUpdate()
                    .eq(MonitorLog::getSn, connectorId)
                    .eq(MonitorLog::getIsRecover, MonitorLog.IS_RECOVER_未恢复)
                    .set(MonitorLog::getRecoverTime, LocalDateTime.now())
                    .set(MonitorLog::getIsRecover, MonitorLog.IS_RECOVER_已恢复)
                    .update();
        }
        if (wasExpired) {
            sendBackOnlineMail(connectorId);
        }
    }

    /**
     * Sends the "device back online" mail on a best-effort basis.
     *
     * <p>By the time this runs the Redis entries and monitor log are already updated, so a mail
     * failure is logged instead of propagated; otherwise the caller would skip the booking
     * cleanup and the status cache refresh that follow.
     *
     * @param connectorId id of the connector that is online again
     */
    private void sendBackOnlineMail(String connectorId) {
        try {
            MailUtil.send(notifyEmail, "【设备上线通知】", "站点:%s,设备%s恢复上线"
                    .formatted(KymCache.INSTANCE.getStationNameByConnectorId(connectorId),
                            KymCache.INSTANCE.getShortIdByEquipmentIdOrConnectorId(connectorId)), false);
        } catch (RuntimeException e) {
            LOGGER.error("设备上线通知邮件发送失败,connectorId:{}", connectorId, e);
        }
    }

    /**
     * EN+ push: result of a start-charge request.
     *
     * <p>Updates order status, charge status and start time of the order matched by connector id
     * and start-charge sequence.
     *
     * @param json raw push body
     * @return JSON acknowledgement echoing {@code StartChargeSeq} with {@code SuccStat} 0 and {@code FailReason} 0
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public String handleNotificationStartChargeResult(JSONObject json) {
        var data = enPlusService.signValidation(json);
        LOGGER.info("【EN+推送】收到启动充电结果推送:{},解密数据:{}", json, data);

        var obj = JSONObject.parseObject(data);
        var startChargeSeq = obj.getString("StartChargeSeq");
        var startChargeSeqStat = obj.getIntValue("StartChargeSeqStat");
        var connectorId = obj.getString("ConnectorID");
        var startTime = LocalDateTime.parse(obj.getString("StartTime"), PUSH_TIME_FORMAT);

        var updateWrapper = new UpdateWrapper<ChargeOrder>();
        updateWrapper.eq("connector_id", connectorId);
        updateWrapper.eq("start_charge_seq", startChargeSeq);
        updateWrapper.set("order_status", ChargeOrder.ORDER_STATUS_未知);
        updateWrapper.set("charge_status", startChargeSeqStat);
        updateWrapper.set("start_time", startTime);
        chargeOrderService.update(updateWrapper);

        return """
                {
                    "StartChargeSeq":"%s",
                    "SuccStat":%d,
                    "FailReason":%d
                }
                """.formatted(startChargeSeq, 0, 0);
    }

    /**
     * EN+ push: charge status, pushed once per minute.
     *
     * <p>Copies the pushed figures onto the order and stores the order as JSON in a Redis hash
     * keyed by start-charge sequence. The database is not written here; per the original design
     * note that is left to a scheduled job because of the push frequency.
     *
     * @param json raw push body
     * @return JSON acknowledgement echoing {@code StartChargeSeq} with {@code SuccStat} 0
     * @throws NullPointerException if no order is found for the pushed sequence
     * @see com.kym.miniapp.jobs.EquipmentChargeStatusJob#executeMpUserRelationJob()
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public String handleNotificationEquipChargeStatus(JSONObject json) {
        var dataStr = enPlusService.signValidation(json);
        var data = JSONObject.parseObject(dataStr);
        LOGGER.info("【EN+推送】 :{},解密数据:{}", json, data);

        var startChargeSeq = data.getString("StartChargeSeq");
        var chargeOrder = requireOrder(
                chargeOrderService.getChargingOrderByStartChargeSeq(startChargeSeq), startChargeSeq);

        chargeOrder.setSoc(data.getDoubleValue("Soc"));
        chargeOrder.setTotalPower(data.getDoubleValue("TotalPower"));
        chargeOrder.setTotalMoney(toFen(data.getBigDecimal("TotalMoney")));
        chargeOrder.setElecMoney(toFen(data.getBigDecimal("ElecMoney")));
        // "SeviceMoney" is misspelled in the EN+ document; the key follows the document.
        chargeOrder.setServiceMoney(toFen(data.getBigDecimal("SeviceMoney")));
        chargeOrder.setSumPeriod(data.getIntValue("SumPeriod"));
        chargeOrder.setChargeDetail(data.getString("ChargeDetails"));
        chargeOrder.setChargeStatus(data.getIntValue("StartChargeSeqStat"));

        redisTemplate.opsForHash().put(RedisKeys.CHARGE_ORDER_EQUIP_CHARGE_STATUS, startChargeSeq,
                JSONObject.toJSONString(chargeOrder));

        return """
                {
                    "StartChargeSeq":"%s",
                    "SuccStat":%d
                }
                """.formatted(startChargeSeq, 0);
    }

    /**
     * EN+ push: result of a stop-charge request.
     *
     * <p>When the push carries {@code SuccStat} 0, the pushed charge status is stored on the order.
     *
     * @param json raw push body
     * @return JSON acknowledgement echoing {@code StartChargeSeq} with {@code SuccStat} 0 and {@code FailReason} 0
     * @throws NullPointerException if the push reports success but no order is found for the sequence
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public String handleNotificationStopChargeResult(JSONObject json) {
        var dataStr = enPlusService.signValidation(json);
        var data = JSONObject.parseObject(dataStr);
        LOGGER.info("【EN+推送】收到停止充电结果推送:{},解密数据:{}", json, data);

        var startChargeSeq = data.getString("StartChargeSeq");
        var chargeOrder = chargeOrderService.getChargingOrderByStartChargeSeq(startChargeSeq);
        if (data.containsKey("SuccStat") && data.getIntValue("SuccStat") == 0) {
            // The order is only needed (and only required to exist) on this branch.
            requireOrder(chargeOrder, startChargeSeq);
            chargeOrder.setChargeStatus(data.getIntValue("StartChargeSeqStat"));
            chargeOrderService.updateById(chargeOrder);
        }

        return """
                {
                    "StartChargeSeq":"%s",
                    "SuccStat":%d,
                    "FailReason":%d
                }
                """.formatted(startChargeSeq, 0, 0);
    }

    /**
     * EN+ push: final charge order information (settlement).
     *
     * <p>Unless the order is already finished and successful, this fills in the final figures,
     * applies the discount strategy, charges the account, removes the cached charge status from
     * Redis and updates the user/station data.
     *
     * @param json raw push body
     * @return JSON acknowledgement with {@code StartChargeSeq}, {@code ConnectorID} and {@code ConfirmResult} 0
     * @throws NullPointerException if no order is found for the sequence, or no account for the order's user
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public String handleNotificationChargeOrderInfo(JSONObject json) {
        var dataStr = enPlusService.signValidation(json);
        var data = JSONObject.parseObject(dataStr);
        LOGGER.info("【EN+推送】收到充电订单信息推送:{},解密数据:{}", json, data);

        var startChargeSeq = data.getString("StartChargeSeq");
        var chargeOrder = requireOrder(
                chargeOrderService.getChargingOrderByStartChargeSeq(startChargeSeq), startChargeSeq);

        // EN+ retries a failed push only once, on the following day, but the push can also be
        // re-triggered manually from the EN+ order page any number of times. Check the order
        // state first so a settled order is never processed twice.
        boolean alreadySettled = chargeOrder.getChargeStatus() == ChargeOrder.CHARGE_STATUS_已结束
                && chargeOrder.getOrderStatus() == ChargeOrder.ORDER_STATUS_成功;
        if (!alreadySettled) {
            var account = Objects.requireNonNull(
                    accountService.getAccountByUserId(chargeOrder.getUserId()),
                    () -> "No account found for the user of order StartChargeSeq=" + startChargeSeq);
            var endTime = LocalDateTime.parse(data.getString("EndTime"), PUSH_TIME_FORMAT);

            chargeOrder.setStartTime(LocalDateTime.parse(data.getString("StartTime"), PUSH_TIME_FORMAT));
            chargeOrder.setEndTime(endTime);
            chargeOrder.setTotalPower(data.getDoubleValue("TotalPower"));
            chargeOrder.setElecMoney(toFen(data.getBigDecimal("TotalElecMoney")));
            // "TotalSeviceMoney" is misspelled in the EN+ document; the key follows the document.
            chargeOrder.setServiceMoney(toFen(data.getBigDecimal("TotalSeviceMoney")));
            chargeOrder.setTotalMoney(toFen(data.getBigDecimal("TotalMoney")));
            chargeOrder.setStopReason(data.getIntValue("StopReason"));
            chargeOrder.setSumPeriod(data.getIntValue("SumPeriod"));
            chargeOrder.setChargeDetail(data.getString("ChargeDetails"));
            // The amount to pay starts as the order total (set just above); the discount strategy may adjust it.
            chargeOrder.setPayAmount(chargeOrder.getTotalMoney());

            // Recharge-benefit / coupon discount handling.
            DiscountStrategyFactory.getDiscountStrategy(chargeOrder.getDiscountType())
                    .computeDiscount(chargeOrder, account);
            chargeOrderService.updateById(chargeOrder);

            deductions(chargeOrder, account, endTime);

            // The order is settled: drop its cached charge status.
            redisTemplate.opsForHash().delete(RedisKeys.CHARGE_ORDER_EQUIP_CHARGE_STATUS, startChargeSeq);
            userStationService.updateUserStation(chargeOrder);
        }

        return """
                {
                    "StartChargeSeq":"%s",
                    "ConnectorID":"%s",
                    "ConfirmResult":%d
                }
                """.formatted(startChargeSeq, chargeOrder.getConnectorId(), 0);
    }

    /**
     * Marks the order as successful and finished, charges the account and records the wallet entry.
     *
     * @param chargeOrder order being settled; its pay amount is the sum charged
     * @param account     account of the order's user
     * @param endTime     end time of the charge, stored as the transaction time
     */
    private void deductions(ChargeOrder chargeOrder, Account account, LocalDateTime endTime) {
        chargeOrder.setOrderStatus(ChargeOrder.ORDER_STATUS_成功);
        chargeOrder.setChargeStatus(ChargeOrder.CHARGE_STATUS_已结束);
        chargeOrderService.updateById(chargeOrder);

        // NOTE(review): the balance is read, changed in memory and written back; whether concurrent
        // balance changes are guarded (locking/versioning) is not visible here — confirm.
        var beforeBalance = account.getBalance();
        account.setBalance(beforeBalance - chargeOrder.getPayAmount());
        accountService.updateById(account);

        var walletDetail = new WalletDetail();
        walletDetail.setUserId(chargeOrder.getUserId());
        walletDetail.setOrderNo(chargeOrder.getStartChargeSeq());
        walletDetail.setType(WalletDetail.TYPE_消费);
        walletDetail.setAmount(chargeOrder.getPayAmount());
        walletDetail.setBeforeBalance(beforeBalance);
        walletDetail.setAfterBalance(account.getBalance());
        walletDetail.setTransactionTime(endTime);
        walletDetail.setStatus(WalletDetail.STATUS_已确认);
        walletDetailService.save(walletDetail);
    }

    /**
     * Fails with a descriptive message when no order was found for a pushed sequence.
     *
     * @param chargeOrder    lookup result, possibly {@code null}
     * @param startChargeSeq sequence the order was looked up by, used in the message
     * @return the same order, never {@code null}
     * @throws NullPointerException if {@code chargeOrder} is {@code null}
     */
    private static ChargeOrder requireOrder(ChargeOrder chargeOrder, String startChargeSeq) {
        return Objects.requireNonNull(chargeOrder,
                () -> "No charge order found for StartChargeSeq=" + startChargeSeq);
    }

    /**
     * Multiplies a pushed monetary amount by 100 and rounds it to a whole number without going
     * through {@code double}, so values such as 1.005 are rounded exactly.
     *
     * @param amount pushed amount, or {@code null} when the field is absent
     * @return the amount times 100 rounded half-up; 0 when {@code amount} is {@code null}
     * @throws ArithmeticException if the result does not fit in an {@code int}
     */
    private static int toFen(BigDecimal amount) {
        if (amount == null) {
            return 0;
        }
        return amount.movePointRight(2).setScale(0, RoundingMode.HALF_UP).intValueExact();
    }
}