Parcourir la source

主板上报消息调试

skyline il y a 1 an
Parent
commit
07e91d4135
26 fichiers modifiés avec 1765 ajouts et 46 suppressions
  1. 1 1
      car-wash-admin/src/main/java/com/kym/admin/AdminApplication.java
  2. 21 0
      car-wash-miniapp/pom.xml
  3. 23 0
      car-wash-miniapp/src/main/java/com/kym/miniapp/config/AliyunLotConfig.java
  4. 269 0
      car-wash-miniapp/src/main/java/com/kym/miniapp/mq/AmqpHandler.java
  5. 130 0
      car-wash-miniapp/src/main/java/com/kym/miniapp/mq/ConsumerQueueForTopicDemo.java
  6. 629 0
      car-wash-miniapp/src/main/java/com/kym/miniapp/mq/HttpEndpointSubscription.java
  7. 138 0
      car-wash-miniapp/src/main/java/com/kym/miniapp/mq/MnsHandler.java
  8. 52 0
      car-wash-miniapp/src/main/java/com/kym/miniapp/mq/PublishMessageDemo.java
  9. 103 0
      car-wash-miniapp/src/main/java/com/kym/miniapp/mq/ReceiveMessageDemo.java
  10. 12 4
      car-wash-miniapp/src/main/resources/application.yml
  11. 11 5
      car-wash-service/pom.xml
  12. 18 0
      car-wash-service/src/main/java/com/kym/service/awoara/entity/Method.java
  13. 1 15
      car-wash-service/src/main/java/com/kym/service/awoara/entity/request/AwoaraRequest.java
  14. 20 0
      car-wash-service/src/main/java/com/kym/service/awoara/factory/AwoaraEventHandlerFactory.java
  15. 8 0
      car-wash-service/src/main/java/com/kym/service/awoara/handle/AwoaraEventHandler.java
  16. 15 0
      car-wash-service/src/main/java/com/kym/service/awoara/handle/BootEventHandler.java
  17. 18 0
      car-wash-service/src/main/java/com/kym/service/awoara/handle/CardEventHandler.java
  18. 17 0
      car-wash-service/src/main/java/com/kym/service/awoara/handle/DeviceStateEventHandler.java
  19. 11 0
      car-wash-service/src/main/java/com/kym/service/awoara/handle/Event.java
  20. 22 0
      car-wash-service/src/main/java/com/kym/service/awoara/handle/MessageBody.java
  21. 16 0
      car-wash-service/src/main/java/com/kym/service/awoara/handle/OrderCloseEventHandler.java
  22. 15 0
      car-wash-service/src/main/java/com/kym/service/awoara/handle/OrderCreateEventHandler.java
  23. 20 0
      car-wash-service/src/main/java/com/kym/service/awoara/handle/OrderUpdateEventHandler.java
  24. 16 0
      car-wash-service/src/main/java/com/kym/service/awoara/handle/Report.java
  25. 15 0
      car-wash-service/src/main/java/com/kym/service/awoara/handle/UserLoginEventHandler.java
  26. 164 21
      car-wash-service/src/main/java/com/kym/service/awoara/impl/AwoaraServiceImpl.java

+ 1 - 1
car-wash-admin/src/main/java/com/kym/admin/AdminApplication.java

@@ -1,11 +1,11 @@
 package com.kym.admin;
 
+
 import cn.hutool.crypto.SecureUtil;
 import org.mybatis.spring.annotation.MapperScan;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.context.annotation.ComponentScan;
-import org.springframework.context.annotation.FilterType;
 import org.springframework.scheduling.annotation.EnableAsync;
 import org.springframework.scheduling.annotation.EnableScheduling;
 import org.springframework.stereotype.Controller;

+ 21 - 0
car-wash-miniapp/pom.xml

@@ -55,6 +55,27 @@
             <version>1.6.2</version>
         </dependency>
 
+        <dependency>
+            <groupId>javax.xml.bind</groupId>
+            <artifactId>jaxb-api</artifactId>
+            <version>2.3.1</version>
+        </dependency>
+        <dependency>
+            <groupId>com.sun.xml.bind</groupId>
+            <artifactId>jaxb-impl</artifactId>
+            <version>2.3.1</version>
+        </dependency>
+        <dependency>
+            <groupId>com.sun.xml.bind</groupId>
+            <artifactId>jaxb-core</artifactId>
+            <version>2.3.0.1</version>
+        </dependency>
+        <dependency>
+            <groupId>jakarta.activation</groupId>
+            <artifactId>jakarta.activation-api</artifactId>
+            <version>2.1.3</version>
+        </dependency>
+
 
 
     </dependencies>

+ 23 - 0
car-wash-miniapp/src/main/java/com/kym/miniapp/config/AliyunLotConfig.java

@@ -0,0 +1,23 @@
+package com.kym.miniapp.config;
+
+import lombok.Data;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * 阿里云物联网平台配置
+ *
+ * @author skyline
+ */
+@ConfigurationProperties(prefix = "aliyun.lot")
+@Configuration
+@Data
+public class AliyunLotConfig {
+    public String accessKey;
+    public String accessSecret;
+    public String consumerGroupId;
+    public String iotInstanceId;
+    public String clientId;
+    public String ampqHost;
+    public String mnsHost;
+}

+ 269 - 0
car-wash-miniapp/src/main/java/com/kym/miniapp/mq/AmqpHandler.java

@@ -0,0 +1,269 @@
+package com.kym.miniapp.mq;
+
+import com.kym.miniapp.config.AliyunLotConfig;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.JmsConnectionListener;
+import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
+import org.springframework.beans.factory.DisposableBean;
+import org.springframework.context.event.ContextRefreshedEvent;
+import org.springframework.context.event.EventListener;
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.stereotype.Component;
+
+import javax.crypto.Mac;
+import javax.crypto.spec.SecretKeySpec;
+import javax.jms.*;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * 阿里云LOT AMQP消息处理(石斑鱼主板上报消息)
+ *
+ * @author skyline
+ */
+//@Component
+@Slf4j
+public class AmqpHandler implements DisposableBean {
+    /**
+     * 线程池
+     */
+    private final static ExecutorService executorService = new ThreadPoolExecutor(
+            Runtime.getRuntime().availableProcessors(),
+            Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS,
+            new LinkedBlockingQueue<>(50000));
+    /**
+     * AMQP连接
+     */
+    static List<Connection> connections = new ArrayList<>();
+//    /**
+//     * 工程代码泄露可能会导致 AccessKey 泄露,并威胁账号下所有资源的安全性。以下代码示例使用环境变量获取 AccessKey 的方式进行调用,仅供参考
+//     */
+//    private static String accessKey = "LTAI5tNhD2KFuLUN1hMEukmS";
+//    ;
+//    private static String accessSecret = "dtVF6na8Hp9W8DmAoWI9k24VXwjNyM";
+//    private static String consumerGroupId = "DEFAULT_GROUP";
+//    //iotInstanceId:实例ID。若是2021年07月30日之前(不含当日)开通的公共实例,请填空字符串。
+//    private static String iotInstanceId = "iot-06z00hb4ys0z7ri";
+//    //控制台服务端订阅中消费组状态页客户端ID一栏将显示clientId参数。
+//    //建议使用机器UUID、MAC地址、IP等唯一标识等作为clientId。便于您区分识别不同的客户端。
+//    private static String clientId = "car-wash-1";
+//    //${YourHost}为接入域名,请参见AMQP客户端接入说明文档。
+//    //${uid}.iot-amqp.${YourRegionId}.aliyuncs.com 对于Java、.NET、Python 2.7、Node.js、Go客户端:端口号为5671
+//    // 公共实例endpoint:iot-06z00hb4ys0z7ri.amqp.iothub.aliyuncs.com
+//    private static String host = "iot-06z00hb4ys0z7ri.amqp.iothub.aliyuncs.com";
+    // 指定单个进程启动的连接数
+    // 单个连接消费速率有限,请参考使用限制,最大64个连接
+    // 连接数和消费速率及rebalance相关,建议每500QPS增加一个连接
+    private static int connectionCount = 4;
+    private static MessageListener messageListener = new MessageListener() {
+        @Override
+        public void onMessage(final Message message) {
+            try {
+                // 1.收到消息之后一定要ACK。
+                // 推荐做法:创建Session选择Session.AUTO_ACKNOWLEDGE,这里会自动ACK。
+                // 其他做法:创建Session选择Session.CLIENT_ACKNOWLEDGE,这里一定要调message.acknowledge()来ACK。
+                // message.acknowledge();
+                // 2.建议异步处理收到的消息,确保onMessage函数里没有耗时逻辑。
+                // 如果业务处理耗时过程过长阻塞住线程,可能会影响SDK收到消息后的正常回调。
+                executorService.submit(() -> processMessage(message));
+            } catch (Exception e) {
+                log.error("submit task occurs exception ", e);
+            }
+        }
+    };
+    private static JmsConnectionListener myJmsConnectionListener = new JmsConnectionListener() {
+        /**
+         * 连接成功建立。
+         */
+        @Override
+        public void onConnectionEstablished(URI remoteURI) {
+            log.info("onConnectionEstablished, remoteUri:{}", remoteURI);
+        }
+
+        /**
+         * 尝试过最大重试次数之后,最终连接失败。
+         */
+        @Override
+        public void onConnectionFailure(Throwable error) {
+            log.error("onConnectionFailure, {}", error.getMessage());
+        }
+
+        /**
+         * 连接中断。
+         */
+        @Override
+        public void onConnectionInterrupted(URI remoteURI) {
+            log.info("onConnectionInterrupted, remoteUri:{}", remoteURI);
+        }
+
+        /**
+         * 连接中断后又自动重连上。
+         */
+        @Override
+        public void onConnectionRestored(URI remoteURI) {
+            log.info("onConnectionRestored, remoteUri:{}", remoteURI);
+        }
+
+        @Override
+        public void onInboundMessage(JmsInboundMessageDispatch envelope) {
+        }
+
+        @Override
+        public void onSessionClosed(Session session, Throwable cause) {
+        }
+
+        @Override
+        public void onConsumerClosed(MessageConsumer consumer, Throwable cause) {
+        }
+
+        @Override
+        public void onProducerClosed(MessageProducer producer, Throwable cause) {
+        }
+    };
+    private final AliyunLotConfig aliyunLotConfig;
+
+    public AmqpHandler(AliyunLotConfig aliyunLotConfig) {
+        this.aliyunLotConfig = aliyunLotConfig;
+    }
+
+    /**
+     * 订阅消息处理
+     *
+     * @throws Exception
+     */
+    private void doSubscribe() throws Exception {
+        for (int i = 0; i < connectionCount; i++) {
+            //参数说明,请参见AMQP客户端接入说明文档。
+            long timeStamp = System.currentTimeMillis();
+            //签名方法:支持hmacmd5、hmacsha1和hmacsha256。
+            String signMethod = "hmacsha1";
+
+            //userName组装方法,请参见AMQP客户端接入说明文档。
+            String userName = aliyunLotConfig.getClientId() + "-" + i + "|authMode=aksign"
+                    + ",signMethod=" + signMethod
+                    + ",timestamp=" + timeStamp
+                    + ",authId=" + aliyunLotConfig.getAccessKey()
+                    + ",iotInstanceId=" + aliyunLotConfig.getIotInstanceId()
+                    + ",consumerGroupId=" + aliyunLotConfig.getConsumerGroupId()
+                    + "|";
+            //计算签名,password组装方法,请参见AMQP客户端接入说明文档。
+            String signContent = "authId=" + aliyunLotConfig.getAccessKey() + "&timestamp=" + timeStamp;
+            String password = doSign(signContent, aliyunLotConfig.getAccessSecret(), signMethod);
+            String connectionUrl = "failover:(amqps://" + aliyunLotConfig.getAmpqHost() + ":5671?amqp.idleTimeout=80000)" + "?failover.reconnectDelay=30";
+
+            Hashtable<String, String> hashtable = new Hashtable<>();
+            hashtable.put("connectionfactory.SBCF", connectionUrl);
+            hashtable.put("queue.QUEUE", "default");
+            hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
+            Context context = new InitialContext(hashtable);
+            ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
+            Destination queue = (Destination) context.lookup("QUEUE");
+            // 创建连接。
+            Connection connection = cf.createConnection(userName, password);
+            connections.add(connection);
+
+            ((JmsConnection) connection).addConnectionListener(myJmsConnectionListener);
+            // 创建会话。
+            // Session.CLIENT_ACKNOWLEDGE: 收到消息后,需要手动调用message.acknowledge()。
+            // Session.AUTO_ACKNOWLEDGE: SDK自动ACK(推荐)。
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            connection.start();
+            // 创建Receiver连接。
+            MessageConsumer consumer = session.createConsumer(queue);
+            consumer.setMessageListener(messageListener);
+        }
+
+    }
+
+    /**
+     * 计算签名,password组装方法,请参见AMQP客户端接入说明文档。
+     */
+    private static String doSign(String toSignString, String secret, String signMethod) throws Exception {
+        SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(), signMethod);
+        Mac mac = Mac.getInstance(signMethod);
+        mac.init(signingKey);
+        byte[] rawHmac = mac.doFinal(toSignString.getBytes());
+        return Base64.encodeBase64String(rawHmac);
+    }
+
+    /**
+     * 在这里处理您收到消息后的具体业务逻辑。
+     */
+    private static void processMessage(Message message) {
+        try {
+            byte[] body = message.getBody(byte[].class);
+            String content = new String(body);
+            String topic = message.getStringProperty("topic");
+            String messageId = message.getStringProperty("messageId");
+            long generateTime = message.getLongProperty("generateTime");
+            log.info("receive message"
+                    + ",\n topic = " + topic
+                    + ",\n messageId = " + messageId
+                    + ",\n generateTime = " + generateTime
+                    + ",\n content = " + content);
+        } catch (Exception e) {
+            log.error("processMessage occurs error ", e);
+        }
+    }
+
+    // 这里不能使用@PostConstruct,在初始化完成后, bean 进入增强阶段, 所以这个阶段的任何AOP都是无效的,https://www.cnblogs.com/eternityz/p/15330069.html
+    @EventListener(classes = {ContextRefreshedEvent.class})
+    @Async
+    public void init() {
+        // 开启线程处理队列消息
+        handleMessage();
+    }
+
+    private void handleMessage() {
+        executorService.execute(() -> {
+            try {
+                doSubscribe();
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        });
+        if (!executorService.isTerminated()) {
+            try {
+                Thread.sleep(100);
+            } catch (InterruptedException e) {
+                log.error("processing interrupted.", e);
+            }
+        }
+    }
+
+
+    /**
+     * 关闭AMQP连接
+     *
+     * @throws Exception
+     */
+    @Override
+    public void destroy() throws Exception {
+        // 结束程序运行
+        log.info("run shutdown");
+        connections.forEach(c -> {
+            try {
+                c.close();
+            } catch (JMSException e) {
+                log.error("failed to close connection", e);
+            }
+        });
+        executorService.shutdown();
+        if (executorService.awaitTermination(10, TimeUnit.SECONDS)) {
+            log.info("shutdown success");
+        } else {
+            log.info("failed to handle messages");
+        }
+    }
+}

+ 130 - 0
car-wash-miniapp/src/main/java/com/kym/miniapp/mq/ConsumerQueueForTopicDemo.java

@@ -0,0 +1,130 @@
+package com.kym.miniapp.mq;
+
+import com.aliyun.mns.client.CloudAccount;
+import com.aliyun.mns.client.CloudQueue;
+import com.aliyun.mns.client.MNSClient;
+import com.aliyun.mns.common.ClientException;
+import com.aliyun.mns.common.ServiceException;
+import com.aliyun.mns.common.utils.ServiceSettings;
+import com.aliyun.mns.model.Message;
+import java.io.StringReader;
+import java.util.List;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import org.apache.commons.codec.binary.Base64;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.NodeList;
+import org.xml.sax.InputSource;
+
+/**
+ * 在 topic 模型下,queue有三个类型,xml、json、simple,在base 64加密下不一样,详见下文
+ * 1. 遵循阿里云规范,env 设置ak、sk。
+ * 2. ${"user.home"}/.aliyun-mns.properties 文件配置如下:
+ *           mns.endpoint=http://xxxxxxx
+ *           mns.msgBodyBase64Switch=true/false
+ */
+public class ConsumerQueueForTopicDemo {
+    /**
+     * 是否做 base64 编码
+     */
+    private static final Boolean IS_BASE64 = Boolean.valueOf(ServiceSettings.getMNSPropertyValue("msgBodyBase64Switch","false"));
+
+    public static void main(String[] args) {
+        String QUEUE_NAME = "TestQueue";
+
+        // 遵循阿里云规范,env设置ak、sk。
+        CloudAccount account = new CloudAccount(ServiceSettings.getMNSAccountEndpoint());
+        //this client need only initialize once
+        MNSClient client = account.getMNSClient();
+        CloudQueue queue = client.getQueueRef(QUEUE_NAME);
+
+        try {
+            longPollingBatchReceive(queue);
+        } catch (ClientException ce) {
+            System.out.println("Something wrong with the network connection between client and MNS service."
+                    + "Please check your network and DNS availablity.");
+            ce.printStackTrace();
+        } catch (ServiceException se) {
+            if (se.getErrorCode().equals("QueueNotExist")) {
+                System.out.println("Queue is not exist.Please create queue before use");
+            } else if (se.getErrorCode().equals("TimeExpired")) {
+                System.out.println("The request is time expired. Please check your local machine timeclock");
+            }
+            se.printStackTrace();
+        } catch (Exception e) {
+            System.out.println("Unknown exception happened!");
+            e.printStackTrace();
+        }
+
+        client.close();
+    }
+
+    private static void longPollingBatchReceive(CloudQueue queue) {
+        System.out.println("=============start longPollingBatchReceive=============");
+
+        // 一次性拉取 最多 xx 条消息
+        int batchSize = 15;
+        // 长轮询时间为 xx s
+        int waitSeconds = 15;
+
+        List<Message> messages = queue.batchPopMessage(batchSize, waitSeconds);
+        if (messages != null && messages.size() > 0) {
+
+            for (Message message : messages) {
+                System.out.println("message handle: " + message.getReceiptHandle());
+                System.out.println("message body: " + message.getOriginalMessageBody());
+                System.out.println("message body real data: " + getMessageBodyData(message));
+                System.out.println("message id: " + message.getMessageId());
+                System.out.println("message dequeue count:" + message.getDequeueCount());
+                //<<to add your special logic.>>
+
+                //remember to  delete message when consume message successfully.
+                queue.deleteMessage(message.getReceiptHandle());
+                System.out.println("delete message successfully.\n");
+            }
+        }
+
+        System.out.println("=============end longPollingBatchReceive=============");
+
+    }
+
+    private static String getMessageBodyData(Message message){
+        if (message == null){
+            return null;
+        }
+        String originalMessageBody = message.getOriginalMessageBody();
+
+        // 1. 尝试解析为JSON
+        try {
+            JSONObject object = new JSONObject(originalMessageBody);
+            String jsonMessageData = String.valueOf(object.get("Message"));
+            System.out.println("message body type: JSON,value:"+jsonMessageData );
+            return IS_BASE64? new String(Base64.decodeBase64(jsonMessageData)): jsonMessageData;
+        } catch (JSONException ex1) {
+            // 不是JSON,继续检查XML
+        }
+
+        // 2. 尝试解析为XML
+        try {
+            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
+            DocumentBuilder builder = factory.newDocumentBuilder();
+            Document doc = builder.parse(new InputSource(new StringReader(originalMessageBody)));
+            Element root = doc.getDocumentElement();
+            NodeList nodeList = root.getElementsByTagName("Message");
+            String content = nodeList.item(0).getTextContent();
+            System.out.println("message body type: XML,value:"+content );
+
+            return IS_BASE64? new String(Base64.decodeBase64(content)): content;
+        } catch (Exception ex) {
+            // 不是有效的XML
+        }
+
+        // 既不是JSON也不是XML,视为普通文本
+        System.out.println("message body type: SIMPLE" );
+        return IS_BASE64 ? message.getMessageBody() : message.getMessageBodyAsRawString();
+
+    }
+}

+ 629 - 0
car-wash-miniapp/src/main/java/com/kym/miniapp/mq/HttpEndpointSubscription.java

@@ -0,0 +1,629 @@
+package com.kym.miniapp.mq;
+
+import java.io.BufferedReader;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.BindException;
+import java.net.HttpURLConnection;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.URL;
+import java.net.UnknownHostException;
+import java.security.PublicKey;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateFactory;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import javax.net.ssl.SSLServerSocketFactory;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.http.ConnectionClosedException;
+import org.apache.http.Header;
+import org.apache.http.HttpConnectionFactory;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpEntityEnclosingRequest;
+import org.apache.http.HttpException;
+import org.apache.http.HttpRequest;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpServerConnection;
+import org.apache.http.HttpStatus;
+import org.apache.http.MethodNotSupportedException;
+import org.apache.http.impl.DefaultBHttpServerConnection;
+import org.apache.http.impl.DefaultBHttpServerConnectionFactory;
+import org.apache.http.protocol.BasicHttpContext;
+import org.apache.http.protocol.HttpContext;
+import org.apache.http.protocol.HttpProcessor;
+import org.apache.http.protocol.HttpProcessorBuilder;
+import org.apache.http.protocol.HttpRequestHandler;
+import org.apache.http.protocol.HttpService;
+import org.apache.http.protocol.ResponseConnControl;
+import org.apache.http.protocol.ResponseContent;
+import org.apache.http.protocol.ResponseDate;
+import org.apache.http.protocol.ResponseServer;
+import org.apache.http.protocol.UriHttpRequestHandlerMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.NodeList;
+import org.xml.sax.SAXException;
+
+/**
+ * HTTP/1.1 file server,处理发送到/notifications的请求
+ * 用户侧 http 服务样例,用于接收订阅消息
+ */
+public class HttpEndpointSubscription {
+    public static Logger logger = LoggerFactory.getLogger(HttpEndpointSubscription.class);
+    public static Thread t;
+    private int port;
+
+    /**
+     * 静态方法,使用本机地址用于生成一个endpoint地址
+     *
+     * @return http endpoint
+     */
+    public static String genEndpointLocal() {
+        return HttpEndpointSubscription.genEndpointLocal(80);
+    }
+
+    /**
+     * 静态方法,使用本机地址用于生成一个endpoint地址
+     *
+     * @param port, http server port
+     * @return http endpoint
+     */
+    public static String genEndpointLocal(int port) {
+        try {
+            InetAddress addr = InetAddress.getLocalHost();
+            String ip = addr.getHostAddress().toString();
+            return "http://" + ip + ":" + port;
+        } catch (UnknownHostException e) {
+            e.printStackTrace();
+            logger.warn("get local host fail," + e.getMessage());
+            return "http://127.0.0.1:" + port;
+        }
+
+    }
+
+    /**
+     * 构造函数,用指定端口构造HttpEndpoint对象
+     *
+     * @param port, http server port
+     */
+    public HttpEndpointSubscription(int port) {
+        init(port);
+    }
+
+    /**
+     * 构造函数,构造HttpEndpoint对象,默认80端口
+     */
+    public HttpEndpointSubscription() {
+        init(80);
+    }
+
+    private void init(int port) {
+        this.port = port;
+        t = null;
+    }
+
+    /**
+     * start http server
+     *
+     * @throws Exception exception
+     */
+    public void start() throws Exception {
+        //check port if used
+        try {
+            new Socket(InetAddress.getLocalHost(), this.port);
+            System.out.println("port is used!");
+            logger.error("port already in use, http server start failed");
+            throw new BindException("port already in use");
+        } catch (IOException e) {
+            //e.printStackTrace();
+
+        }
+
+        // Set up the HTTP protocol processor
+        HttpProcessor httpproc = HttpProcessorBuilder.create()
+                .add(new ResponseDate())
+                .add(new ResponseServer("MNS-Endpoint/1.1"))
+                .add(new ResponseContent())
+                .add(new ResponseConnControl()).build();
+
+        // Set up request handlers, listen /notifications request whit NSHandler class
+        UriHttpRequestHandlerMapper reqistry = new UriHttpRequestHandlerMapper();
+
+        // todo 来自通知,不同的请求到不同的处理单元
+        reqistry.register("/notifications", new NSHandler());
+        reqistry.register("/simplified", new SimplifiedNSHandler());
+
+        // Set up the HTTP service
+        HttpService httpService = new HttpService(httpproc, reqistry);
+
+        //start thread for http server
+        t = new RequestListenerThread(port, httpService, null);
+        t.setDaemon(false);
+        t.start();
+    }
+
+    /**
+     * stop http endpoint
+     */
+    public void stop() {
+        if (t != null) {
+            t.interrupt();
+            try {
+                t.join(10 * 1000);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        }
+        System.out.println("endpoint stop");
+    }
+
+    /**
+     * check if this request comes from MNS Server
+     *
+     * @param method,  http method
+     * @param uri,     http uri
+     * @param headers, http headers
+     * @param cert,    cert url
+     * @return true if verify pass
+     */
+    private Boolean authenticate(String method, String uri, Map<String, String> headers, String cert) {
+        String str2sign = getSignStr(method, uri, headers);
+        //System.out.println(str2sign);
+        String signature = headers.get("Authorization");
+        byte[] decodedSign = Base64.decodeBase64(signature);
+        //get cert, and verify this request with this cert
+        try {
+            //String cert = "http://mnstest.oss-cn-hangzhou.aliyuncs.com/x509_public_certificate.pem";
+            URL url = new URL(cert);
+            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+            DataInputStream in = new DataInputStream(conn.getInputStream());
+            CertificateFactory cf = CertificateFactory.getInstance("X.509");
+
+            Certificate c = cf.generateCertificate(in);
+            PublicKey pk = c.getPublicKey();
+
+            java.security.Signature signetcheck = java.security.Signature.getInstance("SHA1withRSA");
+            signetcheck.initVerify(pk);
+            signetcheck.update(str2sign.getBytes());
+            Boolean res = signetcheck.verify(decodedSign);
+            return res;
+        } catch (Exception e) {
+            e.printStackTrace();
+            logger.warn("authenticate fail, " + e.getMessage());
+            return false;
+        }
+    }
+
+    /**
+     * build string for sign
+     *
+     * @param method,  http method
+     * @param uri,     http uri
+     * @param headers, http headers
+     * @return String fro sign
+     */
+    private String getSignStr(String method, String uri, Map<String, String> headers) {
+        StringBuilder sb = new StringBuilder();
+        sb.append(method);
+        sb.append("\n");
+        sb.append(safeGetHeader(headers, "Content-md5"));
+        sb.append("\n");
+        sb.append(safeGetHeader(headers, "Content-Type"));
+        sb.append("\n");
+        sb.append(safeGetHeader(headers, "Date"));
+        sb.append("\n");
+
+        List<String> tmp = new ArrayList<String>();
+        for (Map.Entry<String, String> entry : headers.entrySet()) {
+            if (entry.getKey().startsWith("x-mns-")) {
+                tmp.add(entry.getKey() + ":" + entry.getValue());
+            }
+        }
+        Collections.sort(tmp);
+
+        for (String kv : tmp) {
+            sb.append(kv);
+            sb.append("\n");
+        }
+
+        sb.append(uri);
+        return sb.toString();
+    }
+
+    private String safeGetHeader(Map<String, String> headers, String name) {
+        if (headers.containsKey(name)) {
+            return headers.get(name);
+        } else {
+            return "";
+        }
+    }
+
+    public class SimplifiedNSHandler implements HttpRequestHandler {
+        /**
+         * process method for NSHandler
+         *
+         * @param request,  http request
+         * @param response, http responst
+         * @param context,  http context
+         * @throws HttpException exception
+         * @throws IOException   exception
+         */
+        @Override
+        public void handle(
+                final HttpRequest request,
+                final HttpResponse response,
+                final HttpContext context) throws HttpException, IOException {
+
+            String method = request.getRequestLine().getMethod().toUpperCase(Locale.ENGLISH);
+
+            if (!method.equals("GET") && !method.equals("HEAD") && !method.equals("POST")) {
+                throw new MethodNotSupportedException(method + " method not supported");
+            }
+
+            Header[] headers = request.getAllHeaders();
+            Map<String, String> hm = new HashMap<String, String>();
+            for (Header h : headers) {
+                System.out.println(h.getName() + ":" + h.getValue());
+                hm.put(h.getName(), h.getValue());
+            }
+
+            String target = request.getRequestLine().getUri();
+            System.out.println(target);
+
+            if (request instanceof HttpEntityEnclosingRequest) {
+                HttpEntity entity = ((HttpEntityEnclosingRequest) request).getEntity();
+
+                //verify request
+                Header certHeader = request.getFirstHeader("x-mns-signing-cert-url");
+                if (certHeader == null) {
+                    System.out.println("SigningCerURL Header not found");
+                    response.setStatusCode(HttpStatus.SC_BAD_REQUEST);
+                    return;
+                }
+
+                String cert = certHeader.getValue();
+                if (cert.isEmpty()) {
+                    System.out.println("SigningCertURL empty");
+                    response.setStatusCode(HttpStatus.SC_BAD_REQUEST);
+                    return;
+                }
+                cert = new String(Base64.decodeBase64(cert));
+                System.out.println("SigningCertURL:\t" + cert);
+                logger.debug("SigningCertURL:\t" + cert);
+
+                if (!authenticate(method, target, hm, cert)) {
+                    System.out.println("authenticate fail");
+                    logger.warn("authenticate fail");
+                    response.setStatusCode(HttpStatus.SC_BAD_REQUEST);
+                    return;
+                }
+
+                //parser content of simplified notification
+                InputStream is = entity.getContent();
+                BufferedReader reader = new BufferedReader(new InputStreamReader(is));
+                StringBuffer buffer = new StringBuffer();
+                String line = "";
+                while ((line = reader.readLine()) != null) {
+                    buffer.append(line);
+                }
+                String content = buffer.toString();
+
+                System.out.println("Simplified Notification: \n" + content);
+            }
+
+            response.setStatusCode(HttpStatus.SC_NO_CONTENT);
+        }
+    }
+
+    /**
+     * core class for processing /notifications request
+     */
+    public class NSHandler implements HttpRequestHandler {
+        public Logger logger = LoggerFactory.getLogger(HttpRequestHandler.class);
+
+        public NSHandler() {
+            super();
+        }
+
+        private String safeGetElementContent(Element element, String tag) {
+            NodeList nl = element.getElementsByTagName(tag);
+            if (nl != null && nl.getLength() > 0) {
+                return nl.item(0).getTextContent();
+            } else {
+                logger.warn("get " + tag + " from xml fail");
+                return "";
+            }
+        }
+
+        /**
+         * parser /notifications message content
+         *
+         * @param notify, xml element
+         */
+        private void parserContent(Element notify) {
+            try {
+                String topicOwner = safeGetElementContent(notify, "TopicOwner");
+                System.out.println("TopicOwner:\t" + topicOwner);
+                logger.debug("TopicOwner:\t" + topicOwner);
+                // todo topic /k1olfsszoYB/awoara-wash-dev/user/report
+                String topicName = safeGetElementContent(notify, "TopicName");
+                System.out.println("TopicName:\t" + topicName);
+                logger.debug("TopicName:\t" + topicName);
+                // todo 订阅者
+                String subscriber = safeGetElementContent(notify, "Subscriber");
+                System.out.println("Subscriber:\t" + subscriber);
+                logger.debug("Subscriber:\t" + subscriber);
+                // todo 订阅名称
+                String subscriptionName = safeGetElementContent(notify, "SubscriptionName");
+                System.out.println("SubscriptionName:\t" + subscriptionName);
+                logger.debug("SubscriptionName:\t" + subscriptionName);
+
+                // messageid
+                String msgid = safeGetElementContent(notify, "MessageId");
+                System.out.println("MessageId:\t" + msgid);
+                logger.debug("MessageId:\t" + msgid);
+
+                // if PublishMessage with base64 message
+                String msg = safeGetElementContent(notify, "Message");
+                System.out.println("Message:\t" + new String(Base64.decodeBase64(msg)));
+                logger.debug("Message:\t" + new String(Base64.decodeBase64(msg)));
+
+                //if PublishMessage with string message
+                //String msg = safeGetElementContent(notify, "Message");
+                //System.out.println("Message:\t" + msg);
+                //logger.debug("Message:\t" + msg);
+
+                String msgMD5 = safeGetElementContent(notify, "MessageMD5");
+                System.out.println("MessageMD5:\t" + msgMD5);
+                logger.debug("MessageMD5:\t" + msgMD5);
+
+                String msgPublishTime = safeGetElementContent(notify, "PublishTime");
+                Date d = new Date(Long.parseLong(msgPublishTime));
+                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+                String strdate = sdf.format(d);
+                System.out.println("PublishTime:\t" + strdate);
+                logger.debug("MessagePublishTime:\t" + strdate);
+
+                String msgTag = safeGetElementContent(notify, "MessageTag");
+                if (!msgTag.equals("")) {
+                    System.out.println("MessageTag:\t" + msgTag);
+                    logger.debug("MessageTag:\t" + msgTag);
+                }
+
+            } catch (Exception e) {
+                System.out.println(e.getMessage());
+                e.printStackTrace();
+                logger.warn(e.getMessage());
+            }
+
+        }
+
+        /**
+         * process method for NSHandler
+         *
+         * @param request,  http request
+         * @param response, http responst
+         * @param context,  http context
+         * @throws HttpException exception
+         * @throws IOException   exception
+         */
+        @Override
+        public void handle(
+                final HttpRequest request,
+                final HttpResponse response,
+                final HttpContext context) throws HttpException, IOException {
+
+            String method = request.getRequestLine().getMethod().toUpperCase(Locale.ENGLISH);
+
+            if (!method.equals("GET") && !method.equals("HEAD") && !method.equals("POST")) {
+                throw new MethodNotSupportedException(method + " method not supported");
+            }
+
+            Header[] headers = request.getAllHeaders();
+            Map<String, String> hm = new HashMap<String, String>();
+            for (Header h : headers) {
+                System.out.println(h.getName() + ":" + h.getValue());
+                hm.put(h.getName(), h.getValue());
+            }
+
+            String target = request.getRequestLine().getUri();
+            System.out.println(target);
+
+            if (request instanceof HttpEntityEnclosingRequest) {
+                HttpEntity entity = ((HttpEntityEnclosingRequest) request).getEntity();
+
+                //parser xml content
+                InputStream content = entity.getContent();
+                DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
+                Element notify = null;
+                try {
+                    DocumentBuilder db = dbf.newDocumentBuilder();
+                    Document document = db.parse(content);
+                    NodeList nl = document.getElementsByTagName("Notification");
+                    if (nl == null || nl.getLength() == 0) {
+                        System.out.println("xml tag error");
+                        logger.warn("xml tag error");
+                        response.setStatusCode(HttpStatus.SC_BAD_REQUEST);
+                        return;
+                    }
+                    notify = (Element) nl.item(0);
+                } catch (ParserConfigurationException e) {
+                    e.printStackTrace();
+                    logger.warn("xml parser fail! " + e.getMessage());
+                    response.setStatusCode(HttpStatus.SC_BAD_REQUEST);
+                    return;
+                } catch (SAXException e) {
+                    e.printStackTrace();
+                    logger.warn("xml parser fail! " + e.getMessage());
+                    response.setStatusCode(HttpStatus.SC_BAD_REQUEST);
+                    return;
+                }
+
+                //verify request
+                Header certHeader = request.getFirstHeader("x-mns-signing-cert-url");
+                if (certHeader == null) {
+                    System.out.println("SigningCerURL Header not found");
+                    response.setStatusCode(HttpStatus.SC_BAD_REQUEST);
+                    return;
+                }
+
+                String cert = certHeader.getValue();
+                if (cert.isEmpty()) {
+                    System.out.println("SigningCertURL empty");
+                    response.setStatusCode(HttpStatus.SC_BAD_REQUEST);
+                    return;
+                }
+                cert = new String(Base64.decodeBase64(cert));
+                System.out.println("SigningCertURL:\t" + cert);
+                logger.debug("SigningCertURL:\t" + cert);
+
+                if (!authenticate(method, target, hm, cert)) {
+                    System.out.println("authenticate fail");
+                    logger.warn("authenticate fail");
+                    response.setStatusCode(HttpStatus.SC_BAD_REQUEST);
+                    return;
+                }
+                parserContent(notify);
+
+            }
+
+            response.setStatusCode(HttpStatus.SC_NO_CONTENT);
+        }
+
+    }
+
+    /**
+     * http listen work thread
+     */
+    public class RequestListenerThread extends Thread {
+
+        private final HttpConnectionFactory<DefaultBHttpServerConnection> connFactory;
+        private final ServerSocket serversocket;
+        private final HttpService httpService;
+
+        public RequestListenerThread(
+                final int port,
+                final HttpService httpService,
+                final SSLServerSocketFactory sf) throws IOException {
+            this.connFactory = DefaultBHttpServerConnectionFactory.INSTANCE;
+            this.serversocket = sf != null ? sf.createServerSocket(port) : new ServerSocket(port);
+            this.httpService = httpService;
+        }
+
+        @Override
+        public void run() {
+            System.out.println("Listening on port " + this.serversocket.getLocalPort());
+            Thread t = null;
+            while (!Thread.interrupted()) {
+                try {
+                    // Set up HTTP connection
+                    Socket socket = this.serversocket.accept();
+                    System.out.println("Incoming connection from " + socket.getInetAddress());
+                    HttpServerConnection conn = this.connFactory.createConnection(socket);
+
+                    // Start worker thread
+                    t = new WorkerThread(this.httpService, conn);
+                    t.setDaemon(true);
+                    t.start();
+                } catch (IOException e) {
+                    System.err.println("Endpoint http server stop or IO error: "
+                            + e.getMessage());
+                    try {
+                        if (t != null) {
+                            t.join(5 * 1000);
+                        }
+                    } catch (InterruptedException e1) {
+                        //e1.printStackTrace();
+                    }
+                    break;
+                }
+            }
+        }
+
+        @Override
+        public void interrupt() {
+            super.interrupt();
+            try {
+                this.serversocket.close();
+            } catch (IOException e) {
+                //e.printStackTrace();
+            }
+        }
+    }
+
+    /**
+     * http work thread, it will dispatch /notifications to NSHandler
+     */
+    public class WorkerThread extends Thread {
+
+        private final HttpService httpservice;
+        private final HttpServerConnection conn;
+
+        public WorkerThread(
+                final HttpService httpservice,
+                final HttpServerConnection conn) {
+            super();
+            this.httpservice = httpservice;
+            this.conn = conn;
+        }
+
+        @Override
+        public void run() {
+            System.out.println("New connection thread");
+            HttpContext context = new BasicHttpContext(null);
+            try {
+                while (!Thread.interrupted() && this.conn.isOpen()) {
+                    this.httpservice.handleRequest(this.conn, context);
+                }
+            } catch (ConnectionClosedException ex) {
+                System.err.println("Client closed connection");
+            } catch (IOException ex) {
+                System.err.println("I/O error: " + ex.getMessage());
+            } catch (HttpException ex) {
+                System.err.println("Unrecoverable HTTP protocol violation: " + ex.getMessage());
+            } finally {
+                try {
+                    this.conn.shutdown();
+                } catch (IOException ignore) {
+                }
+            }
+        }
+
+    }
+
+    /**
+     * 简单的使用, main函数demo
+     *
+     * @param args args
+     */
+    public static void main(String[] args) {
+        int port = 8080;
+        HttpEndpointSubscription httpEndpointSubscription = null;
+        try {
+            httpEndpointSubscription = new HttpEndpointSubscription(port);
+            httpEndpointSubscription.start();
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            if (httpEndpointSubscription != null) {
+                // httpEndpointSubscription.stop();
+            }
+        }
+    }
+
+}

+ 138 - 0
car-wash-miniapp/src/main/java/com/kym/miniapp/mq/MnsHandler.java

@@ -0,0 +1,138 @@
+package com.kym.miniapp.mq;
+
+import cn.hutool.core.util.CharsetUtil;
+import com.alibaba.fastjson2.JSONObject;
+import com.aliyun.mns.client.CloudAccount;
+import com.aliyun.mns.client.CloudQueue;
+import com.aliyun.mns.client.MNSClient;
+import com.aliyun.mns.model.Message;
+import com.kym.service.awoara.factory.AwoaraEventHandlerFactory;
+import com.kym.service.awoara.handle.MessageBody;
+import com.kym.service.awoara.handle.Report;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.context.event.ContextRefreshedEvent;
+import org.springframework.context.event.EventListener;
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.stereotype.Component;
+
+import java.util.Base64;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * MNS消息处理
+ *
+ * @author skyline
+ */
+@Component
+@Slf4j
+public class MnsHandler {
+
+    /**
+     * 线程池
+     */
+    private final static ExecutorService executorService = new ThreadPoolExecutor(
+            2, 4, 60, TimeUnit.SECONDS,
+            new LinkedBlockingQueue<>(10000));
+
+    /**
+     * 是否做 base64 编码
+     */
+    private static final Boolean IS_BASE64 = false;
+    /**
+     * BASE64解码器
+     */
+    private static Base64.Decoder decoder = Base64.getDecoder();
+    String queueName = "aliyun-iot-k1olfsszoYB";
+    // 遵循阿里云规范,env设置ak、sk。
+    CloudAccount account = new CloudAccount("LTAI5tNhD2KFuLUN1hMEukmS", "dtVF6na8Hp9W8DmAoWI9k24VXwjNyM", "http://1757940634296846.mns.cn-shanghai.aliyuncs.com");
+    //this client need only initialize once
+    MNSClient client = account.getMNSClient();
+    CloudQueue queue = client.getQueueRef(queueName);
+
+    /**
+     * 长轮训批量获取消息
+     *
+     * @param queue
+     */
+    @SuppressWarnings("InfiniteLoopStatement")
+    private static void longPollingBatchReceive(CloudQueue queue) {
+            executorService.execute(() -> {
+                while (true){
+                    log.info("=============start longPollingBatchReceive=============");
+
+                    // 一次性拉取最多xx条消息
+                    int batchSize = 15;
+                    // 长轮询时间为 xx s
+                    int waitSeconds = 15;
+
+                    List<Message> messages = queue.batchPopMessage(batchSize, waitSeconds);
+                    if (messages != null && !messages.isEmpty()) {
+                        for (Message message : messages) {
+                            printMsgAndDelete(queue, message);
+                        }
+                    }
+                    log.info("=============end longPollingBatchReceive=============");
+                }
+            });
+        }
+
+    /**
+     * 获取单条消息
+     *
+     * @param queue
+     */
+    private static void singleReceive(CloudQueue queue) {
+        log.info("=============start singleReceive=============");
+
+        Message popMsg = queue.popMessage();
+        printMsgAndDelete(queue, popMsg);
+
+        log.info("=============end singleReceive=============");
+    }
+
+    /**
+     * 删除队列消息
+     *
+     * @param queue
+     * @param popMsg
+     */
+    private static void printMsgAndDelete(CloudQueue queue, Message popMsg) {
+        if (popMsg != null) {
+            log.info("message handle: " + popMsg.getReceiptHandle());
+            log.info("message body: " + (IS_BASE64 ? popMsg.getMessageBody() : popMsg.getMessageBodyAsRawString()));
+            log.info("message id: " + popMsg.getMessageId());
+            log.info("message dequeue count:" + popMsg.getDequeueCount());
+            //<<to add your special logic.>>
+
+            handleMessage(popMsg.getMessageBodyAsRawString());
+
+            //remember to  delete message when consume message successfully.
+            queue.deleteMessage(popMsg.getReceiptHandle());
+            log.info("delete message successfully.\n");
+        }
+    }
+
+    static void handleMessage(String messageBody) {
+        // todo 对messageBody进行base64解码
+        var message = new String(decoder.decode(messageBody), CharsetUtil.CHARSET_UTF_8);
+        log.info("message: {}", message);
+        // 根据event类型组装成对应对象
+        var body = JSONObject.parseObject(message, MessageBody.class);
+        var report = JSONObject.parseObject(new String(decoder.decode(body.getPayload()), CharsetUtil.CHARSET_UTF_8), Report.class);
+        var handler = AwoaraEventHandlerFactory.getEventHandler(report.getEvent());
+        handler.handle(report);
+
+    }
+
+    @EventListener(classes = {ContextRefreshedEvent.class})
+    @Async
+    public void init() throws InterruptedException {
+        // 开启线程处理队列消息
+        longPollingBatchReceive(queue);
+    }
+
+}

+ 52 - 0
car-wash-miniapp/src/main/java/com/kym/miniapp/mq/PublishMessageDemo.java

@@ -0,0 +1,52 @@
+package com.kym.miniapp.mq;
+
+import com.aliyun.mns.client.CloudAccount;
+import com.aliyun.mns.client.CloudTopic;
+import com.aliyun.mns.client.MNSClient;
+import com.aliyun.mns.common.utils.ServiceSettings;
+import com.aliyun.mns.model.Base64TopicMessage;
+import com.aliyun.mns.model.RawTopicMessage;
+import com.aliyun.mns.model.TopicMessage;
+
+
+/**
+ * 1. 遵循阿里云规范,env设置ak、sk。
+ * 2. ${"user.home"}/.aliyun-mns.properties 文件配置如下:
+ *           mns.endpoint=http://xxxxxxx
+ *           mns.msgBodyBase64Switch=true/false
+ */
+public class PublishMessageDemo {
+    private static final Boolean IS_BASE64 = Boolean.valueOf(ServiceSettings.getMNSPropertyValue("msgBodyBase64Switch","false"));
+
+    public static void main(String[] args) {
+        String topicName = "TestTopic";
+        String message = "hello world!";
+
+        CloudAccount account = new CloudAccount(ServiceSettings.getMNSAccountEndpoint());
+        //this client need only initialize once
+        MNSClient client = account.getMNSClient();
+
+        publishMsg(client, topicName, message);
+
+        client.close();
+    }
+
+    private static void publishMsg(MNSClient client, String topicName, String message) {
+
+        CloudTopic topic = client.getTopicRef(topicName);
+        TopicMessage msg = IS_BASE64 ? new Base64TopicMessage() : new RawTopicMessage();
+        try {
+            msg.setMessageBody(message);
+            // 可选。设置该条发布消息的filterTag
+            //msg.setMessageTag("filterTag");
+            TopicMessage publishResultMsg = topic.publishMessage(msg);
+            System.out.println("message publish.");
+            System.out.println("reqId:"+publishResultMsg.getRequestId());
+            System.out.println("msgId:"+publishResultMsg.getMessageId());
+            System.out.println("md5:"+publishResultMsg.getMessageBodyMD5());
+        } catch (Exception e) {
+            e.printStackTrace();
+            System.out.println("publishMsg error");
+        }
+    }
+}

+ 103 - 0
car-wash-miniapp/src/main/java/com/kym/miniapp/mq/ReceiveMessageDemo.java

@@ -0,0 +1,103 @@
+package com.kym.miniapp.mq;
+
+import com.aliyun.mns.client.CloudAccount;
+import com.aliyun.mns.client.CloudQueue;
+import com.aliyun.mns.client.MNSClient;
+import com.aliyun.mns.common.ClientException;
+import com.aliyun.mns.common.ServiceException;
+import com.aliyun.mns.model.Message;
+
+import java.util.List;
+
+/**
+ * 1. 遵循阿里云规范,env设置ak、sk。
+ * 2. ${"user.home"}/.aliyun-mns.properties 文件配置如下:
+ * mns.endpoint=http://xxxxxxx
+ * mns.msgBodyBase64Switch=true/false
+ */
+public class ReceiveMessageDemo {
+
+    /**
+     * 是否做 base64 编码
+     */
+    private static final Boolean IS_BASE64 = false;
+
+
+    public static void main(String[] args) {
+        String queueName = "aliyun-iot-k1olfsszoYB";
+
+        // 遵循阿里云规范,env设置ak、sk。
+        CloudAccount account = new CloudAccount("LTAI5tNhD2KFuLUN1hMEukmS","dtVF6na8Hp9W8DmAoWI9k24VXwjNyM","http://1757940634296846.mns.cn-shanghai.aliyuncs.com");
+        //this client need only initialize once
+        MNSClient client = account.getMNSClient();
+        CloudQueue queue = client.getQueueRef(queueName);
+
+        try {
+            // 基础: 单次拉取
+//            singleReceive(queue);
+
+            // 推荐: 使用的 长轮询批量拉取模型
+            longPollingBatchReceive(queue);
+        } catch (ClientException ce) {
+            System.out.println("Something wrong with the network connection between client and MNS service."
+                    + "Please check your network and DNS availablity.");
+            ce.printStackTrace();
+        } catch (ServiceException se) {
+            if (se.getErrorCode().equals("QueueNotExist")) {
+                System.out.println("Queue is not exist.Please create queue before use");
+            } else if (se.getErrorCode().equals("TimeExpired")) {
+                System.out.println("The request is time expired. Please check your local machine timeclock");
+            }
+            se.printStackTrace();
+        } catch (Exception e) {
+            System.out.println("Unknown exception happened!");
+            e.printStackTrace();
+        }
+
+        client.close();
+    }
+
+    private static void longPollingBatchReceive(CloudQueue queue) {
+        System.out.println("=============start longPollingBatchReceive=============");
+
+        // 一次性拉取最多xx条消息
+        int batchSize = 15;
+        // 长轮询时间为 xx s
+        int waitSeconds = 15;
+
+        List<Message> messages = queue.batchPopMessage(batchSize, waitSeconds);
+        if (messages != null && !messages.isEmpty()) {
+
+            for (Message message : messages) {
+                printMsgAndDelete(queue, message);
+            }
+        }
+
+        System.out.println("=============end longPollingBatchReceive=============");
+
+    }
+
+    private static void singleReceive(CloudQueue queue) {
+        System.out.println("=============start singleReceive=============");
+
+        Message popMsg = queue.popMessage();
+        printMsgAndDelete(queue, popMsg);
+
+        System.out.println("=============end singleReceive=============");
+    }
+
+    private static void printMsgAndDelete(CloudQueue queue, Message popMsg) {
+        if (popMsg != null) {
+            System.out.println("message handle: " + popMsg.getReceiptHandle());
+            System.out.println("message body: " + (IS_BASE64 ? popMsg.getMessageBody() : popMsg.getMessageBodyAsRawString()));
+            System.out.println("message id: " + popMsg.getMessageId());
+            System.out.println("message dequeue count:" + popMsg.getDequeueCount());
+            //<<to add your special logic.>>
+
+            //remember to  delete message when consume message successfully.
+            queue.deleteMessage(popMsg.getReceiptHandle());
+            System.out.println("delete message successfully.\n");
+        }
+    }
+
+}

+ 12 - 4
car-wash-miniapp/src/main/resources/application.yml

@@ -47,9 +47,9 @@ sa-token:
 #文件上传配置
 upload:
   file:
-   storage: file_storage  #文件存放地址
-   size: 20  #最大上传Mb
-   url: https://static.kuaiyuman.cn/
+    storage: file_storage  #文件存放地址
+    size: 20  #最大上传Mb
+    url: https://static.kuaiyuman.cn/
 
 oss:
   endpoint: oss-cn-shenzhen.aliyuncs.com
@@ -61,4 +61,12 @@ password:
   privateKey: MIICdwIBADANBgkqhkiG9w0BAQEFAASCAmEwggJdAgEAAoGBAM1zhmu+TcTSZSQ7w0541g45o8ji4ugEC4O31GnS6tfvLTCru+9DPK2/DNdd4z1enz2PuDhktHuoEsEKPdA08fRJSMTXLL0pUEp2OQ+t7tZP6mVLvizasnP+HKAqIndXFr5nXm/okQfL/f/6L2Bben9sItoxC28Z6Y28NfAJPAg1AgMBAAECgYAKOdvQ9RHt4AMEwKzB9SXCY4AReamNntXr4nSCJ+tkgBUhvQqHqDMW+tFqztOGtHT8nXCv7eNF3GHClf3ppRj91utk8zAwZPVAVlRoNcWs60nyKRUO2uhwAV8AE+9UKDoVui7L7UaMcIkssKqQbFGIRXUjjSoPJu0yoHCdp5/3QQJBAOZTbfgmFXVgRSH4IMXJ3aZOqz+Wy3EmvNatYz8NYLBFgLJTWWXtR7URw82R7jL6F9ettfCityhAmXEZnZsEsokCQQDkWkYwFZlUYJ2ctdNEmipXw8tjpCrzQRaZnydXbjviwbSpOvOo5nrxSG5BtL9QDwiy9DL7YLBVJPykAkJm3m1NAkBn2SQTJ7CzLIXfLA4yv7LFYmEKGcZ+rRWlwaWm7zQyJhRB0xzSvSqAtJLRJEP/Dg4j+7m11te4OXA1s3QBShvpAkEAq50gpKCG5D/cE9seVK9b5SuTnmXRlZE0D+3pXi7NOOSFBq30UtosSUs6+YyCPwOdcQhPjFYlD0hFymicSL0e/QJBAOMQaABh/6BcVimWP284x/WxBQ83zzVhcl7fUyqcFvfAw1JeMmRxvm2CWYKQ9MIhQ/9ptFotRCSwMAdJTZceWys=
   publicKey: MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDNc4Zrvk3E0mUkO8NOeNYOOaPI4uLoBAuDt9Rp0urX7y0wq7vvQzytvwzXXeM9Xp89j7g4ZLR7qBLBCj3QNPH0SUjE1yy9KVBKdjkPre7WT+plS74s2rJz/hygKiJ3Vxa+Z15v6JEHy/3/+i9gW3p/bCLaMQtvGemNvDXwCTwINQIDAQAB
 
-
+aliyun:
+  lot:
+    accessKey: LTAI5tNhD2KFuLUN1hMEukmS
+    accessSecret: dtVF6na8Hp9W8DmAoWI9k24VXwjNyM
+    consumerGroupId: DEFAULT_GROUP
+    iotInstanceId: iot-06z00hb4ys0z7ri
+    clientId: car-wash-app-01
+    ampq-host: iot-06z00hb4ys0z7ri.amqp.iothub.aliyuncs.com
+    mns-host: http://1757940634296846.mns.cn-shanghai.aliyuncs.com

+ 11 - 5
car-wash-service/pom.xml

@@ -58,11 +58,11 @@
         </dependency>
 
         <!-- 阿里云物联⽹云端SDK 异步调用版 -->
-        <dependency>
-            <groupId>com.aliyun</groupId>
-            <artifactId>alibabacloud-iot20180120</artifactId>
-            <version>1.0.4</version>
-        </dependency>
+<!--        <dependency>-->
+<!--            <groupId>com.aliyun</groupId>-->
+<!--            <artifactId>alibabacloud-iot20180120</artifactId>-->
+<!--            <version>1.0.4</version>-->
+<!--        </dependency>-->
 
 
         <!-- 阿里云消息服务 MNS -->
@@ -72,6 +72,12 @@
             <version>1.1.11</version>
         </dependency>
 
+        <!-- amqp 1.0 qpid client -->
+        <dependency>
+            <groupId>org.apache.qpid</groupId>
+            <artifactId>qpid-jms-client</artifactId>
+            <version>0.57.0</version>
+        </dependency>
 
     </dependencies>
 

+ 18 - 0
car-wash-service/src/main/java/com/kym/service/awoara/entity/Method.java

@@ -0,0 +1,18 @@
+package com.kym.service.awoara.entity;
+
+import lombok.Getter;
+
+@Getter
+public enum Method {
+    help,
+    reboot,
+    query_state,
+    query_hardware_info,
+    create_order,
+    close_order,
+    query_order,
+    read_config,
+    write_config,
+    show_msgbox,
+    hide_msgbox;
+}

+ 1 - 15
car-wash-service/src/main/java/com/kym/service/awoara/entity/request/AwoaraRequest.java

@@ -1,23 +1,9 @@
 package com.kym.service.awoara.entity.request;
 
+import com.kym.service.awoara.entity.Method;
 import lombok.Builder;
 import lombok.Data;
-import lombok.Getter;
 
-@Getter
-enum Method {
-    help,
-    reboot,
-    query_state,
-    query_hardware_info,
-    create_order,
-    close_order,
-    query_order,
-    read_config,
-    write_config,
-    show_msgbox,
-    hide_msgbox;
-}
 
 /**
  * 石斑鱼主板API请求参数

+ 20 - 0
car-wash-service/src/main/java/com/kym/service/awoara/factory/AwoaraEventHandlerFactory.java

@@ -0,0 +1,20 @@
+package com.kym.service.awoara.factory;
+
+import com.kym.service.awoara.handle.*;
+
+/**
+ * 事件处理器工厂
+ */
+public class AwoaraEventHandlerFactory {
+    public static AwoaraEventHandler<?> getEventHandler(String eventName) {
+        return switch (Event.valueOf(eventName)) {
+            case boot -> new BootEventHandler();
+            case device_state -> new DeviceStateEventHandler();
+            case order_create -> new OrderCreateEventHandler();
+            case order_update -> new OrderUpdateEventHandler();
+            case order_close -> new OrderCloseEventHandler();
+            case user_login -> new UserLoginEventHandler();
+            case card_event -> new CardEventHandler();
+        };
+    }
+}

+ 8 - 0
car-wash-service/src/main/java/com/kym/service/awoara/handle/AwoaraEventHandler.java

@@ -0,0 +1,8 @@
+package com.kym.service.awoara.handle;
+
+import com.kym.service.awoara.entity.DeviceState;
+
+public interface AwoaraEventHandler<T> {
+    void handle(Report<T> report);
+
+}

+ 15 - 0
car-wash-service/src/main/java/com/kym/service/awoara/handle/BootEventHandler.java

@@ -0,0 +1,15 @@
+package com.kym.service.awoara.handle;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * 设备启动事件处理
+ */
+@Slf4j
+public class BootEventHandler implements AwoaraEventHandler {
+    @Override
+    public void handle(Report report) {
+        log.info(report.toString());
+        log.info("DeviceStateEventHandler");
+    }
+}

+ 18 - 0
car-wash-service/src/main/java/com/kym/service/awoara/handle/CardEventHandler.java

@@ -0,0 +1,18 @@
+package com.kym.service.awoara.handle;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * ⽤户刷卡事件
+ * <p>
+ * 注意:⽆论⽤户刷的是普通卡还是储值卡,都会上传这个事件。 对于储值卡总是可以刷卡开关机,如果设备在
+ * 线则上传,不在线则不上传。各字段的意义请参考订单信息中的解释
+ */
+@Slf4j
+public class CardEventHandler implements AwoaraEventHandler {
+    @Override
+    public void handle(Report report) {
+        log.info(report.toString());
+        log.info("DeviceStateEventHandler");
+    }
+}

+ 17 - 0
car-wash-service/src/main/java/com/kym/service/awoara/handle/DeviceStateEventHandler.java

@@ -0,0 +1,17 @@
+package com.kym.service.awoara.handle;
+
+import com.kym.service.awoara.entity.DeviceState;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * 设备状态更新事件事件处理
+ */
+@Slf4j
+public class DeviceStateEventHandler implements AwoaraEventHandler<DeviceState> {
+
+    @Override
+    public void handle(Report<DeviceState> report) {
+        log.info(report.toString());
+        log.info("DeviceStateEventHandler");
+    }
+}

+ 11 - 0
car-wash-service/src/main/java/com/kym/service/awoara/handle/Event.java

@@ -0,0 +1,11 @@
+package com.kym.service.awoara.handle;
+
+public enum Event {
+    boot, // 设备启动事件,设备上电启动
+    device_state,  // 设备状态更新事件
+    order_create,  // 收到订单时上传,包含订单信息
+    order_update,  // 订单状态更新事件
+    order_close,  // 订单关闭事件
+    user_login,  // ⽤户登录事件
+    card_event,  // ⽤户刷卡事件
+}

+ 22 - 0
car-wash-service/src/main/java/com/kym/service/awoara/handle/MessageBody.java

@@ -0,0 +1,22 @@
+package com.kym.service.awoara.handle;
+
+import lombok.Data;
+import lombok.experimental.Accessors;
+
+/**
+ * 石斑鱼主板上报消息到自定义Topic xxx/report
+ *
+ * @author skyline
+ */
+@Data
+@Accessors(chain = true)
+public class MessageBody {
+    private String payload;
+    /**
+     * 消息类型 report收的的主要是upload
+     */
+    private String messagetype;
+    private String topic;
+    private Long messageid;
+    private Long timestamp;
+}

+ 16 - 0
car-wash-service/src/main/java/com/kym/service/awoara/handle/OrderCloseEventHandler.java

@@ -0,0 +1,16 @@
+package com.kym.service.awoara.handle;
+
+import com.kym.service.awoara.entity.OrderInfo;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * 关闭订单事件
+ */
+@Slf4j
+public class OrderCloseEventHandler implements AwoaraEventHandler<OrderInfo> {
+    @Override
+    public void handle(Report<OrderInfo> report) {
+        log.info(report.toString());
+        log.info("OrderCloseEventHandler");
+    }
+}

+ 15 - 0
car-wash-service/src/main/java/com/kym/service/awoara/handle/OrderCreateEventHandler.java

@@ -0,0 +1,15 @@
+package com.kym.service.awoara.handle;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * 收到订单事件
+ */
+@Slf4j
+public class OrderCreateEventHandler implements AwoaraEventHandler {
+    @Override
+    public void handle(Report report) {
+        log.info(report.toString());
+        log.info("DeviceStateEventHandler");
+    }
+}

+ 20 - 0
car-wash-service/src/main/java/com/kym/service/awoara/handle/OrderUpdateEventHandler.java

@@ -0,0 +1,20 @@
+package com.kym.service.awoara.handle;
+
+import com.kym.service.awoara.entity.OrderInfo;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * 订单状态更新事件
+ * <p>
+ * 1、⽤来让服务器知道订单的当前状态,以下两个条件只要满⾜其中⼀个就上传⼀次。
+ * 2、订单的应收⾦额和上次上传的应收⾦额⼤于等于1元则上传⼀次。
+ * 3、如果⾦额不变,每个3分钟上传⼀次。
+ */
+@Slf4j
+public class OrderUpdateEventHandler implements AwoaraEventHandler<OrderInfo> {
+    @Override
+    public void handle(Report<OrderInfo> report) {
+        log.info(report.toString());
+        log.info("DeviceStateEventHandler");
+    }
+}

+ 16 - 0
car-wash-service/src/main/java/com/kym/service/awoara/handle/Report.java

@@ -0,0 +1,16 @@
+package com.kym.service.awoara.handle;
+
+import lombok.Data;
+import lombok.experimental.Accessors;
+
+/**
+ * 石斑鱼主板上报消息到自定义Topic xxx/report
+ */
+@Data
+@Accessors(chain = true)
+public class Report<T> {
+    private String version;
+    private String event;
+    private String uptime_ms;
+    private T data;
+}

+ 15 - 0
car-wash-service/src/main/java/com/kym/service/awoara/handle/UserLoginEventHandler.java

@@ -0,0 +1,15 @@
+package com.kym.service.awoara.handle;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * 用户登录事件
+ */
+@Slf4j
+public class UserLoginEventHandler implements AwoaraEventHandler {
+    @Override
+    public void handle(Report report) {
+        log.info(report.toString());
+        log.info("UserLoginEventHandler");
+    }
+}

+ 164 - 21
car-wash-service/src/main/java/com/kym/service/awoara/impl/AwoaraServiceImpl.java

@@ -27,14 +27,14 @@ public class AwoaraServiceImpl implements AwoaraService {
     private static final String IOT_INSTANCE_ID = "iot.cn-shanghai.aliyuncs.com";
     private static final String GROUP_ID = "iot.cn-shanghai.aliyuncs.com";
 
+
+    // ============================================================== 阿里云lot主动请求 ==============================================================
+
     /**
      * 初始化IoT(Iot20180120)客户端
      */
     public static com.aliyun.iot20180120.Client initialization() throws Exception {
-        com.aliyun.teaopenapi.models.Config config = new com.aliyun.teaopenapi.models.Config()
-                .setRegionId("cn-shanghai")
-                .setAccessKeyId(System.getenv("ACCESS_KEY_ID"))
-                .setAccessKeySecret(System.getenv("ACCESS_KEY_SECRET"));
+        com.aliyun.teaopenapi.models.Config config = new com.aliyun.teaopenapi.models.Config().setRegionId("cn-shanghai").setAccessKeyId(System.getenv("ACCESS_KEY_ID")).setAccessKeySecret(System.getenv("ACCESS_KEY_SECRET"));
         return new com.aliyun.iot20180120.Client(config);
     }
 
@@ -44,15 +44,10 @@ public class AwoaraServiceImpl implements AwoaraService {
      * 1.RRpc:向指定设备发送请求消息,并同步返回响应
      */
     public static RRpcResponse rRpc(Client client, String iotInstanceId, String productKey, String deviceName, String requestBase64Byte, String timeout, String topic) throws Exception {
-        var request = new RRpcRequest()
-                .setIotInstanceId(iotInstanceId)
-                .setProductKey(productKey)
-                .setDeviceName(deviceName)
-                .setRequestBase64Byte(requestBase64Byte)
-                .setTopic(timeout);
+        var request = new RRpcRequest().setIotInstanceId(iotInstanceId).setProductKey(productKey).setDeviceName(deviceName).setRequestBase64Byte(requestBase64Byte).setTopic(topic);
         // 等待设备回复消息的时间,单位是毫秒,取值范围是1,000 ~8,000。
         // 校验integer型入参
-        Integer iTimeout = Integer.parseInt(topic);
+        Integer iTimeout = Integer.parseInt(timeout);
         request.setTimeout(iTimeout);
         log.info("-------------------1.RRpc:向指定设备发送请求消息,并同步返回响应--------------------");
         RRpcResponse response = client.rRpc(request);
@@ -65,11 +60,7 @@ public class AwoaraServiceImpl implements AwoaraService {
      * 2.PubBroadcast 向指定产品所有设备,或向订阅了指定Topic的所有设备发布广播消息
      */
     public static PubBroadcastResponse pubBroadcast(com.aliyun.iot20180120.Client client, String iotInstanceId, String productKey, String messageContent, String topic) throws Exception {
-        PubBroadcastRequest request = new PubBroadcastRequest()
-                .setIotInstanceId(iotInstanceId)
-                .setProductKey(productKey)
-                .setMessageContent(messageContent)
-                .setTopicFullName(topic);
+        PubBroadcastRequest request = new PubBroadcastRequest().setIotInstanceId(iotInstanceId).setProductKey(productKey).setMessageContent(messageContent).setTopicFullName(topic);
         log.info("-------------------2.PubBroadcast 向指定产品所有设备,或向订阅了指定Topic的所有设备发布广播消息--------------------");
         PubBroadcastResponse response = client.pubBroadcast(request);
         log.info(com.aliyun.teautil.Common.toJSONString(TeaModel.buildMap(response)));
@@ -84,18 +75,169 @@ public class AwoaraServiceImpl implements AwoaraService {
         // 要订阅的Topic,最多订阅10个Topic。
         // Topic的操作权限必须为订阅或发布和订阅。
         List<String> arrTopic = Arrays.stream(topicStr.split(",", 10)).toList();
-        SubscribeTopicRequest request = new SubscribeTopicRequest()
-                .setIotInstanceId(iotInstanceId)
-                .setProductKey(productKey)
-                .setDeviceName(deviceName)
-                .setTopic(arrTopic);
+        SubscribeTopicRequest request = new SubscribeTopicRequest().setIotInstanceId(iotInstanceId).setProductKey(productKey).setDeviceName(deviceName).setTopic(arrTopic);
         log.info("-------------------3.SubscribeTopic 为指定设备订阅Topic--------------------");
         SubscribeTopicResponse response = client.subscribeTopic(request);
         log.info(com.aliyun.teautil.Common.toJSONString(TeaModel.buildMap(response)));
         return response;
     }
 
+    // ============================================================== 阿里云lot主动请求 ==============================================================
+
+    // ============================================================== 石斑鱼上行消息 ==============================================================
+
+    /**
+     * 调用Iot20180120客户端发送请求
+     * 1.QuerySubscribeRelation 查询MNS或AMQP服务端订阅
+     * Type取值:MNS,AMQP
+     */
+    public static void querySubscribeRelation(com.aliyun.iot20180120.Client client, String instanceId, String productKey, String type) throws Exception {
+        QuerySubscribeRelationRequest request = new QuerySubscribeRelationRequest().setProductKey(productKey).setType(type).setIotInstanceId(instanceId);
+        log.info("-------------------1.QuerySubscribeRelation 查询MNS或AMQP服务端订阅--------------------");
+        QuerySubscribeRelationResponse response = client.querySubscribeRelation(request);
+        log.info(com.aliyun.teautil.Common.toJSONString(TeaModel.buildMap(response.body)));
+    }
+
+
+    /**
+     * 调用Iot20180120客户端发送请求
+     * 2.CreateSubscribeRelation 创建MNS或AMQP服务端订阅
+     */
+    public static CreateSubscribeRelationResponseBody createSubscribeRelation(com.aliyun.iot20180120.Client client, String instanceId, String productKey, String type, Boolean deviceDataFlag, Boolean deviceLifeCycleFlag, Boolean deviceStatusChangeFlag, Boolean deviceTagFlag, Boolean deviceTopoLifeCycleFlag, Boolean foundDeviceListFlag, Boolean otaEventFlag, Boolean otaJobFlag, Boolean otaVersionFlag, Boolean thingHistoryFlag, String mnsConfiguration, String consumerGroupIds) throws Exception {
+        CreateSubscribeRelationRequest request = new CreateSubscribeRelationRequest().setProductKey(productKey).setType(type).setIotInstanceId(instanceId).setThingHistoryFlag(thingHistoryFlag).setOtaVersionFlag(otaVersionFlag).setOtaJobFlag(otaJobFlag).setOtaEventFlag(otaEventFlag).setFoundDeviceListFlag(foundDeviceListFlag).setDeviceTopoLifeCycleFlag(deviceTopoLifeCycleFlag).setDeviceTagFlag(deviceTagFlag).setDeviceStatusChangeFlag(deviceStatusChangeFlag).setDeviceLifeCycleFlag(deviceLifeCycleFlag).setDeviceDataFlag(deviceDataFlag).setMnsConfiguration(mnsConfiguration);
+        // 创建的AMQP订阅中的消费组ID,Type为AMQP时必填。
+        if (!com.aliyun.teautil.Common.empty(consumerGroupIds)) {
+            request.consumerGroupIds = Arrays.stream(consumerGroupIds.split(",", 100)).toList();
+        }
+
+        log.info("-------------------2.CreateSubscribeRelation 创建MNS或AMQP服务端订阅--------------------");
+        CreateSubscribeRelationResponse response = client.createSubscribeRelation(request);
+        log.info(com.aliyun.teautil.Common.toJSONString(TeaModel.buildMap(response.body)));
+        return response.body;
+    }
+
+    /**
+     * 调用Iot20180120客户端发送请求
+     * 3.UpdateSubscribeRelation 修改MNS或AMQP服务端订阅
+     */
+    public static UpdateSubscribeRelationResponseBody updateSubscribeRelation(com.aliyun.iot20180120.Client client, String iotInstanceId, String productKey, String type, Boolean deviceDataFlag, Boolean deviceLifeCycleFlag, Boolean deviceStatusChangeFlag, Boolean deviceTagFlag, Boolean deviceTopoLifeCycleFlag, Boolean foundDeviceListFlag, Boolean otaEventFlag, Boolean otaJobFlag, Boolean otaVersionFlag, Boolean thingHistoryFlag, String mnsConfiguration, String consumerGroupIds) throws Exception {
+        UpdateSubscribeRelationRequest request = new UpdateSubscribeRelationRequest().setProductKey(productKey).setType(type).setIotInstanceId(iotInstanceId).setThingHistoryFlag(thingHistoryFlag).setOtaVersionFlag(otaVersionFlag).setOtaJobFlag(otaJobFlag).setOtaEventFlag(otaEventFlag).setFoundDeviceListFlag(foundDeviceListFlag).setDeviceTopoLifeCycleFlag(deviceTopoLifeCycleFlag).setDeviceTagFlag(deviceTagFlag).setDeviceStatusChangeFlag(deviceStatusChangeFlag).setDeviceLifeCycleFlag(deviceLifeCycleFlag).setDeviceDataFlag(deviceDataFlag).setMnsConfiguration(mnsConfiguration);
+        // 创建的AMQP订阅中的消费组ID,Type为AMQP时必填。
+        if (!com.aliyun.teautil.Common.empty(consumerGroupIds)) {
+            request.consumerGroupIds = Arrays.stream(consumerGroupIds.split(",", 100)).toList();
+        }
+
+        log.info("-------------------3.UpdateSubscribeRelation 修改MNS或AMQP服务端订阅--------------------");
+        UpdateSubscribeRelationResponse response = client.updateSubscribeRelation(request);
+        log.info(com.aliyun.teautil.Common.toJSONString(TeaModel.buildMap(response.body)));
+        return response.body;
+    }
+
+    /**
+     * 调用Iot20180120客户端发送请求
+     * 4.DeleteSubscribeRelation 删除MNS或AMQP服务端订阅
+     */
+    public static DeleteSubscribeRelationResponseBody deleteSubscribeRelation(com.aliyun.iot20180120.Client client, String iotInstanceId, String productKey, String type) throws Exception {
+        DeleteSubscribeRelationRequest request = new DeleteSubscribeRelationRequest().setProductKey(productKey).setType(type).setIotInstanceId(iotInstanceId);
+        log.info("-------------------4.DeleteSubscribeRelation 删除MNS或AMQP服务端订阅--------------------");
+        DeleteSubscribeRelationResponse response = client.deleteSubscribeRelation(request);
+        log.info(com.aliyun.teautil.Common.toJSONString(TeaModel.buildMap(response.body)));
+        return response.body;
+    }
+
+
+    void t1() {
+        java.util.List<String> args = java.util.Arrays.asList("");
+        // * param.1:IotInstanceId 实例ID
+        // 您可在物联网平台控制台的实例概览页面,查看当前实例的ID。若有ID值,必须传入该ID值,否则调用会失败。
+        // 如何获取到实例ID:https://help.aliyun.com/document_detail/267533.htm
+        String argIotInstanceId = "";
+        // * param.2:ProductKey 产品KEY
+        String argProductKey = "";
+        // * param.3:Type 订阅类型
+        // 取值:MNS,AMQP。
+        String argType = "";
+        // * param.4:MnsConfiguration MNS队列的配置信息
+        // Type为MNS时必填。
+        // 具体要求和示例见API[CreateSubscribeRelation]或[UpdateSubscribeRelation]说明文档中的“MnsConfiguration定义”。
+        String argMnsConfiguration = "";
+        // * param.5:ConsumerGroupIds 创建的AMQP订阅中的消费组ID
+        // Type为AMQP时必填。多个内容用半角逗号[,]分隔。
+        // 调用CreateConsumerGroup创建消费组成功后,会返回消费组ID。
+        // 您可以调用QueryConsumerGroupList按消费组名称查询消费组ID,也可以在物联网平台控制台对应实例下,选择规则引擎>服务端订阅>消费组列表,查看消费组ID。
+        String argConsumerGroupIds = "";
+        // * param.6:DeviceTopoLifeCycleFlag 推送消息类型是否选择设备拓扑关系变更。
+        // 默认值为false。true:仅对网关产品有效
+        Boolean argDeviceTopoLifeCycleFlag = false;
+        // * param.7:FoundDeviceListFlag 推送消息类型是否选择网关子设备发现上报。
+        // 默认值为false。true:仅对网关产品有效
+        Boolean argFoundDeviceListFlag = false;
+        // * param.8:DeviceLifeCycleFlag 推送消息类型是否选择设备生命周期变更
+        // 默认值为false
+        Boolean argDeviceLifeCycleFlag = false;
+        // * param.9:ThingHistoryFlag 推送消息类型是否选择物模型历史数据上报。
+        // 默认值为false
+        Boolean argThingHistoryFlag = false;
+        // * param.10:OtaEventFlag 推送消息类型是否选择OTA升级状态通知。
+        // 默认值为false
+        Boolean argOtaEventFlag = false;
+        // * param.11:DeviceTagFlag 推送消息类型是否选择设备标签变更。
+        // 默认值为false。true:仅当Type为AMQP时有效。
+        Boolean argDeviceTagFlag = false;
+        // * param.12:OtaVersionFlag 推送消息类型是否选择OTA模块版本号上报。
+        // 默认值为false。true:仅当Type为AMQP时有效。
+        Boolean argOtaVersionFlag = false;
+        // * param.13:OtaJobFlag 推送消息类型是否选择OTA升级批次状态通知。
+        // 默认值为false。true:仅当Type为AMQP时有效。
+        Boolean argOtaJobFlag = false;
+        // * param.14:DeviceDataFlag 推送消息类型是否选择设备上报消息。
+        // 默认值为false
+        Boolean argDeviceDataFlag = false;
+        // * param.15:DeviceStatusChangeFlag 推送消息类型是否选择设备状态变化通知。
+        // 默认值为false
+        Boolean argDeviceStatusChangeFlag = false;
+        log.info("hello world!");
+        // 【发起客户端调用】
+        try {
+            com.aliyun.iot20180120.Client client = null;
+            log.info("第一步:查询服务端订阅 ---> 初始状态");
+            this.querySubscribeRelation(client, argIotInstanceId, argProductKey, argType);
+            log.info("第二步:创建服务端订阅");
+            CreateSubscribeRelationResponseBody createResponseBody = this.createSubscribeRelation(client, argIotInstanceId, argProductKey, argType, argDeviceDataFlag, argDeviceLifeCycleFlag, argDeviceStatusChangeFlag, argDeviceTagFlag, argDeviceTopoLifeCycleFlag, argFoundDeviceListFlag, argOtaEventFlag, argOtaJobFlag, argOtaVersionFlag, argThingHistoryFlag, argMnsConfiguration, argConsumerGroupIds);
+            if (!createResponseBody.success) {
+                log.info("创建服务端订阅失败。");
+                return;
+            }
+
+            log.info("第三步:查询服务端订阅 ---> 创建结果");
+            this.querySubscribeRelation(client, argIotInstanceId, argProductKey, argType);
+            log.info("第四步:修改服务端订阅");
+            Boolean bOtaEventFlag = true;
+            Boolean bOtaJobFlag = true;
+            UpdateSubscribeRelationResponseBody updateResponseBody = this.updateSubscribeRelation(client, argIotInstanceId, argProductKey, argType, argDeviceDataFlag, argDeviceLifeCycleFlag, argDeviceStatusChangeFlag, argDeviceTagFlag, argDeviceTopoLifeCycleFlag, argFoundDeviceListFlag, bOtaEventFlag, bOtaJobFlag, argOtaVersionFlag, argThingHistoryFlag, argMnsConfiguration, argConsumerGroupIds);
+            if (!updateResponseBody.success) {
+                log.info("修改服务端订阅失败。");
+                // 为避免遗留测试用数据,修改失败时不终止程序,继续执行下一步的删除逻辑。
+            }
+
+            log.info("第五步:查询服务端订阅 ---> 修改结果");
+            this.querySubscribeRelation(client, argIotInstanceId, argProductKey, argType);
+            log.info("第六步:删除服务端订阅");
+            DeleteSubscribeRelationResponseBody deleteResponseBody = this.deleteSubscribeRelation(client, argIotInstanceId, argProductKey, argType);
+            if (!deleteResponseBody.success) {
+                log.info("删除服务端订阅失败。");
+            }
+
+            log.info("第七步:查询服务端订阅 ---> 删除结果");
+            this.querySubscribeRelation(client, argIotInstanceId, argProductKey, argType);
+        } catch (Exception error) {
+            log.info(error.getMessage());
+        }
+    }
+
+    // ============================================================== 石斑鱼上行消息 ==============================================================
 
+    // ============================================================== 石斑鱼自定义API ==============================================================
     @Override
     public AwoaraResponse<ApiList> help() {
         return null;
@@ -155,4 +297,5 @@ public class AwoaraServiceImpl implements AwoaraService {
     public AwoaraResponse<?> writeConfig(Config config) {
         return null;
     }
+    // ============================================================== 石斑鱼自定义API ==============================================================
 }