|
|
@@ -0,0 +1,212 @@
|
|
|
+package com.kym.service.aliyun.lot;
|
|
|
+
|
|
|
+import com.alibaba.fastjson2.JSONObject;
|
|
|
+import com.alibaba.fastjson2.TypeReference;
|
|
|
+import com.kym.entity.awoara.Event;
|
|
|
+import com.kym.entity.awoara.MessageBody;
|
|
|
+import com.kym.service.awoara.factory.AwoaraEventHandlerFactory;
|
|
|
+import jakarta.annotation.PreDestroy;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.apache.commons.codec.binary.Base64;
|
|
|
+import org.apache.commons.text.StringEscapeUtils;
|
|
|
+import org.apache.qpid.jms.JmsConnection;
|
|
|
+import org.apache.qpid.jms.JmsConnectionListener;
|
|
|
+import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
|
|
|
+import org.springframework.beans.factory.annotation.Value;
|
|
|
+import org.springframework.context.event.ContextRefreshedEvent;
|
|
|
+import org.springframework.context.event.EventListener;
|
|
|
+import org.springframework.scheduling.annotation.Async;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+
|
|
|
+import javax.crypto.Mac;
|
|
|
+import javax.crypto.spec.SecretKeySpec;
|
|
|
+import javax.jms.*;
|
|
|
+import javax.naming.Context;
|
|
|
+import javax.naming.InitialContext;
|
|
|
+import java.net.URI;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.Hashtable;
|
|
|
+import java.util.List;
|
|
|
+import java.util.concurrent.ExecutorService;
|
|
|
+import java.util.concurrent.LinkedBlockingQueue;
|
|
|
+import java.util.concurrent.ThreadPoolExecutor;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
+
|
|
|
+/**
|
|
|
+ * AMQP订阅消息处理
|
|
|
+ */
|
|
|
+@Component
|
|
|
+@Slf4j
|
|
|
+public class AmqpConsumer {
|
|
|
+ // 业务处理异步线程池
|
|
|
+ private final static ExecutorService executorService = new ThreadPoolExecutor(
|
|
|
+ Runtime.getRuntime().availableProcessors(),
|
|
|
+ Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS,
|
|
|
+ new LinkedBlockingQueue<>(50000));
|
|
|
+ private static final MessageListener MESSAGE_LISTENER = new MessageListener() {
|
|
|
+ @Override
|
|
|
+ public void onMessage(final Message message) {
|
|
|
+ try {
|
|
|
+ executorService.submit(() -> processMessage(message));
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("submit task occurs exception ", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ };
|
|
|
+ private static final JmsConnectionListener myJmsConnectionListener = new JmsConnectionListener() {
|
|
|
+ @Override
|
|
|
+ public void onConnectionEstablished(URI remoteURI) {
|
|
|
+ log.info("onConnectionEstablished, remoteUri:{}", remoteURI);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void onConnectionFailure(Throwable error) {
|
|
|
+ log.error("onConnectionFailure, {}", error.getMessage());
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void onConnectionInterrupted(URI remoteURI) {
|
|
|
+ log.info("onConnectionInterrupted, remoteUri:{}", remoteURI);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void onConnectionRestored(URI remoteURI) {
|
|
|
+ log.info("onConnectionRestored, remoteUri:{}", remoteURI);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void onInboundMessage(JmsInboundMessageDispatch envelope) {
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void onSessionClosed(Session session, Throwable cause) {
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void onConsumerClosed(MessageConsumer consumer, Throwable cause) {
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void onProducerClosed(MessageProducer producer, Throwable cause) {
|
|
|
+ }
|
|
|
+ };
|
|
|
+ private final List<Connection> connections = new ArrayList<>();
|
|
|
+ @Value("${aliyun.lot.amqp.accessKey}")
|
|
|
+ private String accessKey;
|
|
|
+ @Value("${aliyun.lot.amqp.accessSecret}")
|
|
|
+ private String accessSecret;
|
|
|
+ @Value("${aliyun.lot.amqp.consumerGroupId}")
|
|
|
+ private String consumerGroupId;
|
|
|
+ @Value("${aliyun.lot.amqp.iotInstanceId}")
|
|
|
+ private String iotInstanceId;
|
|
|
+ @Value("${aliyun.lot.amqp.clientId}")
|
|
|
+ private String clientId;
|
|
|
+ @Value("${aliyun.lot.amqp.host}")
|
|
|
+ private String host;
|
|
|
+ private int connectionCount = 4;
|
|
|
+
|
|
|
+ private static void processMessage(Message message) {
|
|
|
+ try {
|
|
|
+ var jsonObject = new JSONObject();
|
|
|
+ byte[] body = message.getBody(byte[].class);
|
|
|
+ String content = new String(body);
|
|
|
+ String topic = message.getStringProperty("topic");
|
|
|
+ String messageId = message.getStringProperty("messageId");
|
|
|
+ long generateTime = message.getLongProperty("generateTime");
|
|
|
+ log.info("消息内容:{}", content);
|
|
|
+ var payload = JSONObject.parseObject(content);
|
|
|
+ jsonObject.put("payload", payload);
|
|
|
+ jsonObject.put("messagetype", "upload");
|
|
|
+ jsonObject.put("topic", topic);
|
|
|
+ jsonObject.put("messageid", messageId);
|
|
|
+ jsonObject.put("timestamp", generateTime);
|
|
|
+
|
|
|
+ var event = payload.getString("event");
|
|
|
+ var eventHandler = AwoaraEventHandlerFactory.getEventHandler(event);
|
|
|
+ var msg = parseMessageBody(jsonObject.toJSONString(), Event.getClazz(event));
|
|
|
+ eventHandler.handle(msg);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("processMessage occurs error ", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private static <T> MessageBody<T> parseMessageBody(String jsonStr, Class<T> clazz) {
|
|
|
+ var json = StringEscapeUtils.unescapeJava(jsonStr);
|
|
|
+ return JSONObject.parseObject(json, new TypeReference<>(clazz) {
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ private static String doSign(String toSignString, String secret, String signMethod) throws Exception {
|
|
|
+ SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(), signMethod);
|
|
|
+ Mac mac = Mac.getInstance(signMethod);
|
|
|
+ mac.init(signingKey);
|
|
|
+ byte[] rawHmac = mac.doFinal(toSignString.getBytes());
|
|
|
+ return Base64.encodeBase64String(rawHmac);
|
|
|
+ }
|
|
|
+
|
|
|
+ public void start() throws Exception {
|
|
|
+ for (int i = 0; i < connectionCount; i++) {
|
|
|
+ long timeStamp = System.currentTimeMillis();
|
|
|
+ String signMethod = "hmacsha1";
|
|
|
+
|
|
|
+ String userName = clientId + "-" + i + "|authMode=aksign"
|
|
|
+ + ",signMethod=" + signMethod
|
|
|
+ + ",timestamp=" + timeStamp
|
|
|
+ + ",authId=" + accessKey
|
|
|
+ + ",iotInstanceId=" + iotInstanceId
|
|
|
+ + ",consumerGroupId=" + consumerGroupId
|
|
|
+ + "|";
|
|
|
+
|
|
|
+ String signContent = "authId=" + accessKey + "×tamp=" + timeStamp;
|
|
|
+ String password = doSign(signContent, accessSecret, signMethod);
|
|
|
+ String connectionUrl = "failover:(amqps://" + host + ":5671?amqp.idleTimeout=80000)" + "?failover.reconnectDelay=30";
|
|
|
+
|
|
|
+ Hashtable<String, String> hashtable = new Hashtable<>();
|
|
|
+ hashtable.put("connectionfactory.SBCF", connectionUrl);
|
|
|
+ hashtable.put("queue.QUEUE", "default");
|
|
|
+ hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
|
|
|
+ Context context = new InitialContext(hashtable);
|
|
|
+ ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
|
|
|
+ Destination queue = (Destination) context.lookup("QUEUE");
|
|
|
+
|
|
|
+ Connection connection = cf.createConnection(userName, password);
|
|
|
+ connections.add(connection);
|
|
|
+
|
|
|
+ ((JmsConnection) connection).addConnectionListener(myJmsConnectionListener);
|
|
|
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
|
|
+ connection.start();
|
|
|
+
|
|
|
+ MessageConsumer consumer = session.createConsumer(queue);
|
|
|
+ consumer.setMessageListener(MESSAGE_LISTENER);
|
|
|
+ }
|
|
|
+
|
|
|
+ log.info("amqp is started successfully");
|
|
|
+ }
|
|
|
+
|
|
|
+ @EventListener(classes = {ContextRefreshedEvent.class})
|
|
|
+ @Async
|
|
|
+ public void init() throws Exception {
|
|
|
+ start();
|
|
|
+ }
|
|
|
+
|
|
|
+ @PreDestroy
|
|
|
+ public void shutdown() {
|
|
|
+ connections.forEach(connection -> {
|
|
|
+ try {
|
|
|
+ connection.close();
|
|
|
+ } catch (JMSException e) {
|
|
|
+ log.error("Failed to close connection", e);
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
+ executorService.shutdown();
|
|
|
+ try {
|
|
|
+ if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
|
|
|
+ executorService.shutdownNow();
|
|
|
+ }
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ executorService.shutdownNow();
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|