|
|
@@ -0,0 +1,251 @@
|
|
|
+package com.haha.service.impl;
|
|
|
+
|
|
|
+import com.alibaba.fastjson2.JSON;
|
|
|
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
|
|
+import com.baomidou.mybatisplus.core.metadata.IPage;
|
|
|
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
|
|
|
+import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
|
|
+import com.haha.entity.Device;
|
|
|
+import com.haha.entity.SyncLog;
|
|
|
+import com.haha.entity.SyncRecord;
|
|
|
+import com.haha.mapper.SyncLogMapper;
|
|
|
+import com.haha.mapper.SyncRecordMapper;
|
|
|
+import com.haha.sdk.HahaClient;
|
|
|
+import com.haha.sdk.exception.HahaException;
|
|
|
+import com.haha.sdk.model.DeviceInfo;
|
|
|
+import com.haha.service.DataSyncService;
|
|
|
+import com.haha.service.DeviceService;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
+import org.springframework.transaction.annotation.Transactional;
|
|
|
+
|
|
|
+import java.time.LocalDateTime;
|
|
|
+import java.util.HashMap;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
+
|
|
|
+/**
|
|
|
+ * 数据同步服务实现
|
|
|
+ */
|
|
|
+@Slf4j
|
|
|
+@Service
|
|
|
+public class DataSyncServiceImpl extends ServiceImpl<SyncRecordMapper, SyncRecord> implements DataSyncService {
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private HahaClient hahaClient;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private DeviceService deviceService;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private SyncLogMapper syncLogMapper;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 同步状态常量
|
|
|
+ */
|
|
|
+ private static final int STATUS_PENDING = 0; // 待同步
|
|
|
+ private static final int STATUS_SYNCING = 1; // 同步中
|
|
|
+ private static final int STATUS_SUCCESS = 2; // 同步成功
|
|
|
+ private static final int STATUS_FAILED = 3; // 同步失败
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 同步类型
|
|
|
+ */
|
|
|
+ private static final String SYNC_TYPE_DEVICE = "DEVICE";
|
|
|
+
|
|
|
+ @Override
|
|
|
+ @Transactional(rollbackFor = Exception.class)
|
|
|
+ public SyncRecord syncDevices(Long operatorId, String operatorName) {
|
|
|
+ log.info("开始同步设备数据,操作人: {}", operatorName);
|
|
|
+
|
|
|
+ // 1. 创建同步记录
|
|
|
+ SyncRecord record = new SyncRecord();
|
|
|
+ record.setSyncType(SYNC_TYPE_DEVICE);
|
|
|
+ record.setStatus(STATUS_SYNCING);
|
|
|
+ record.setStartTime(LocalDateTime.now());
|
|
|
+ record.setOperatorId(operatorId);
|
|
|
+ record.setOperatorName(operatorName);
|
|
|
+ record.setCreateTime(LocalDateTime.now());
|
|
|
+ record.setTotalCount(0);
|
|
|
+ record.setSuccessCount(0);
|
|
|
+ record.setFailCount(0);
|
|
|
+ save(record);
|
|
|
+
|
|
|
+ int successCount = 0;
|
|
|
+ int failCount = 0;
|
|
|
+ StringBuilder errorMsg = new StringBuilder();
|
|
|
+
|
|
|
+ try {
|
|
|
+ // 2. 从哈哈平台获取设备列表
|
|
|
+ List<DeviceInfo> deviceList = hahaClient.getDeviceApi().getDeviceList(1, 100);
|
|
|
+ record.setTotalCount(deviceList.size());
|
|
|
+ log.info("从哈哈平台获取到 {} 台设备", deviceList.size());
|
|
|
+
|
|
|
+ // 3. 遍历设备列表,同步到本地数据库
|
|
|
+ for (DeviceInfo deviceInfo : deviceList) {
|
|
|
+ try {
|
|
|
+ syncSingleDevice(record.getId(), deviceInfo);
|
|
|
+ successCount++;
|
|
|
+ } catch (Exception e) {
|
|
|
+ failCount++;
|
|
|
+ String errMsg = String.format("设备 %s 同步失败: %s", deviceInfo.getId(), e.getMessage());
|
|
|
+ errorMsg.append(errMsg).append("; ");
|
|
|
+ log.error(errMsg, e);
|
|
|
+
|
|
|
+ // 记录失败日志
|
|
|
+ saveSyncLog(record.getId(), SYNC_TYPE_DEVICE, deviceInfo.getId(),
|
|
|
+ deviceInfo.getName(), 0, errMsg, JSON.toJSONString(deviceInfo));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 4. 更新同步记录
|
|
|
+ record.setSuccessCount(successCount);
|
|
|
+ record.setFailCount(failCount);
|
|
|
+ record.setStatus(failCount == 0 ? STATUS_SUCCESS : (successCount > 0 ? STATUS_SUCCESS : STATUS_FAILED));
|
|
|
+ record.setEndTime(LocalDateTime.now());
|
|
|
+ record.setDuration(java.time.Duration.between(record.getStartTime(), record.getEndTime()).toMillis());
|
|
|
+ if (errorMsg.length() > 0) {
|
|
|
+ record.setErrorMessage(errorMsg.toString());
|
|
|
+ }
|
|
|
+ updateById(record);
|
|
|
+
|
|
|
+ log.info("设备数据同步完成,成功: {}, 失败: {}", successCount, failCount);
|
|
|
+
|
|
|
+ } catch (HahaException e) {
|
|
|
+ log.error("调用哈哈平台API失败", e);
|
|
|
+ record.setStatus(STATUS_FAILED);
|
|
|
+ record.setEndTime(LocalDateTime.now());
|
|
|
+ record.setDuration(java.time.Duration.between(record.getStartTime(), record.getEndTime()).toMillis());
|
|
|
+ record.setErrorMessage("调用哈哈平台API失败: " + e.getMessage());
|
|
|
+ updateById(record);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("设备数据同步异常", e);
|
|
|
+ record.setStatus(STATUS_FAILED);
|
|
|
+ record.setEndTime(LocalDateTime.now());
|
|
|
+ record.setDuration(java.time.Duration.between(record.getStartTime(), record.getEndTime()).toMillis());
|
|
|
+ record.setErrorMessage("同步异常: " + e.getMessage());
|
|
|
+ updateById(record);
|
|
|
+ }
|
|
|
+
|
|
|
+ return record;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 同步单个设备
|
|
|
+ */
|
|
|
+ private void syncSingleDevice(Long recordId, DeviceInfo deviceInfo) {
|
|
|
+ log.debug("同步设备: {} - {}", deviceInfo.getId(), deviceInfo.getName());
|
|
|
+
|
|
|
+ // 查询本地是否已存在该设备
|
|
|
+ Device existDevice = deviceService.getDeviceBySn(deviceInfo.getId());
|
|
|
+
|
|
|
+ if (existDevice != null) {
|
|
|
+ // 更新设备信息
|
|
|
+ existDevice.setName(deviceInfo.getName());
|
|
|
+ existDevice.setStatus("1".equals(deviceInfo.getStatus()) ? 1 : 0);
|
|
|
+ // existDevice.setAddress(deviceInfo.getAddress()); // Device实体暂无address字段
|
|
|
+ deviceService.updateById(existDevice);
|
|
|
+
|
|
|
+ // 记录成功日志
|
|
|
+ saveSyncLog(recordId, SYNC_TYPE_DEVICE, deviceInfo.getId(),
|
|
|
+ deviceInfo.getName(), 1, "设备信息更新成功", null);
|
|
|
+ } else {
|
|
|
+ // 新增设备
|
|
|
+ Device newDevice = new Device();
|
|
|
+ newDevice.setDeviceId(deviceInfo.getId());
|
|
|
+ newDevice.setName(deviceInfo.getName());
|
|
|
+ // newDevice.setAddress(deviceInfo.getAddress()); // Device实体暂无address字段
|
|
|
+ newDevice.setStatus("1".equals(deviceInfo.getStatus()) ? 1 : 0);
|
|
|
+ deviceService.save(newDevice);
|
|
|
+
|
|
|
+ // 记录成功日志
|
|
|
+ saveSyncLog(recordId, SYNC_TYPE_DEVICE, deviceInfo.getId(),
|
|
|
+ deviceInfo.getName(), 1, "设备新增成功", null);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 保存同步日志
|
|
|
+ */
|
|
|
+ private void saveSyncLog(Long recordId, String syncType, String dataId,
|
|
|
+ String dataName, Integer status, String message, String detail) {
|
|
|
+ SyncLog syncLog = new SyncLog();
|
|
|
+ syncLog.setRecordId(recordId);
|
|
|
+ syncLog.setSyncType(syncType);
|
|
|
+ syncLog.setDataId(dataId);
|
|
|
+ syncLog.setDataName(dataName);
|
|
|
+ syncLog.setStatus(status);
|
|
|
+ syncLog.setMessage(message);
|
|
|
+ syncLog.setDetail(detail);
|
|
|
+ syncLog.setCreateTime(LocalDateTime.now());
|
|
|
+ syncLogMapper.insert(syncLog);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Map<String, Object> getSyncRecordList(String syncType, Integer status, Integer page, Integer pageSize) {
|
|
|
+ LambdaQueryWrapper<SyncRecord> wrapper = new LambdaQueryWrapper<>();
|
|
|
+ if (syncType != null && !syncType.isEmpty()) {
|
|
|
+ wrapper.eq(SyncRecord::getSyncType, syncType);
|
|
|
+ }
|
|
|
+ if (status != null) {
|
|
|
+ wrapper.eq(SyncRecord::getStatus, status);
|
|
|
+ }
|
|
|
+ wrapper.orderByDesc(SyncRecord::getCreateTime);
|
|
|
+
|
|
|
+ IPage<SyncRecord> pageResult = page(new Page<>(page, pageSize), wrapper);
|
|
|
+
|
|
|
+ Map<String, Object> result = new HashMap<>();
|
|
|
+ result.put("list", pageResult.getRecords());
|
|
|
+ result.put("total", pageResult.getTotal());
|
|
|
+ result.put("page", page);
|
|
|
+ result.put("pageSize", pageSize);
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Map<String, Object> getSyncLogList(Long recordId, Integer page, Integer pageSize) {
|
|
|
+ LambdaQueryWrapper<SyncLog> wrapper = new LambdaQueryWrapper<>();
|
|
|
+ wrapper.eq(SyncLog::getRecordId, recordId);
|
|
|
+ wrapper.orderByDesc(SyncLog::getCreateTime);
|
|
|
+
|
|
|
+ IPage<SyncLog> pageResult = syncLogMapper.selectPage(new Page<>(page, pageSize), wrapper);
|
|
|
+
|
|
|
+ Map<String, Object> result = new HashMap<>();
|
|
|
+ result.put("list", pageResult.getRecords());
|
|
|
+ result.put("total", pageResult.getTotal());
|
|
|
+ result.put("page", page);
|
|
|
+ result.put("pageSize", pageSize);
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Map<String, Object> getSyncStatistics() {
|
|
|
+ Map<String, Object> stats = new HashMap<>();
|
|
|
+
|
|
|
+ // 总同步次数
|
|
|
+ long totalRecords = count();
|
|
|
+ stats.put("totalRecords", totalRecords);
|
|
|
+
|
|
|
+ // 成功次数
|
|
|
+ long successRecords = lambdaQuery().eq(SyncRecord::getStatus, STATUS_SUCCESS).count();
|
|
|
+ stats.put("successRecords", successRecords);
|
|
|
+
|
|
|
+ // 失败次数
|
|
|
+ long failedRecords = lambdaQuery().eq(SyncRecord::getStatus, STATUS_FAILED).count();
|
|
|
+ stats.put("failedRecords", failedRecords);
|
|
|
+
|
|
|
+ // 设备同步统计
|
|
|
+ long deviceSyncCount = lambdaQuery().eq(SyncRecord::getSyncType, SYNC_TYPE_DEVICE).count();
|
|
|
+ stats.put("deviceSyncCount", deviceSyncCount);
|
|
|
+
|
|
|
+ // 最近一次同步记录
|
|
|
+ SyncRecord latestRecord = lambdaQuery()
|
|
|
+ .orderByDesc(SyncRecord::getCreateTime)
|
|
|
+ .last("LIMIT 1")
|
|
|
+ .one();
|
|
|
+ stats.put("latestRecord", latestRecord);
|
|
|
+
|
|
|
+ return stats;
|
|
|
+ }
|
|
|
+}
|