فهرست منبع

rabbit 批量香妃

skyline 2 سال پیش
والد
کامیت
ad942daed5

+ 3 - 1
admin/src/main/resources/application-dev.yml

@@ -36,7 +36,7 @@ wechat:
     aesKey: #微信小程序消息服务器配置的EncodingAESKey
     msgDataFormat: JSON
 
-#运营看板小程序
+  #运营看板小程序
   kanban:
     appid: wx35b22037ccbc7b3f
     secret: b59ff1b7d160b5c0efe33570a2f3fdf2
@@ -125,6 +125,8 @@ spring:
           initial-interval: 3000ms
           max-interval: 6000ms
           multiplier: 2
+        consumer-batch-enabled: true #开启批量消费
+        batch-size: 100 #每次批量消费大小
 
 kym:
   notify-email: skyline@kuaiyuman.cn

+ 19 - 0
admin/src/main/resources/application-prod.yml

@@ -108,6 +108,25 @@ spring:
     redis:
       # 缓存过期时间:7天
       time-to-live: 604800
+  rabbitmq:
+    host: 121.40.98.15
+    port: 5674
+    username: kym
+    password: kym!@123
+    virtual-host: /
+    publisher-returns: true
+    publisher-confirms: true
+    listener:
+      simple:
+        acknowledge-mode: manual
+        retry:
+          enabled: true
+          max-attempts: 3
+          initial-interval: 3000ms
+          max-interval: 6000ms
+          multiplier: 2
+        consumer-batch-enabled: true #开启批量消费
+        batch-size: 100 #每次批量消费大小
 
 kym:
   notify-email: zaizai@kuaiyuman.cn,skyline@kuaiyuman.cn

+ 1 - 1
service/src/main/java/com/kym/service/admin/impl/ActivityServiceImpl.java

@@ -21,7 +21,7 @@ import com.kym.service.jobs.DelayService;
 import com.kym.service.miniapp.UserCouponService;
 import com.kym.service.miniapp.UserRechargeRightsService;
 import com.kym.service.miniapp.UserService;
-import com.kym.service.queue.sender.UserCouponSender;
+import com.kym.service.queue.producer.UserCouponSender;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.BeanUtils;
 import org.springframework.context.annotation.Lazy;

+ 11 - 0
service/src/main/java/com/kym/service/queue/config/QueueConfig.java

@@ -7,6 +7,7 @@ import org.springframework.amqp.core.Binding;
 import org.springframework.amqp.core.BindingBuilder;
 import org.springframework.amqp.core.CustomExchange;
 import org.springframework.amqp.core.Queue;
+import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
@@ -22,6 +23,16 @@ public class QueueConfig {
      */
     public static final String DELAY_EXCHANGE_TYPE = "x-delayed-message";
 
+    /**
+     * 指定批处理MessageListenerConverter(这里必须设置)
+     * @param containerFactory
+     * @return
+     */
+    @Bean
+    public String cusSimpleRabbitListenerContainerFactory(SimpleRabbitListenerContainerFactory containerFactory) {
+        containerFactory.setBatchListener(true);
+        return "cusSimpleRabbitListenerContainerFactory";
+    }
 
     /**
      * 延迟队列

+ 51 - 0
service/src/main/java/com/kym/service/queue/consumer/UserCouponConsumer.java

@@ -0,0 +1,51 @@
+package com.kym.service.queue.consumer;
+
+import com.kym.entity.common.Queues;
+import com.kym.service.miniapp.UserCouponService;
+import com.rabbitmq.client.Channel;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.core.Message;
+import org.springframework.amqp.rabbit.annotation.RabbitHandler;
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+
+@Slf4j
+@Component
+public class UserCouponConsumer {
+
+    private final UserCouponService userCouponService;
+
+    public UserCouponConsumer(UserCouponService userCouponService) {
+        this.userCouponService = userCouponService;
+    }
+
+    @RabbitHandler
+    @RabbitListener(queues = Queues.DELAY_QUEUE)
+    public void userCouponHandler(List<Message> messages, Channel channel) {
+        log.info("userCouponHandler收到:{}", messages);
+
+        final List<String> messageList = messages.stream()
+                .map(each -> new String(each.getBody()))
+                .toList();
+        log.info("[onMessage] size:{}", messages.size());
+
+        try {
+            for (Message message : messages) {
+                // todo 业务逻辑处理
+                log.info("消息:{}", new String(message.getBody()));
+
+
+                channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
+                log.info("批次处理完毕.........");
+            }
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+}

+ 1 - 1
service/src/main/java/com/kym/service/queue/sender/MessageSender.java → service/src/main/java/com/kym/service/queue/producer/MessageSender.java

@@ -1,4 +1,4 @@
-package com.kym.service.queue.sender;
+package com.kym.service.queue.producer;
 
 import com.kym.service.queue.callback.ConfirmCallbackService;
 import com.kym.service.queue.callback.ReturnCallbackService;

+ 1 - 1
service/src/main/java/com/kym/service/queue/sender/UserCouponSender.java → service/src/main/java/com/kym/service/queue/producer/UserCouponSender.java

@@ -1,4 +1,4 @@
-package com.kym.service.queue.sender;
+package com.kym.service.queue.producer;
 
 import com.kym.entity.common.Exchanges;
 import com.kym.entity.common.RoutingKeys;

+ 0 - 48
service/src/main/java/com/kym/service/queue/receiver/ReceiverMessage.java

@@ -1,48 +0,0 @@
-package com.kym.service.queue.receiver;
-
-import com.kym.entity.common.Queues;
-import com.kym.service.miniapp.UserCouponService;
-import com.rabbitmq.client.Channel;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.amqp.core.Message;
-import org.springframework.amqp.rabbit.annotation.RabbitHandler;
-import org.springframework.amqp.rabbit.annotation.RabbitListener;
-import org.springframework.stereotype.Component;
-
-import java.io.IOException;
-
-
-@Slf4j
-@Component
-public class ReceiverMessage {
-
-    private final UserCouponService userCouponService;
-
-    public ReceiverMessage(UserCouponService userCouponService) {
-        this.userCouponService = userCouponService;
-    }
-
-    @RabbitHandler
-    @RabbitListener(queues = Queues.DELAY_QUEUE)
-    public void userCouponHandler(String msg, Channel channel, Message message) throws IOException {
-
-        try {
-            log.info("userCouponHandler收到:{}", msg);
-
-            //TODO 具体业务
-
-            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
-
-        } catch (Exception e) {
-            if (message.getMessageProperties().getRedelivered()) {
-                log.error("消息已重复处理失败,拒绝再次接收...");
-                // 拒绝消息
-                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
-            } else {
-                log.error("消息即将再次返回队列处理...");
-                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
-            }
-        }
-    }
-
-}