AmqpHandler.java 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268
  1. package com.kym.service.mq;
  import com.kym.service.aliyun.lot.AliyunLotConfig;
  import lombok.extern.slf4j.Slf4j;
  import org.apache.commons.codec.binary.Base64;
  import org.apache.qpid.jms.JmsConnection;
  import org.apache.qpid.jms.JmsConnectionListener;
  import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
  import org.springframework.beans.factory.DisposableBean;
  import org.springframework.context.event.ContextRefreshedEvent;
  import org.springframework.context.event.EventListener;
  import org.springframework.scheduling.annotation.Async;
  import javax.crypto.Mac;
  import javax.crypto.spec.SecretKeySpec;
  import javax.jms.*;
  import javax.naming.Context;
  import javax.naming.InitialContext;
  import java.net.URI;
  import java.nio.charset.StandardCharsets;
  import java.util.ArrayList;
  import java.util.Hashtable;
  import java.util.List;
  import java.util.concurrent.CopyOnWriteArrayList;
  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.LinkedBlockingQueue;
  import java.util.concurrent.ThreadPoolExecutor;
  import java.util.concurrent.TimeUnit;
  25. /**
  26. * 阿里云LOT AMQP消息处理(石斑鱼主板上报消息)
  27. *
  28. * @author skyline
  29. */
  30. //@Component
  31. @Slf4j
  32. public class AmqpHandler implements DisposableBean {
  33. /**
  34. * 线程池
  35. */
  36. private final static ExecutorService executorService = new ThreadPoolExecutor(
  37. Runtime.getRuntime().availableProcessors(),
  38. Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS,
  39. new LinkedBlockingQueue<>(50000));
  40. /**
  41. * AMQP连接
  42. */
  43. static List<Connection> connections = new ArrayList<>();
  44. // /**
  45. // * 工程代码泄露可能会导致 AccessKey 泄露,并威胁账号下所有资源的安全性。以下代码示例使用环境变量获取 AccessKey 的方式进行调用,仅供参考
  46. // */
  47. // private static String accessKey = "LTAI5tNhD2KFuLUN1hMEukmS";
  48. // ;
  49. // private static String accessSecret = "dtVF6na8Hp9W8DmAoWI9k24VXwjNyM";
  50. // private static String consumerGroupId = "DEFAULT_GROUP";
  51. // //iotInstanceId:实例ID。若是2021年07月30日之前(不含当日)开通的公共实例,请填空字符串。
  52. // private static String iotInstanceId = "iot-06z00hb4ys0z7ri";
  53. // //控制台服务端订阅中消费组状态页客户端ID一栏将显示clientId参数。
  54. // //建议使用机器UUID、MAC地址、IP等唯一标识等作为clientId。便于您区分识别不同的客户端。
  55. // private static String clientId = "car-wash-1";
  56. // //${YourHost}为接入域名,请参见AMQP客户端接入说明文档。
  57. // //${uid}.iot-amqp.${YourRegionId}.aliyuncs.com 对于Java、.NET、Python 2.7、Node.js、Go客户端:端口号为5671
  58. // // 公共实例endpoint:iot-06z00hb4ys0z7ri.amqp.iothub.aliyuncs.com
  59. // private static String host = "iot-06z00hb4ys0z7ri.amqp.iothub.aliyuncs.com";
  60. // 指定单个进程启动的连接数
  61. // 单个连接消费速率有限,请参考使用限制,最大64个连接
  62. // 连接数和消费速率及rebalance相关,建议每500QPS增加一个连接
  63. private static int connectionCount = 4;
  64. private static MessageListener messageListener = new MessageListener() {
  65. @Override
  66. public void onMessage(final Message message) {
  67. try {
  68. // 1.收到消息之后一定要ACK。
  69. // 推荐做法:创建Session选择Session.AUTO_ACKNOWLEDGE,这里会自动ACK。
  70. // 其他做法:创建Session选择Session.CLIENT_ACKNOWLEDGE,这里一定要调message.acknowledge()来ACK。
  71. // message.acknowledge();
  72. // 2.建议异步处理收到的消息,确保onMessage函数里没有耗时逻辑。
  73. // 如果业务处理耗时过程过长阻塞住线程,可能会影响SDK收到消息后的正常回调。
  74. executorService.submit(() -> processMessage(message));
  75. } catch (Exception e) {
  76. log.error("submit task occurs exception ", e);
  77. }
  78. }
  79. };
  80. private static JmsConnectionListener myJmsConnectionListener = new JmsConnectionListener() {
  81. /**
  82. * 连接成功建立。
  83. */
  84. @Override
  85. public void onConnectionEstablished(URI remoteURI) {
  86. log.info("onConnectionEstablished, remoteUri:{}", remoteURI);
  87. }
  88. /**
  89. * 尝试过最大重试次数之后,最终连接失败。
  90. */
  91. @Override
  92. public void onConnectionFailure(Throwable error) {
  93. log.error("onConnectionFailure, {}", error.getMessage());
  94. }
  95. /**
  96. * 连接中断。
  97. */
  98. @Override
  99. public void onConnectionInterrupted(URI remoteURI) {
  100. log.info("onConnectionInterrupted, remoteUri:{}", remoteURI);
  101. }
  102. /**
  103. * 连接中断后又自动重连上。
  104. */
  105. @Override
  106. public void onConnectionRestored(URI remoteURI) {
  107. log.info("onConnectionRestored, remoteUri:{}", remoteURI);
  108. }
  109. @Override
  110. public void onInboundMessage(JmsInboundMessageDispatch envelope) {
  111. }
  112. @Override
  113. public void onSessionClosed(Session session, Throwable cause) {
  114. }
  115. @Override
  116. public void onConsumerClosed(MessageConsumer consumer, Throwable cause) {
  117. }
  118. @Override
  119. public void onProducerClosed(MessageProducer producer, Throwable cause) {
  120. }
  121. };
  122. private final AliyunLotConfig aliyunLotConfig;
  123. public AmqpHandler(AliyunLotConfig aliyunLotConfig) {
  124. this.aliyunLotConfig = aliyunLotConfig;
  125. }
  126. /**
  127. * 订阅消息处理
  128. *
  129. * @throws Exception
  130. */
  131. private void doSubscribe() throws Exception {
  132. for (int i = 0; i < connectionCount; i++) {
  133. //参数说明,请参见AMQP客户端接入说明文档。
  134. long timeStamp = System.currentTimeMillis();
  135. //签名方法:支持hmacmd5、hmacsha1和hmacsha256。
  136. String signMethod = "hmacsha1";
  137. //userName组装方法,请参见AMQP客户端接入说明文档。
  138. String userName = aliyunLotConfig.getClientId() + "-" + i + "|authMode=aksign"
  139. + ",signMethod=" + signMethod
  140. + ",timestamp=" + timeStamp
  141. + ",authId=" + aliyunLotConfig.getAccessKey()
  142. + ",iotInstanceId=" + aliyunLotConfig.getIotInstanceId()
  143. + ",consumerGroupId=" + aliyunLotConfig.getConsumerGroupId()
  144. + "|";
  145. //计算签名,password组装方法,请参见AMQP客户端接入说明文档。
  146. String signContent = "authId=" + aliyunLotConfig.getAccessKey() + "&timestamp=" + timeStamp;
  147. String password = doSign(signContent, aliyunLotConfig.getAccessSecret(), signMethod);
  148. String connectionUrl = "failover:(amqps://" + aliyunLotConfig.getAmpqHost() + ":5671?amqp.idleTimeout=80000)" + "?failover.reconnectDelay=30";
  149. Hashtable<String, String> hashtable = new Hashtable<>();
  150. hashtable.put("connectionfactory.SBCF", connectionUrl);
  151. hashtable.put("queue.QUEUE", "default");
  152. hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
  153. Context context = new InitialContext(hashtable);
  154. ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
  155. Destination queue = (Destination) context.lookup("QUEUE");
  156. // 创建连接。
  157. Connection connection = cf.createConnection(userName, password);
  158. connections.add(connection);
  159. ((JmsConnection) connection).addConnectionListener(myJmsConnectionListener);
  160. // 创建会话。
  161. // Session.CLIENT_ACKNOWLEDGE: 收到消息后,需要手动调用message.acknowledge()。
  162. // Session.AUTO_ACKNOWLEDGE: SDK自动ACK(推荐)。
  163. Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  164. connection.start();
  165. // 创建Receiver连接。
  166. MessageConsumer consumer = session.createConsumer(queue);
  167. consumer.setMessageListener(messageListener);
  168. }
  169. }
  170. /**
  171. * 计算签名,password组装方法,请参见AMQP客户端接入说明文档。
  172. */
  173. private static String doSign(String toSignString, String secret, String signMethod) throws Exception {
  174. SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(), signMethod);
  175. Mac mac = Mac.getInstance(signMethod);
  176. mac.init(signingKey);
  177. byte[] rawHmac = mac.doFinal(toSignString.getBytes());
  178. return Base64.encodeBase64String(rawHmac);
  179. }
  180. /**
  181. * 在这里处理您收到消息后的具体业务逻辑。
  182. */
  183. private static void processMessage(Message message) {
  184. try {
  185. byte[] body = message.getBody(byte[].class);
  186. String content = new String(body);
  187. String topic = message.getStringProperty("topic");
  188. String messageId = message.getStringProperty("messageId");
  189. long generateTime = message.getLongProperty("generateTime");
  190. log.info("receive message"
  191. + ",\n topic = " + topic
  192. + ",\n messageId = " + messageId
  193. + ",\n generateTime = " + generateTime
  194. + ",\n content = " + content);
  195. } catch (Exception e) {
  196. log.error("processMessage occurs error ", e);
  197. }
  198. }
  199. // 这里不能使用@PostConstruct,在初始化完成后, bean 进入增强阶段, 所以这个阶段的任何AOP都是无效的,https://www.cnblogs.com/eternityz/p/15330069.html
  200. @EventListener(classes = {ContextRefreshedEvent.class})
  201. @Async
  202. public void init() {
  203. // 开启线程处理队列消息
  204. handleMessage();
  205. }
  206. private void handleMessage() {
  207. executorService.execute(() -> {
  208. try {
  209. doSubscribe();
  210. } catch (Exception e) {
  211. throw new RuntimeException(e);
  212. }
  213. });
  214. if (!executorService.isTerminated()) {
  215. try {
  216. Thread.sleep(100);
  217. } catch (InterruptedException e) {
  218. log.error("processing interrupted.", e);
  219. }
  220. }
  221. }
  222. /**
  223. * 关闭AMQP连接
  224. *
  225. * @throws Exception
  226. */
  227. @Override
  228. public void destroy() throws Exception {
  229. // 结束程序运行
  230. log.info("run shutdown");
  231. connections.forEach(c -> {
  232. try {
  233. c.close();
  234. } catch (JMSException e) {
  235. log.error("failed to close connection", e);
  236. }
  237. });
  238. executorService.shutdown();
  239. if (executorService.awaitTermination(10, TimeUnit.SECONDS)) {
  240. log.info("shutdown success");
  241. } else {
  242. log.info("failed to handle messages");
  243. }
  244. }
  245. }