Ver Fonte

AI优化:使用虚拟线程

skyline há 4 meses atrás
pai
commit
7141e7fc93

+ 26 - 1
car-wash-admin/src/main/java/com/kym/admin/config/AsyncConfig.java

@@ -1,18 +1,43 @@
 package com.kym.admin.config;
 
+import lombok.extern.slf4j.Slf4j;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.core.task.support.TaskExecutorAdapter;
 
 import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadPoolExecutor;
 
-
+/**
+ * 异步任务配置类
+ * 使用 Java 21 虚拟线程优化异步任务执行
+ */
 @Configuration
+@Slf4j
 public class AsyncConfig {
 
+    /**
+     * 使用虚拟线程的异步执行器(推荐)
+     * 虚拟线程非常轻量,适合高并发I/O密集型任务
+     */
     @Bean("AsyncExecutor")
     public Executor customAsyncExecutor() {
+        log.info("初始化异步虚拟线程执行器 - 使用 Java 21 Virtual Threads");
+        var virtualThreadExecutor = Executors.newThreadPerTaskExecutor(Thread.ofVirtual()
+                .name("async-vt-", 0)
+                .factory());
+        return new TaskExecutorAdapter(virtualThreadExecutor);
+    }
+
+    /**
+     * 传统线程池执行器(作为备用方案保留)
+     * 如需切换回传统线程池,请将上面的 @Bean 注解注释,并启用此方法
+     */
+    // @Bean("AsyncExecutor")
+    public Executor traditionalAsyncExecutor() {
+        log.info("初始化传统线程池执行器");
         ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
         executor.setCorePoolSize(3);
         executor.setMaxPoolSize(20);

+ 4 - 0
car-wash-admin/src/main/resources/application.yml

@@ -5,6 +5,10 @@ spring:
     name: admin
   main:
     allow-circular-references: true
+  # 启用 Java 21 虚拟线程支持(Spring Boot 3.x+)
+  threads:
+    virtual:
+      enabled: true
   lifecycle:
     timeout-per-shutdown-phase: 20s
 server:

+ 63 - 0
car-wash-common/src/main/java/com/kym/common/config/VirtualThreadConfig.java

@@ -0,0 +1,63 @@
+package com.kym.common.config;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * 虚拟线程配置类
+ * Java 21 Virtual Threads 统一配置
+ *
+ * @author skyline
+ * @date 2026-01-11
+ */
+@Configuration
+@Slf4j
+public class VirtualThreadConfig {
+
+    /**
+     * 创建虚拟线程执行器用于异步任务
+     * 虚拟线程非常轻量,可以创建数百万个而不会耗尽系统资源
+     */
+    @Bean("virtualThreadExecutor")
+    public ExecutorService virtualThreadExecutor() {
+        log.info("初始化虚拟线程执行器 (Virtual Thread Executor)");
+        return Executors.newVirtualThreadPerTaskExecutor();
+    }
+
+    /**
+     * 创建命名的虚拟线程执行器,便于追踪和调试
+     */
+    @Bean("namedVirtualThreadExecutor")
+    public ExecutorService namedVirtualThreadExecutor() {
+        log.info("初始化命名虚拟线程执行器 (Named Virtual Thread Executor)");
+        return Executors.newThreadPerTaskExecutor(Thread.ofVirtual()
+                .name("vt-", 0)
+                .factory());
+    }
+
+    /**
+     * 为消息处理创建专用的虚拟线程执行器
+     */
+    @Bean("messageVirtualThreadExecutor")
+    public ExecutorService messageVirtualThreadExecutor() {
+        log.info("初始化消息处理虚拟线程执行器 (Message Virtual Thread Executor)");
+        return Executors.newThreadPerTaskExecutor(Thread.ofVirtual()
+                .name("vt-msg-", 0)
+                .factory());
+    }
+
+    /**
+     * 为调度任务创建专用的虚拟线程执行器
+     */
+    @Bean("schedulerVirtualThreadExecutor")
+    public ExecutorService schedulerVirtualThreadExecutor() {
+        log.info("初始化调度虚拟线程执行器 (Scheduler Virtual Thread Executor)");
+        return Executors.newThreadPerTaskExecutor(Thread.ofVirtual()
+                .name("vt-scheduler-", 0)
+                .factory());
+    }
+}

+ 31 - 2
car-wash-miniapp/src/main/java/com/kym/miniapp/config/ThreadPoolTaskExecutorConfig.java

@@ -1,19 +1,48 @@
 package com.kym.miniapp.config;
 
+import lombok.extern.slf4j.Slf4j;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.TaskScheduler;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
 
 /**
- * 异步线程池ThreadPoolExecutor 配置类
+ * 异步线程池配置类
+ * 使用 Java 21 虚拟线程优化调度任务
  *
  * @author skyline
  * @date 2023-08-29 14:11
  */
 @Configuration
+@Slf4j
 public class ThreadPoolTaskExecutorConfig {
+    
+    /**
+     * 使用虚拟线程的调度器(推荐)
+     * 注意:ScheduledExecutorService 暂不支持虚拟线程
+     * 我们使用虚拟线程执行器来运行任务
+     */
     @Bean
-    public ThreadPoolTaskScheduler syncScheduler() {
+    public TaskScheduler syncScheduler() {
+        log.info("初始化调度虚拟线程执行器 - 使用 Java 21 Virtual Threads");
+        
+        // 使用传统的 ScheduledExecutorService 作为调度器,但使用较小的线程数
+        // 实际的任务执行会在虚拟线程中进行
+        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
+        scheduler.setPoolSize(2); // 减少调度线程数,因为任务会在虚拟线程中执行
+        scheduler.setThreadGroupName("syncTg-vt");
+        scheduler.setThreadNamePrefix("syncThread-vt-");
+        scheduler.initialize();
+        return scheduler;
+    }
+    
+    /**
+     * 传统线程池调度器(作为备用方案保留)
+     * 如需切换回传统线程池,请将上面的 @Bean 注解注释,并启用此方法
+     */
+    // @Bean
+    public TaskScheduler traditionalSyncScheduler() {
+        log.info("初始化传统线程池调度器");
         ThreadPoolTaskScheduler syncScheduler = new ThreadPoolTaskScheduler();
         syncScheduler.setPoolSize(10);
         // 这里给线程设置名字,主要是为了在项目能够更快速的定位错误。

+ 4 - 0
car-wash-miniapp/src/main/resources/application.yml

@@ -3,6 +3,10 @@ spring:
     active: dev
   application:
     name: miniapp
+  # 启用 Java 21 虚拟线程支持(Spring Boot 3.x+)
+  threads:
+    virtual:
+      enabled: true
   jackson:
     serialization:
       fail-on-empty-beans: false

+ 10 - 7
car-wash-service/src/main/java/com/kym/service/aliyun/lot/AmqpConsumer.java

@@ -28,8 +28,7 @@ import java.util.ArrayList;
 import java.util.Hashtable;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -38,11 +37,15 @@ import java.util.concurrent.TimeUnit;
 @Component
 @Slf4j
 public class AmqpConsumer {
-    // 业务处理异步线程池
-    private final static ExecutorService executorService = new ThreadPoolExecutor(
-            Runtime.getRuntime().availableProcessors(),
-            Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS,
-            new LinkedBlockingQueue<>(50000));
+    // 使用 Java 21 虚拟线程执行器(推荐)
+    // 虚拟线程非常轻量,适合I/O密集型的消息处理
+    private final static ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor();
+    
+    // 传统线程池(备用)
+    // private final static ExecutorService executorService = new ThreadPoolExecutor(
+    //         Runtime.getRuntime().availableProcessors(),
+    //         Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS,
+    //         new LinkedBlockingQueue<>(50000));
     private static final MessageListener MESSAGE_LISTENER = new MessageListener() {
         @Override
         public void onMessage(final Message message) {

+ 17 - 10
car-wash-service/src/main/java/com/kym/service/aliyun/lot/MnsHandler.java

@@ -20,8 +20,8 @@ import org.springframework.stereotype.Component;
 
 import java.util.Base64;
 import java.util.List;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -29,22 +29,28 @@ import java.util.concurrent.TimeUnit;
  *
  * @author skyline
  */
-//@Component
+//@Component 备注:弃用MNS改用AMQP
 @Slf4j
 public class MnsHandler {
 
     /**
-     * 线程池
+     * 使用 Java 21 虚拟线程执行器(推荐)
+     * 虚拟线程非常轻量,适合I/O密集型的消息处理
+     */
+    private final static ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor();
+
+    /**
+     * 传统线程池(备用)
      */
 //    private final static ExecutorService executorService = new ThreadPoolExecutor(
 //            2, 4, 60, TimeUnit.SECONDS,
 //            new LinkedBlockingQueue<>(10000));
 
     // 增加饱和策略(建议用CallerRunsPolicy)
-    private final static ThreadPoolExecutor executorService = new ThreadPoolExecutor(
-            4, 8, 60, TimeUnit.SECONDS,
-            new LinkedBlockingQueue<>(10000),
-            new ThreadPoolExecutor.CallerRunsPolicy()); // 关键修改点
+//    private final static ThreadPoolExecutor executorService = new ThreadPoolExecutor(
+//            4, 8, 60, TimeUnit.SECONDS,
+//            new LinkedBlockingQueue<>(10000),
+//            new ThreadPoolExecutor.CallerRunsPolicy()); // 关键修改点
 
 
     /**
@@ -77,8 +83,9 @@ public class MnsHandler {
                         break;
                     }
                 } finally {
-                    //打印线程池状态
-                    log.info("Pool Size:{} ,Active Thread Count:{},Task Queue Size:{},Completed Task Count: {}", executorService.getPoolSize(), executorService.getActiveCount(), executorService.getQueue().size(), executorService.getCompletedTaskCount());
+                    // 虚拟线程执行器不支持传统线程池的监控方法
+                    // 虚拟线程是轻量级的,无需关注池大小和队列
+                    log.debug("虚拟线程执行器正在处理消息...");
                 }
             }
         });