skyline %!s(int64=2) %!d(string=hai) anos
pai
achega
282ee4a895

+ 0 - 26
admin/src/main/java/com/kym/admin/controller/TestController.java

@@ -1,26 +0,0 @@
-package com.kym.admin.controller;
-
-import com.kym.common.R;
-import com.kym.service.queue.sender.SendMessage;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Controller;
-import org.springframework.web.bind.annotation.RequestMapping;
-import org.springframework.web.bind.annotation.RequestMethod;
-import org.springframework.web.bind.annotation.ResponseBody;
-
-@Controller
-@RequestMapping
-public class TestController {
-
-    @Autowired
-    private SendMessage sendMessage;
-
-    @RequestMapping(value = "/hello", method = RequestMethod.GET)
-    @ResponseBody
-    public R<?> test() {
-
-            sendMessage.sendMessage("", "confirm_test_queue", "发送者消息");
-
-        return R.success();
-    }
-}

+ 5 - 0
entity/src/main/java/com/kym/entity/common/Exchanges.java

@@ -0,0 +1,5 @@
+package com.kym.entity.common;
+
+public class Exchanges {
+    public static final String DELAY_EXCHANGE_NAME = "delay-exchange";
+}

+ 1 - 14
entity/src/main/java/com/kym/entity/common/Queues.java

@@ -1,18 +1,5 @@
 package com.kym.entity.common;
 
-/**
- * 消息队列
- *
- * @author skyline
- */
 public class Queues {
-
-    /**
-     * 优惠券队列
-     */
-    public static final String COUPON_QUEUE = "coupon.queue";
-    /**
-     * 优惠券死信队列
-     */
-    public static final String COUPON_DELAY_QUEUE = "coupon.delay.queue";
+    public static final String DELAY_QUEUE = "delay.user.coupon.queue";
 }

+ 5 - 0
entity/src/main/java/com/kym/entity/common/RoutingKeys.java

@@ -0,0 +1,5 @@
+package com.kym.entity.common;
+
+public class RoutingKeys {
+    public static final String DELAY_ROUTING_KEY = "delay.routingKey";
+}

+ 8 - 2
service/src/main/java/com/kym/service/admin/impl/ActivityServiceImpl.java

@@ -21,6 +21,8 @@ import com.kym.service.jobs.DelayService;
 import com.kym.service.miniapp.UserCouponService;
 import com.kym.service.miniapp.UserRechargeRightsService;
 import com.kym.service.miniapp.UserService;
+import com.kym.service.queue.sender.MessageSender;
+import com.kym.service.queue.sender.UserCouponSender;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.BeanUtils;
 import org.springframework.context.annotation.Lazy;
@@ -64,9 +66,11 @@ public class ActivityServiceImpl extends MPJBaseServiceImpl<ActivityMapper, Acti
 
     private final UserCouponService userCouponService;
 
+    private final UserCouponSender userCouponSender;
+
     public ActivityServiceImpl(ActivityStationService activityStationService, RechargeRightsService rechargeRightsService, CouponService couponService,
                                UserRechargeRightsService userRechargeRightsService, StationService stationService,
-                               @Lazy DelayService<DelayActivity> activityDelayService, BannerService bannerService, UserService userService, UserCouponService userCouponService) {
+                               @Lazy DelayService<DelayActivity> activityDelayService, BannerService bannerService, UserService userService, UserCouponService userCouponService,  UserCouponSender userCouponSender) {
         this.activityStationService = activityStationService;
         this.rechargeRightsService = rechargeRightsService;
         this.couponService = couponService;
@@ -76,6 +80,7 @@ public class ActivityServiceImpl extends MPJBaseServiceImpl<ActivityMapper, Acti
         this.bannerService = bannerService;
         this.userService = userService;
         this.userCouponService = userCouponService;
+        this.userCouponSender = userCouponSender;
     }
 
     /**
@@ -146,7 +151,8 @@ public class ActivityServiceImpl extends MPJBaseServiceImpl<ActivityMapper, Acti
                 break;
         }
         userCouponService.saveBatch(userCouponList);
-//        userCouponList.forEach(couponMessageProducer::sendMessage);
+        // 发送消息至队列
+        userCouponList.forEach(userCouponSender::sendMessage);
     }
 
     /**

+ 37 - 12
service/src/main/java/com/kym/service/queue/config/QueueConfig.java

@@ -1,33 +1,58 @@
 package com.kym.service.queue.config;
 
+import com.kym.entity.common.Exchanges;
+import com.kym.entity.common.Queues;
+import com.kym.entity.common.RoutingKeys;
 import org.springframework.amqp.core.Binding;
 import org.springframework.amqp.core.BindingBuilder;
-import org.springframework.amqp.core.FanoutExchange;
+import org.springframework.amqp.core.CustomExchange;
 import org.springframework.amqp.core.Queue;
-import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
+import java.util.HashMap;
+import java.util.Map;
+
 
 @Configuration
 public class QueueConfig {
 
-    @Bean(name = "confirmTestQueue")
-    public Queue confirmTestQueue() {
-        return new Queue("confirm_test_queue", true, false, false);
+    /**
+     * 延迟交换机的类型
+     */
+    public static final String DELAY_EXCHANGE_TYPE = "x-delayed-message";
+
+
+    /**
+     * 延迟队列
+     *
+     * @return
+     */
+    @Bean(name = "delayQueue")
+    public Queue delayQueue() {
+        return new Queue(Queues.DELAY_QUEUE, true, false, false);
     }
 
-    @Bean(name = "confirmTestExchange")
-    public FanoutExchange confirmTestExchange() {
-        return new FanoutExchange("confirmTestExchange");
+
+    /**
+     * 创建延迟交换机,必须先创建才能监听
+     *
+     * @return
+     */
+    @Bean
+    public CustomExchange delayExchange() {
+        Map<String, Object> args = new HashMap<>();
+        args.put("x-delayed-type", "direct");
+        //属性参数 交换机名称 交换机类型 是否持久化 是否自动删除 配置参数
+        return new CustomExchange(Exchanges.DELAY_EXCHANGE_NAME, DELAY_EXCHANGE_TYPE, true, false, args);
     }
 
     @Bean
-    public Binding confirmTestFanoutExchangeAndQueue(
-            @Qualifier("confirmTestExchange") FanoutExchange confirmTestExchange,
-            @Qualifier("confirmTestQueue") Queue confirmTestQueue) {
-        return BindingBuilder.bind(confirmTestQueue).to(confirmTestExchange);
+    public Binding binding() {
+        return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(RoutingKeys.DELAY_ROUTING_KEY).noargs();
     }
+
+
 }
 
 

+ 15 - 15
service/src/main/java/com/kym/service/queue/receiver/ReceiverMessage.java

@@ -1,5 +1,7 @@
 package com.kym.service.queue.receiver;
 
+import com.kym.entity.common.Queues;
+import com.kym.service.miniapp.UserCouponService;
 import com.rabbitmq.client.Channel;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.amqp.core.Message;
@@ -9,37 +11,35 @@ import org.springframework.stereotype.Component;
 
 import java.io.IOException;
 
-/**
- * @author 公众号:程序员小富
- * @description 消息消费
- * @date 2020/6/29 16:31
- */
+
 @Slf4j
 @Component
-@RabbitListener(queues = "confirm_test_queue")
 public class ReceiverMessage {
 
+    private final UserCouponService userCouponService;
+
+    public ReceiverMessage(UserCouponService userCouponService) {
+        this.userCouponService = userCouponService;
+    }
+
     @RabbitHandler
-    public void processHandler1(String msg, Channel channel, Message message) throws IOException {
+    @RabbitListener(queues = Queues.DELAY_QUEUE)
+    public void userCouponHandler(String msg, Channel channel, Message message) throws IOException {
 
         try {
-            log.info("消费者 1 号收到:{}", msg);
+            log.info("userCouponHandler收到:{}", msg);
 
             //TODO 具体业务
 
             channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
 
-        }  catch (Exception e) {
-
+        } catch (Exception e) {
             if (message.getMessageProperties().getRedelivered()) {
-
                 log.error("消息已重复处理失败,拒绝再次接收...");
-
-                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息
+                // 拒绝消息
+                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
             } else {
-
                 log.error("消息即将再次返回队列处理...");
-
                 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
             }
         }

+ 0 - 63
service/src/main/java/com/kym/service/queue/receiver/ReceiverMessage1.java

@@ -1,63 +0,0 @@
-package com.kym.service.queue.receiver;
-
-import com.rabbitmq.client.Channel;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.amqp.core.Message;
-import org.springframework.amqp.rabbit.annotation.RabbitHandler;
-import org.springframework.amqp.rabbit.annotation.RabbitListener;
-import org.springframework.amqp.rabbit.connection.CorrelationData;
-import org.springframework.stereotype.Component;
-
-import java.io.IOException;
-
-/**
- * @Author: 公众号:程序员小富
- * @Description:
- */
-@Slf4j
-@Component
-@RabbitListener(queues = "confirm_test_queue")
-public class ReceiverMessage1 {
-
-    private int retryNum = 5;
-
-    private int currentNum = 0;
-
-
-//    @RabbitHandler
-//    public void processHandler(String msg, Channel channel, Message message) throws IOException {
-//
-//        try {
-//            log.info("消费者 2 号收到:{}", msg);
-//
-//            int a = 1 / 0;
-//
-//            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
-//
-//        } catch (Exception e) {
-//
-//            if (currentNum <= 1000) {
-//
-//                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
-//            }
-//            currentNum++;
-//        }
-//    }
-
-    @RabbitHandler
-    public void processHandler(CorrelationData correlationData , String msg, Channel channel, Message message) throws IOException {
-
-        try {
-            log.info("消费者 2 号收到:{}", msg);
-
-            String correlationId = (String) message.getMessageProperties().getHeaders().get("spring_returned_message_correlation");
-
-            System.out.println(correlationId);
-
-            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
-
-        } catch (Exception e) {
-
-        }
-    }
-}

+ 20 - 19
service/src/main/java/com/kym/service/queue/sender/SendMessage.java → service/src/main/java/com/kym/service/queue/sender/MessageSender.java

@@ -3,40 +3,40 @@ package com.kym.service.queue.sender;
 import com.kym.service.queue.callback.ConfirmCallbackService;
 import com.kym.service.queue.callback.ReturnCallbackService;
 import lombok.extern.slf4j.Slf4j;
-import org.springframework.amqp.core.MessageDeliveryMode;
 import org.springframework.amqp.rabbit.connection.CorrelationData;
 import org.springframework.amqp.rabbit.core.RabbitTemplate;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 import java.util.UUID;
 
 /**
- * @Author: 公众号:程序员小富
- * @Description:
+ * rabbitmq 消息发送
  */
 @Slf4j
 @Component
-public class SendMessage {
+public class MessageSender {
 
-    @Autowired
-    private RabbitTemplate rabbitTemplate;
 
-    @Autowired
-    private ConfirmCallbackService confirmCallbackService;
+    private final RabbitTemplate rabbitTemplate;
 
-    @Autowired
-    private ReturnCallbackService returnCallbackService;
+    private final ConfirmCallbackService confirmCallbackService;
+
+    private final ReturnCallbackService returnCallbackService;
+
+    public MessageSender(RabbitTemplate rabbitTemplate, ConfirmCallbackService confirmCallbackService, ReturnCallbackService returnCallbackService) {
+        this.rabbitTemplate = rabbitTemplate;
+        this.confirmCallbackService = confirmCallbackService;
+        this.returnCallbackService = returnCallbackService;
+    }
 
     /**
-     * @param exchange   交换机
-     * @param routingKey 队列
-     * @param msg        消息体
-     * @author 公众号:程序员小富
-     * @description 发送消息
-     * @date 2020/6/29 16:22
+     *
+     * @param exchange
+     * @param routingKey
+     * @param msg
+     * @param delayTime 延迟时间(ms)
      */
-    public void sendMessage(String exchange, String routingKey, Object msg) {
+    public void send(String exchange, String routingKey, Object msg, int... delayTime) {
 
         /**
          * 确保消息发送失败后可以重新返回到队列中
@@ -59,7 +59,8 @@ public class SendMessage {
          */
         rabbitTemplate.convertAndSend(exchange, routingKey, msg,
                 message -> {
-                    message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
+//                    message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
+                    message.getMessageProperties().setDelay(delayTime.length > 0 ? delayTime[0] : 0);
                     return message;
                 },
                 new CorrelationData(UUID.randomUUID().toString()));

+ 19 - 0
service/src/main/java/com/kym/service/queue/sender/UserCouponSender.java

@@ -0,0 +1,19 @@
+package com.kym.service.queue.sender;
+
+import com.kym.entity.common.Exchanges;
+import com.kym.entity.common.RoutingKeys;
+import org.springframework.stereotype.Component;
+
+@Component
+public class UserCouponSender {
+
+    private final MessageSender messageSender;
+
+    public UserCouponSender(MessageSender messageSender) {
+        this.messageSender = messageSender;
+    }
+
+    public void sendMessage(Object msg, int... delayTime) {
+        messageSender.send(Exchanges.DELAY_EXCHANGE_NAME, RoutingKeys.DELAY_ROUTING_KEY, msg, delayTime);
+    }
+}