|
|
@@ -0,0 +1,148 @@
|
|
|
+package com.kym.admin.jobs;
|
|
|
+
|
|
|
+import com.baomidou.dynamic.datasource.annotation.DS;
|
|
|
+import com.kym.common.utils.CommUtil;
|
|
|
+import com.kym.entity.admin.Activity;
|
|
|
+import com.kym.entity.admin.Banner;
|
|
|
+import com.kym.entity.admin.delay.DelayActivity;
|
|
|
+import com.kym.service.admin.ActivityService;
|
|
|
+import com.kym.service.admin.BannerService;
|
|
|
+import com.kym.service.jobs.DelayService;
|
|
|
+import com.kym.service.jobs.DelayedItem;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.springframework.beans.factory.config.ConfigurableBeanFactory;
|
|
|
+import org.springframework.context.annotation.Scope;
|
|
|
+import org.springframework.context.event.ContextRefreshedEvent;
|
|
|
+import org.springframework.context.event.EventListener;
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
+
|
|
|
+import java.util.concurrent.DelayQueue;
|
|
|
+import java.util.concurrent.ExecutorService;
|
|
|
+import java.util.concurrent.Executors;
|
|
|
+
|
|
|
+/**
|
|
|
+ * @author skyline
|
|
|
+ * @description 活动启闭延迟任务
|
|
|
+ * @date 2023-10-08 22:11
|
|
|
+ */
|
|
|
+@Service
|
|
|
+@Slf4j
|
|
|
+@DS("db-admin")
|
|
|
+@Scope(ConfigurableBeanFactory.SCOPE_SINGLETON) // 设置成单例
|
|
|
+public class ActivityDelayJob implements DelayService<DelayActivity> {
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 预约订单队列
|
|
|
+ */
|
|
|
+ private final static DelayQueue<DelayedItem<DelayActivity>> DELAY_QUEUE = new DelayQueue<>();
|
|
|
+
|
|
|
+ private final ActivityService activityService;
|
|
|
+ private final BannerService bannerService;
|
|
|
+ /**
|
|
|
+ * 线程池
|
|
|
+ */
|
|
|
+ private final ExecutorService executor = Executors.newFixedThreadPool(1);
|
|
|
+
|
|
|
+ public ActivityDelayJob(ActivityService activityService, BannerService bannerService) {
|
|
|
+ this.activityService = activityService;
|
|
|
+ this.bannerService = bannerService;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void initActivityDelayJob(Activity activity) {
|
|
|
+ // 将启动时间和结束时间组装延迟任务放入队列
|
|
|
+ var startJob = new DelayActivity(activity.getId(), activity.getName(), activity.getStartTime(), DelayActivity.TYPE_启动);
|
|
|
+ var endJob = new DelayActivity(activity.getId(), activity.getName(), activity.getEndTime(), DelayActivity.TYPE_结束);
|
|
|
+ addToDelayQueue(startJob);
|
|
|
+ addToDelayQueue(endJob);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ @DS("db-admin")
|
|
|
+ // 这里不能使用@PostConstruct,在初始化完成后, bean 进入增强阶段, 所以这个阶段的任何AOP都是无效的,https://www.cnblogs.com/eternityz/p/15330069.html
|
|
|
+ @EventListener
|
|
|
+ public void init(ContextRefreshedEvent event) {
|
|
|
+ // 队列加载所有进行中和未开始的活动
|
|
|
+ var activity = activityService.lambdaQuery().in(Activity::getStatus, Activity.STATUS_进行中, Activity.STATUS_未开始).list();
|
|
|
+
|
|
|
+ // 启动队列
|
|
|
+ var delayStartActivityList = activity.stream().filter(a -> a.getStatus().equals(Activity.STATUS_未开始)).map(act -> new DelayActivity()
|
|
|
+ .setId(act.getId())
|
|
|
+ .setName(act.getName())
|
|
|
+ .setExecuteTime(act.getStartTime())
|
|
|
+ .setType(DelayActivity.TYPE_启动))
|
|
|
+ .toList();
|
|
|
+
|
|
|
+ // 结束队列
|
|
|
+ var delayEndActivityList = activity.stream().map(act -> new DelayActivity()
|
|
|
+ .setId(act.getId())
|
|
|
+ .setName(act.getName())
|
|
|
+ .setExecuteTime(act.getEndTime())
|
|
|
+ .setType(DelayActivity.TYPE_结束))
|
|
|
+ .toList();
|
|
|
+
|
|
|
+ var delayStartList = delayStartActivityList.stream().map(delay -> new DelayedItem<>(delay, delay.getExecuteTime())).toList();
|
|
|
+ var delayEndList = delayEndActivityList.stream().map(delay -> new DelayedItem<>(delay, delay.getExecuteTime())).toList();
|
|
|
+
|
|
|
+ DELAY_QUEUE.addAll(delayStartList);
|
|
|
+ DELAY_QUEUE.addAll(delayEndList);
|
|
|
+
|
|
|
+ // 开启线程处理队列消息
|
|
|
+ executor.execute(() -> {
|
|
|
+ ThreadLocal<Long> threadLocal = new ThreadLocal<>();
|
|
|
+ log.info("活动延迟启闭处理线程:{}", Thread.currentThread().getName());
|
|
|
+ DelayedItem<DelayActivity> delayedItem;
|
|
|
+ while (true) {
|
|
|
+ try {
|
|
|
+ delayedItem = DELAY_QUEUE.take();
|
|
|
+ var delayActivity = delayedItem.data;
|
|
|
+ threadLocal.set(delayActivity.getId());
|
|
|
+ if (delayActivity.getType().equals(DelayActivity.TYPE_启动)) {
|
|
|
+ // 开始活动
|
|
|
+ // 修改活动状态为已结束
|
|
|
+ activityService.lambdaUpdate().set(Activity::getStatus, Activity.STATUS_进行中).eq(Activity::getId, delayActivity.getId()).update();
|
|
|
+ // 修改banner状态为失效
|
|
|
+ bannerService.lambdaUpdate().set(Banner::getStatus, Banner.STATUS_有效).eq(Banner::getActivityId, delayActivity.getId()).update();
|
|
|
+ } else {
|
|
|
+ // 停止活动
|
|
|
+ // 修改活动状态为已结束
|
|
|
+ activityService.lambdaUpdate().set(Activity::getStatus, Activity.STATUS_已结束).eq(Activity::getId, delayActivity.getId()).update();
|
|
|
+ // 修改banner状态为失效
|
|
|
+ bannerService.lambdaUpdate().set(Banner::getStatus, Banner.STATUS_无效).eq(Banner::getActivityId, delayActivity.getId()).update();
|
|
|
+ }
|
|
|
+ // 线程休眠100ms
|
|
|
+ Thread.sleep(100);
|
|
|
+ } catch (Exception e) {
|
|
|
+ if (e instanceof InterruptedException) {
|
|
|
+ log.error("活动到期停止队列take异常", e);
|
|
|
+ } else {
|
|
|
+ log.info("活动到期停止,主活动id:{}", threadLocal.get(), e);
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ threadLocal.remove();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public boolean addToOrderDelayQueue(DelayedItem<DelayActivity> delayedItem) {
|
|
|
+ return DELAY_QUEUE.add(delayedItem);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public boolean addToDelayQueue(DelayActivity activity) {
|
|
|
+ DelayedItem<DelayActivity> orderDelayed = new DelayedItem<>(activity, activity.getExecuteTime());
|
|
|
+ return DELAY_QUEUE.add(orderDelayed);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public boolean removeFromOrderDelayQueue(Object activityId) {
|
|
|
+ if (CommUtil.isEmptyOrNull(activityId)) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ return DELAY_QUEUE.removeIf(queue -> queue.data.getId().equals(activityId));
|
|
|
+ }
|
|
|
+}
|