專案全部代碼地址:https://github.com/Tom-shushu/work-study.git (mqtt-emqt 專案)
先看我們最后實作的一個效果
1.手機端向主題 topic111 發送訊息,并接收,(手機測驗工具名稱:MQTT除錯器)


2.控制臺列印

MQTT基本簡介
MQTT 是用于物聯網 (IoT) 的 OASIS 標準訊息傳遞協議,它被設計為一種極其輕量級的發布/訂閱訊息傳輸,非常適合連接具有小代碼足跡和最小網路帶寬的遠程設備,
MQTT協議簡介
MQTT 是客戶端服務器發布/訂閱訊息傳輸協議,它重量輕、開放、簡單,并且易于實施,這些特性使其非常適合在許多情況下使用,包括受限制的環境,例如機器對機器 (M2M) 和物聯網 (IoT) 環境中的通信,其中需要小代碼足跡和/或網路帶寬非常寶貴,
該協議通過 TCP/IP 或其他提供有序、無損、雙向連接的網路協議運行,其特點包括:
· 使用發布/訂閱訊息模式,提供一對多的訊息分發和應用程式的解耦,
· 與有效負載內容無關的訊息傳輸,
· 訊息傳遞的三種服務質量:
o “最多一次”,根據操作環境的最大努力傳遞訊息,可能會發生訊息丟失,例如,此級別可用于環境傳感器資料,其中單個讀數是否丟失并不重要,因為下一個讀數將很快發布,
o “至少一次”,保證訊息到達但可能出現重復,
o “Exactly once”,保證訊息只到達一次,例如,此級別可用于重復或丟失訊息可能導致應用不正確費用的計費系統,
· 最小化傳輸開銷和協議交換以減少網路流量,
· 發生例外斷開時通知相關方的機制,
EMQX簡介
通過開放標準物聯網協議 MQTT、CoAP 和 LwM2M 連接任何設備,使用 EMQX Enterprise 集群輕松擴展到數千萬并發 MQTT 連接,
并且EMQX還是開源的,又支持集群,所以還是一個比較不錯的選擇
EMQX集群搭建
前期準備:
1.兩臺服務器:我的兩個服務器一臺是騰訊云、一臺是阿里云的(不要問為什么,薅羊毛得來的)咱們暫且叫他們 mqtt_service_aliyun和
mqtt_service_txyun 吧, 2.一個域名: mqtt.zhouhong.icu安裝開始
1.分別在兩臺服務器上執行以下操作進行安裝(如果是單機:只需要進行下面1、2操作就安裝完成了)
## 1.下載 wget https://www.emqx.com/zh/downloads/broker/4.4.4/emqx-4.4.4-otp24.1.5-3-el8-amd64.rpm ## 2.安裝 sudo yum install emqx-4.4.4-otp24.1.5-3-el8-amd64.rpm ## 3.修改組態檔 vim /etc/emqx/emqx.conf ## 4.修改以下內容 ## 注意node.name是當前這臺服務器名稱 node.name = [email protected] cluster.static.seeds = [email protected],[email protected] cluster.discovery = static cluster.name = my-mqtt-cluster
2.分別啟動兩臺服務器的EMQX
sudo emqx start
3.到瀏覽器輸入 http://xxx.xx.xxx.xxx:18083/ 查看(隨便一臺都可以,默認賬號admin 密碼public),注意打開18083,1883 安全組

4.nginx負載均衡
nginx搭建很簡單略過,大家只需要修改以下nginx.conf里面的內容即可
stream { upstream mqtt.zhouhong.icu { zone tcp_servers 64k; hash $remote_addr; server xxx.xx.xxx.xx:1883 weight=1 max_fails=3 fail_timeout=30s; server xxx.xx.xxx.xx:1883 weight=1 max_fails=3 fail_timeout=30s; } server { listen 8883 ssl; status_zone tcp_server; proxy_pass mqtt.zhouhong.icu; proxy_buffer_size 4k; ssl_handshake_timeout 15s; ssl_certificate /etc/nginx/7967358_www.mqtt.zhouhong.icu.pem; ssl_certificate_key /etc/nginx/7967358_www.mqtt.zhouhong.icu.key; } }
與SpringBoot集成并實作服務器端監控對應topic下的訊息
1.專案搭建
-
引入MQTT相關jar包
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
-
yml組態檔 (如果大家沒搭建好的話,可以直接使用我搭建的這個)
server: port: 8080 mqtt:
## 單機版--只需要把域名改為ip既可 hostUrl: tcp://mqtt.zhouhong.icu:1883 username: admin password: public ## 服務端 clientId (發送端自己定義) clientId: service_client_id cleanSession: true reconnect: true timeout: 100 keepAlive: 100 defaultTopic: topic111 qos: 0
-
屬性配置
/** * description: * date: 2022/6/16 15:51 * @author: zhouhong */ @Component @ConfigurationProperties("mqtt") @Data public class MqttProperties { /** * 用戶名 */ private String username; /** * 密碼 */ private String password; /** * 連接地址 */ private String hostUrl; /** * 客戶端Id,同一臺服務器下,不允許出現重復的客戶端id */ private String clientId; /** * 默認連接主題 */ private String topic; /** * 超時時間 */ private int timeout; /** * 設定會話心跳時間 單位為秒 服務器會每隔1.5*20秒的時間向客戶端 * 發送個訊息判斷客戶端是否在線,但這個方法并沒有重連的機制 */ private int keepAlive; /** * 設定是否清空session,這里如果設定為false表示服務器會保留客戶端的連 * 接記錄,這里設定為true表示每次連接到服務器都以新的身份連接 */ private Boolean cleanSession; /** * 是否斷線重連 */ private Boolean reconnect; /** * 連接方式 */ private Integer qos; }
-
發送訊息回呼
/** * description: 發生訊息成功后 的 回呼 * date: 2022/6/16 15:55 * * @author: zhouhong */ @Component @Log4j2 public class MqttSendCallBack implements MqttCallbackExtended { /** * 客戶端斷開后觸發 * @param throwable */ @Override public void connectionLost(Throwable throwable) { log.info("發送訊息回呼: 連接斷開,可以做重連"); } /** * 客戶端收到訊息觸發 * * @param topic 主題 * @param mqttMessage 訊息 */ @Override public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { log.info("發送訊息回呼: 接收訊息主題 : " + topic); log.info("發送訊息回呼: 接收訊息內容 : " + new String(mqttMessage.getPayload())); } /** * 發布訊息成功 * * @param token token */ @Override public void deliveryComplete(IMqttDeliveryToken token) { String[] topics = token.getTopics(); for (String topic : topics) { log.info("發送訊息回呼: 向主題:" + topic + "發送訊息成功!"); } try { MqttMessage message = token.getMessage(); byte[] payload = message.getPayload(); String s = new String(payload, "UTF-8"); log.info("發送訊息回呼: 訊息的內容是:" + s); } catch (MqttException e) { e.printStackTrace(); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } /** * 連接emq服務器后觸發 * * @param b * @param s */ @Override public void connectComplete(boolean b, String s) { log.info("--------------------ClientId:" + MqttAcceptClient.client.getClientId() + "客戶端連接成功!--------------------"); } }
-
接收訊息回呼
/** * description: 接收訊息后的回呼 * date: 2022/6/16 15:52 * * @author: zhouhong */ @Component @Log4j2 public class MqttAcceptCallback implements MqttCallbackExtended { @Resource private MqttAcceptClient mqttAcceptClient; /** * 客戶端斷開后觸發 * * @param throwable */ @Override public void connectionLost(Throwable throwable) { log.info("接收訊息回呼: 連接斷開,可以做重連"); if (MqttAcceptClient.client == null || !MqttAcceptClient.client.isConnected()) { log.info("接收訊息回呼: emqx重新連接...................................................."); mqttAcceptClient.reconnection(); } } /** * 客戶端收到訊息觸發 * * @param topic 主題 * @param mqttMessage 訊息 */ @Override public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { log.info("接收訊息回呼: 接收訊息主題 : " + topic); log.info("接收訊息回呼: 接收訊息內容 : " + new String(mqttMessage.getPayload())); } /** * 發布訊息成功 * * @param token token */ @Override public void deliveryComplete(IMqttDeliveryToken token) { String[] topics = token.getTopics(); for (String topic : topics) { log.info("接收訊息回呼: 向主題:" + topic + "發送訊息成功!"); } try { MqttMessage message = token.getMessage(); byte[] payload = message.getPayload(); String s = new String(payload, "UTF-8"); log.info("接收訊息回呼: 訊息的內容是:" + s); } catch (MqttException e) { e.printStackTrace(); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } /** * 連接emq服務器后觸發 * * @param b * @param s */ @Override public void connectComplete(boolean b, String s) { log.info("--------------------ClientId:" + MqttAcceptClient.client.getClientId() + "客戶端連接成功!--------------------"); // 以/#結尾表示訂閱所有以test開頭的主題 // 訂閱所有機構主題 mqttAcceptClient.subscribe("topic111", 0); } }
-
發訊息
/** * description: 發送訊息 * date: 2022/6/16 16:01 * * @author: zhouhong */ @Component public class MqttSendClient { @Autowired private MqttSendCallBack mqttSendCallBack; @Autowired private MqttProperties mqttProperties; public MqttClient connect() { MqttClient client = null; try { String uuid = UUID.randomUUID().toString().replaceAll("-",""); client = new MqttClient(mqttProperties.getHostUrl(),uuid , new MemoryPersistence()); MqttConnectOptions options = new MqttConnectOptions(); options.setUserName(mqttProperties.getUsername()); options.setPassword(mqttProperties.getPassword().toCharArray()); options.setConnectionTimeout(mqttProperties.getTimeout()); options.setKeepAliveInterval(mqttProperties.getKeepAlive()); options.setCleanSession(true); options.setAutomaticReconnect(false); try { // 設定回呼 client.setCallback(mqttSendCallBack); client.connect(options); } catch (Exception e) { e.printStackTrace(); } } catch (Exception e) { e.printStackTrace(); } return client; } /** * 發布訊息 * 主題格式: server:report:$orgCode(引數實際使用機構代碼) * * @param retained 是否保留 * @param pushMessage 訊息體 */ public void publish(boolean retained, String topic, String pushMessage) { MqttMessage message = new MqttMessage(); message.setQos(mqttProperties.getQos()); message.setRetained(retained); message.setPayload(pushMessage.getBytes()); MqttClient mqttClient = connect(); try { mqttClient.publish(topic, message); } catch (MqttException e) { e.printStackTrace(); } finally { disconnect(mqttClient); close(mqttClient); } } /** * 關閉連接 * * @param mqttClient */ public static void disconnect(MqttClient mqttClient) { try { if (mqttClient != null) { mqttClient.disconnect(); } } catch (MqttException e) { e.printStackTrace(); } } /** * 釋放資源 * * @param mqttClient */ public static void close(MqttClient mqttClient) { try { if (mqttClient != null) { mqttClient.close(); } } catch (MqttException e) { e.printStackTrace(); } } }
-
接收訊息
/** * description: 服務器段端連接訂閱訊息、監控topic * date: 2022/6/16 15:52 * * @author: zhouhong */ @Component @Log4j2 public class MqttAcceptClient { @Autowired @Lazy private MqttAcceptCallback mqttAcceptCallback; @Autowired private MqttProperties mqttProperties; public static MqttClient client; private static MqttClient getClient() { return client; } private static void setClient(MqttClient client) { MqttAcceptClient.client = client; } /** * 客戶端連接 */ public void connect() { MqttClient client; try { // clientId 使用服務器 yml里面配置的 clientId client = new MqttClient(mqttProperties.getHostUrl(), mqttProperties.getClientId(), new MemoryPersistence()); MqttConnectOptions options = new MqttConnectOptions(); options.setUserName(mqttProperties.getUsername()); options.setPassword(mqttProperties.getPassword().toCharArray()); options.setConnectionTimeout(mqttProperties.getTimeout()); options.setKeepAliveInterval(mqttProperties.getKeepAlive()); options.setAutomaticReconnect(mqttProperties.getReconnect()); options.setCleanSession(mqttProperties.getCleanSession()); MqttAcceptClient.setClient(client); try { // 設定回呼 client.setCallback(mqttAcceptCallback); client.connect(options); } catch (Exception e) { e.printStackTrace(); } } catch (Exception e) { e.printStackTrace(); } } /** * 重新連接 */ public void reconnection() { try { client.connect(); } catch (MqttException e) { e.printStackTrace(); } } /** * 訂閱某個主題 * * @param topic 主題 * @param qos 連接方式 */ public void subscribe(String topic, int qos) { log.info("==============開始訂閱主題==============" + topic); try { client.subscribe(topic, qos); } catch (MqttException e) { e.printStackTrace(); } } /** * 取消訂閱某個主題 * * @param topic */ public void unsubscribe(String topic) { log.info("==============開始取消訂閱主題==============" + topic); try { client.unsubscribe(topic); } catch (MqttException e) { e.printStackTrace(); } } }
-
服務端啟動時連接訂閱主題并監控
/** * description: 啟動后連接 MQTT 服務器, 監聽 mqtt/my_topic 這個topic發送的訊息 * date: 2022/6/16 15:57 * @author: zhouhong */ @Configuration public class MqttConfig { @Resource private MqttAcceptClient mqttAcceptClient; @Bean public MqttAcceptClient getMqttPushClient() { mqttAcceptClient.connect(); return mqttAcceptClient; } }
-
發訊息控制類
/** * description: 發訊息控制類 * date: 2022/6/16 15:58 * * @author: zhouhong */ @RestController public class SendController { @Resource private MqttSendClient mqttSendClient; @PostMapping("/mqtt/sendmessage") public void sendMessage(@RequestBody SendParam sendParam) { mqttSendClient.publish(false,sendParam.getTopic(),sendParam.getMessageContent()); } }
2.測驗
- postman呼叫發訊息介面

- 控制臺日志

- 使用另外一個移動端MQTT除錯工具測驗
- 手機端向主題 topic111 發送訊息,并接收,


2. 控制臺列印

本文來自博客園,作者:Tom-shushu,轉載請注明原文鏈接:https://www.cnblogs.com/Tom-shushu/p/16390187.html
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/492456.html
標籤:Java
上一篇:檔案輸入輸出處理(二)-位元組流
下一篇:檔案輸入輸出處理(三)-字符流
