Kaynağa Gözat

队列修改

skyline 2 yıl önce
ebeveyn
işleme
7f47522778

+ 26 - 0
admin/src/main/java/com/kym/admin/controller/TestController.java

@@ -0,0 +1,26 @@
+package com.kym.admin.controller;
+
+import com.kym.common.R;
+import com.kym.service.queue.sender.SendMessage;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Controller;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.ResponseBody;
+
+@Controller
+@RequestMapping
+public class TestController {
+
+    @Autowired
+    private SendMessage sendMessage;
+
+    @RequestMapping(value = "/hello", method = RequestMethod.GET)
+    @ResponseBody
+    public R<?> test() {
+
+            sendMessage.sendMessage("", "confirm_test_queue", "发送者消息");
+
+        return R.success();
+    }
+}

+ 6 - 2
entity/src/main/java/com/kym/entity/common/Queues.java

@@ -8,7 +8,11 @@ package com.kym.entity.common;
 public class Queues {
 
     /**
-     * 优惠券
+     * 优惠券队列
      */
-    public static final String QUEUE_MARKETING_COUPON = "MARKETING_COUPON";
+    public static final String COUPON_QUEUE = "coupon.queue";
+    /**
+     * 优惠券死信队列
+     */
+    public static final String COUPON_DELAY_QUEUE = "coupon.delay.queue";
 }

+ 5 - 49
service/src/main/java/com/kym/service/admin/impl/ActivityServiceImpl.java

@@ -21,7 +21,6 @@ import com.kym.service.jobs.DelayService;
 import com.kym.service.miniapp.UserCouponService;
 import com.kym.service.miniapp.UserRechargeRightsService;
 import com.kym.service.miniapp.UserService;
-import com.kym.service.queue.producer.CouponMessageProducer;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.BeanUtils;
 import org.springframework.context.annotation.Lazy;
@@ -65,11 +64,9 @@ public class ActivityServiceImpl extends MPJBaseServiceImpl<ActivityMapper, Acti
 
     private final UserCouponService userCouponService;
 
-    private final CouponMessageProducer couponMessageProducer;
-
     public ActivityServiceImpl(ActivityStationService activityStationService, RechargeRightsService rechargeRightsService, CouponService couponService,
                                UserRechargeRightsService userRechargeRightsService, StationService stationService,
-                               @Lazy DelayService<DelayActivity> activityDelayService, BannerService bannerService, UserService userService, UserCouponService userCouponService, CouponMessageProducer couponMessageProducer) {
+                               @Lazy DelayService<DelayActivity> activityDelayService, BannerService bannerService, UserService userService, UserCouponService userCouponService) {
         this.activityStationService = activityStationService;
         this.rechargeRightsService = rechargeRightsService;
         this.couponService = couponService;
@@ -79,7 +76,6 @@ public class ActivityServiceImpl extends MPJBaseServiceImpl<ActivityMapper, Acti
         this.bannerService = bannerService;
         this.userService = userService;
         this.userCouponService = userCouponService;
-        this.couponMessageProducer = couponMessageProducer;
     }
 
     /**
@@ -150,7 +146,7 @@ public class ActivityServiceImpl extends MPJBaseServiceImpl<ActivityMapper, Acti
                 break;
         }
         userCouponService.saveBatch(userCouponList);
-        userCouponList.forEach(couponMessageProducer::sendMessage);
+//        userCouponList.forEach(couponMessageProducer::sendMessage);
     }
 
     /**
@@ -242,9 +238,11 @@ public class ActivityServiceImpl extends MPJBaseServiceImpl<ActivityMapper, Acti
     @Async
     public void handleRechargeActivity(long userId, int rechargeAmount) {
         asyncHandleRechargeActivity(userId, rechargeAmount);
-//        executor.execute(new RechargeActivityTask(userId, rechargeAmount));
     }
 
+    /**
+     * 处理充值活动
+     */
     @Transactional(rollbackFor = Exception.class)
     public void asyncHandleRechargeActivity(long userId, int rechargeAmount) {
         log.info("RechargeActivityTask run....");
@@ -275,46 +273,4 @@ public class ActivityServiceImpl extends MPJBaseServiceImpl<ActivityMapper, Acti
         log.info("RechargeActivityTask run end....");
     }
 
-    /**
-     * 处理充值活动
-     */
-    private class RechargeActivityTask implements Runnable {
-        private final long userId;
-        private final int rechargeAmount;
-
-        public RechargeActivityTask(long userId, int rechargeAmount) {
-            this.userId = userId;
-            this.rechargeAmount = rechargeAmount;
-        }
-
-        @Override
-        public void run() {
-            log.info("RechargeActivityTask run....");
-            // 进行中的充值权益活动
-            // 手动切换数据源
-            DynamicDataSourceContextHolder.push("db-admin");
-            var activity = lambdaQuery().eq(Activity::getDiscountType, DISCOUNT_TYPE_服务费折扣权益).eq(Activity::getStatus, Activity.STATUS_进行中).one();
-            // 充值金额,匹配到到具体的充值权益,生成用户权益
-            if (activity != null) {
-                var rechargeRights = rechargeRightsService.lambdaQuery().eq(RechargeRights::getActivityId, activity.getId())
-                        .le(RechargeRights::getAmountMin, rechargeAmount)
-                        .ge(RechargeRights::getAmountMax, rechargeAmount) // 最后一档最大值设置成10000
-                        .one();
-                DynamicDataSourceContextHolder.poll();
-                if (rechargeRights != null) {
-                    var userRechargeRights = new UserRechargeRights().setRightsId(rechargeRights.getId()).setUserId(userId).setRightsBalance(rechargeAmount);
-                    BeanUtils.copyProperties(rechargeRights, userRechargeRights, "id", "createTime", "updateTime");
-                    // 计算有效期
-                    var endTime = LocalDateTime.now().with(LocalTime.MAX).plusDays(rechargeRights.getValidity() - 1);
-                    // 手动切换数据源
-                    DynamicDataSourceContextHolder.push("db-miniapp");
-                    userRechargeRightsService.save(userRechargeRights.setStartTime(LocalDateTime.now()).setEndTime(endTime));
-                    DynamicDataSourceContextHolder.poll();
-                }
-            } else {
-                DynamicDataSourceContextHolder.poll();
-            }
-            log.info("RechargeActivityTask run end....");
-        }
-    }
 }

+ 1 - 2
service/src/main/java/com/kym/service/miniapp/impl/UserServiceImpl.java

@@ -72,7 +72,6 @@ public class UserServiceImpl extends MPJBaseServiceImpl<UserMapper, User> implem
         this.refundLogService = refundLogService;
         this.carsService = carsService;
         this.userRechargeRightsService = userRechargeRightsService;
-
         this.rechargeRightsService = rechargeRightsService;
         this.activityService = activityService;
         this.bannerService = bannerService;
@@ -264,7 +263,7 @@ public class UserServiceImpl extends MPJBaseServiceImpl<UserMapper, User> implem
         var user2RefundTimes = refund.stream().collect(Collectors.groupingBy(RefundLog::getUserId, Collectors.counting()));
 
         // 将用户余额,退款次数,退款金额放入result中
-        var res = result.stream().peek(vo-> {
+        var res = result.stream().peek(vo -> {
             vo.setBalance(user2Balance.getOrDefault(vo.getUserId(), 0));
             vo.setFrozenAmount(user2FrozenAmount.getOrDefault(vo.getUserId(), 0));
             vo.setRefundTimes(user2RefundTimes.getOrDefault(vo.getUserId(), 0L));

+ 0 - 8
service/src/main/java/com/kym/service/queue/MessageQueueService.java

@@ -1,8 +0,0 @@
-package com.kym.service.queue;
-
-/**
- * 消息队列服务
- */
-public interface MessageQueueService {
-    void sendMessage(String exchange, String routingKey, Object msg);
-}

+ 0 - 44
service/src/main/java/com/kym/service/queue/RabbitMQConfig.java

@@ -1,44 +0,0 @@
-package com.kym.service.queue;
-
-import jakarta.annotation.Resource;
-import org.springframework.amqp.core.AcknowledgeMode;
-import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
-import org.springframework.amqp.rabbit.connection.ConnectionFactory;
-import org.springframework.amqp.rabbit.core.RabbitAdmin;
-import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
-@Configuration
-public class RabbitMQConfig {
-
-    //配置连接工厂
-    @Resource
-    private ConnectionFactory connectionFactory;
-
-    @Bean
-    public RabbitAdmin rabbitAdmin() {
-        //需要传入
-        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
-        rabbitAdmin.setAutoStartup(true);
-        return rabbitAdmin;
-    }
-
-
-    /**
-     * 设置成手动ack
-     * https://blog.csdn.net/m912595719/article/details/83787486
-     * @param connectionFactory
-     * @return
-     */
-    @Bean
-    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
-        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
-        factory.setConnectionFactory(connectionFactory);
-        // factory.setMessageConverter(new Jackson2JsonMessageConverter());
-        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
-        return factory;
-    }
-
-
-}

+ 1 - 1
service/src/main/java/com/kym/service/queue/callback/ConfirmCallbackService.java

@@ -15,7 +15,7 @@ public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {
         if (!ack) {
             log.error("消息发送异常!");
         } else {
-            log.info("发送者已经收到确认,correlationData={} ,ack={}, cause={}", correlationData.getId(), ack, cause);
+            log.info("发送者爸爸已经收到确认,correlationData={} ,ack={}, cause={}", correlationData.getId(), ack, cause);
         }
     }
 }

+ 3 - 3
service/src/main/java/com/kym/service/queue/callback/ReturnCallbackService.java

@@ -5,13 +5,13 @@ import org.springframework.amqp.core.ReturnedMessage;
 import org.springframework.amqp.rabbit.core.RabbitTemplate;
 import org.springframework.stereotype.Component;
 
-
 @Slf4j
 @Component
 public class ReturnCallbackService implements RabbitTemplate.ReturnsCallback {
 
     @Override
-    public void returnedMessage(ReturnedMessage returnedMessage) {
-        log.info("returnedMessage ===> replyCode={} ,replyText={} ,exchange={} ,routingKey={}", returnedMessage.getReplyCode(), returnedMessage.getReplyText(), returnedMessage.getExchange(), returnedMessage.getExchange());
+    public void returnedMessage(ReturnedMessage returnMessage) {
+        log.info("returnedMessage ===> replyCode={} ,replyText={} ,exchange={} ,routingKey={}", returnMessage.getReplyCode(), returnMessage.getReplyText(), returnMessage.getExchange(), returnMessage.getRoutingKey());
+
     }
 }

+ 33 - 0
service/src/main/java/com/kym/service/queue/config/QueueConfig.java

@@ -0,0 +1,33 @@
+package com.kym.service.queue.config;
+
+import org.springframework.amqp.core.Binding;
+import org.springframework.amqp.core.BindingBuilder;
+import org.springframework.amqp.core.FanoutExchange;
+import org.springframework.amqp.core.Queue;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+
+@Configuration
+public class QueueConfig {
+
+    @Bean(name = "confirmTestQueue")
+    public Queue confirmTestQueue() {
+        return new Queue("confirm_test_queue", true, false, false);
+    }
+
+    @Bean(name = "confirmTestExchange")
+    public FanoutExchange confirmTestExchange() {
+        return new FanoutExchange("confirmTestExchange");
+    }
+
+    @Bean
+    public Binding confirmTestFanoutExchangeAndQueue(
+            @Qualifier("confirmTestExchange") FanoutExchange confirmTestExchange,
+            @Qualifier("confirmTestQueue") Queue confirmTestQueue) {
+        return BindingBuilder.bind(confirmTestQueue).to(confirmTestExchange);
+    }
+}
+
+

+ 0 - 53
service/src/main/java/com/kym/service/queue/impl/MessageQueueServiceImpl.java

@@ -1,53 +0,0 @@
-package com.kym.service.queue.impl;
-
-import com.kym.service.queue.MessageQueueService;
-import com.kym.service.queue.callback.ConfirmCallbackService;
-import com.kym.service.queue.callback.ReturnCallbackService;
-import org.springframework.amqp.core.MessageDeliveryMode;
-import org.springframework.amqp.rabbit.connection.CorrelationData;
-import org.springframework.amqp.rabbit.core.RabbitTemplate;
-import org.springframework.stereotype.Service;
-
-import java.util.UUID;
-
-/**
- * 消息队列服务
- *
- * @author skyline
- */
-@Service
-public class MessageQueueServiceImpl implements MessageQueueService {
-
-    private final RabbitTemplate rabbitTemplate;
-    private final ConfirmCallbackService confirmCallbackService;
-
-    private final ReturnCallbackService returnCallbackService;
-
-
-    public MessageQueueServiceImpl(RabbitTemplate rabbitTemplate, ConfirmCallbackService confirmCallbackService, ReturnCallbackService returnCallbackService) {
-        this.rabbitTemplate = rabbitTemplate;
-        this.confirmCallbackService = confirmCallbackService;
-        this.returnCallbackService = returnCallbackService;
-    }
-
-    @Override
-    public void sendMessage(String exchange, String routingKey, Object msg) {
-
-        //  确保消息发送失败后可以重新返回到队列中, 注意:yml需要配置 publisher-returns: true
-        rabbitTemplate.setMandatory(true);
-        // 消费者确认收到消息后,手动ack回执回调处理
-        rabbitTemplate.setConfirmCallback(confirmCallbackService);
-
-        // 消息投递到队列失败回调处理
-        rabbitTemplate.setReturnsCallback(returnCallbackService);
-        // 发送消息
-        rabbitTemplate.convertAndSend(exchange, routingKey, msg,
-                message -> {
-                    message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
-                    return message;
-                },
-                new CorrelationData(UUID.randomUUID().toString()));
-    }
-
-
-}

+ 0 - 56
service/src/main/java/com/kym/service/queue/producer/CouponMessageProducer.java

@@ -1,56 +0,0 @@
-package com.kym.service.queue.producer;
-
-import com.kym.entity.common.Queues;
-import com.kym.service.queue.MessageQueueService;
-import org.springframework.amqp.AmqpException;
-import org.springframework.amqp.core.Binding;
-import org.springframework.amqp.core.BindingBuilder;
-import org.springframework.amqp.core.DirectExchange;
-import org.springframework.amqp.core.Queue;
-import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.context.annotation.Bean;
-import org.springframework.scheduling.annotation.Async;
-import org.springframework.stereotype.Component;
-
-/**
- * 优惠券消息生产者
- *
- * @author skyline
- */
-@Component
-public class CouponMessageProducer {
-
-    private static final String EXCHANGE = "kym";
-    private static final String ROUTING_KEY = "coupon";
-    private final MessageQueueService messageQueueService;
-
-    public CouponMessageProducer(MessageQueueService messageQueueService) {
-        this.messageQueueService = messageQueueService;
-    }
-
-    @Async
-    public void  sendMessage(Object message) {
-        try {
-            messageQueueService.sendMessage(EXCHANGE, ROUTING_KEY, message);
-        } catch (AmqpException e) {
-            e.printStackTrace();
-        }
-    }
-
-    @Bean(name = "queue")
-    protected Queue queue() {
-        return new Queue(Queues.QUEUE_MARKETING_COUPON, true, false, false);
-    }
-
-    @Bean(name = "exchange")
-    protected DirectExchange exchange() {
-        return new DirectExchange(EXCHANGE, true, false);
-    }
-
-    @Bean
-    private Binding binding(@Qualifier("exchange") DirectExchange exchange, @Qualifier("queue") Queue queue) {
-        return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
-    }
-
-
-}

+ 10 - 8
service/src/main/java/com/kym/service/queue/consumer/ReceiverMessage.java → service/src/main/java/com/kym/service/queue/receiver/ReceiverMessage.java

@@ -1,6 +1,5 @@
-package com.kym.service.queue.consumer;
+package com.kym.service.queue.receiver;
 
-import com.kym.entity.common.Queues;
 import com.rabbitmq.client.Channel;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.amqp.core.Message;
@@ -17,12 +16,11 @@ import java.io.IOException;
  */
 @Slf4j
 @Component
-
+@RabbitListener(queues = "confirm_test_queue")
 public class ReceiverMessage {
 
-    @RabbitListener(queues = Queues.QUEUE_MARKETING_COUPON)
     @RabbitHandler
-    public void messageHandler(String msg, Channel channel, Message message) throws IOException {
+    public void processHandler1(String msg, Channel channel, Message message) throws IOException {
 
         try {
             log.info("消费者 1 号收到:{}", msg);
@@ -31,13 +29,17 @@ public class ReceiverMessage {
 
             channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
 
-        } catch (Exception e) {
+        }  catch (Exception e) {
+
             if (message.getMessageProperties().getRedelivered()) {
+
                 log.error("消息已重复处理失败,拒绝再次接收...");
-                // 拒绝消息
-                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
+
+                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息
             } else {
+
                 log.error("消息即将再次返回队列处理...");
+
                 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
             }
         }

+ 63 - 0
service/src/main/java/com/kym/service/queue/receiver/ReceiverMessage1.java

@@ -0,0 +1,63 @@
+package com.kym.service.queue.receiver;
+
+import com.rabbitmq.client.Channel;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.core.Message;
+import org.springframework.amqp.rabbit.annotation.RabbitHandler;
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.springframework.amqp.rabbit.connection.CorrelationData;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+
+/**
+ * @Author: 公众号:程序员小富
+ * @Description:
+ */
+@Slf4j
+@Component
+@RabbitListener(queues = "confirm_test_queue")
+public class ReceiverMessage1 {
+
+    private int retryNum = 5;
+
+    private int currentNum = 0;
+
+
+//    @RabbitHandler
+//    public void processHandler(String msg, Channel channel, Message message) throws IOException {
+//
+//        try {
+//            log.info("消费者 2 号收到:{}", msg);
+//
+//            int a = 1 / 0;
+//
+//            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
+//
+//        } catch (Exception e) {
+//
+//            if (currentNum <= 1000) {
+//
+//                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
+//            }
+//            currentNum++;
+//        }
+//    }
+
+    @RabbitHandler
+    public void processHandler(CorrelationData correlationData , String msg, Channel channel, Message message) throws IOException {
+
+        try {
+            log.info("消费者 2 号收到:{}", msg);
+
+            String correlationId = (String) message.getMessageProperties().getHeaders().get("spring_returned_message_correlation");
+
+            System.out.println(correlationId);
+
+            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
+
+        } catch (Exception e) {
+
+        }
+    }
+}

+ 67 - 0
service/src/main/java/com/kym/service/queue/sender/SendMessage.java

@@ -0,0 +1,67 @@
+package com.kym.service.queue.sender;
+
+import com.kym.service.queue.callback.ConfirmCallbackService;
+import com.kym.service.queue.callback.ReturnCallbackService;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.core.MessageDeliveryMode;
+import org.springframework.amqp.rabbit.connection.CorrelationData;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.UUID;
+
+/**
+ * @Author: 公众号:程序员小富
+ * @Description:
+ */
+@Slf4j
+@Component
+public class SendMessage {
+
+    @Autowired
+    private RabbitTemplate rabbitTemplate;
+
+    @Autowired
+    private ConfirmCallbackService confirmCallbackService;
+
+    @Autowired
+    private ReturnCallbackService returnCallbackService;
+
+    /**
+     * @param exchange   交换机
+     * @param routingKey 队列
+     * @param msg        消息体
+     * @author 公众号:程序员小富
+     * @description 发送消息
+     * @date 2020/6/29 16:22
+     */
+    public void sendMessage(String exchange, String routingKey, Object msg) {
+
+        /**
+         * 确保消息发送失败后可以重新返回到队列中
+         * 注意:yml需要配置 publisher-returns: true
+         */
+        rabbitTemplate.setMandatory(true);
+
+        /**
+         * 消费者确认收到消息后,手动ack回执回调处理
+         */
+        rabbitTemplate.setConfirmCallback(confirmCallbackService);
+
+        /**
+         * 消息投递到队列失败回调处理
+         */
+        rabbitTemplate.setReturnsCallback(returnCallbackService);
+
+        /**
+         * 发送消息
+         */
+        rabbitTemplate.convertAndSend(exchange, routingKey, msg,
+                message -> {
+                    message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
+                    return message;
+                },
+                new CorrelationData(UUID.randomUUID().toString()));
+    }
+}