Eclipse Paho Java Client 是用 Java 撰寫的 MQTT 客戶端庫(MQTT Java Client),可用于 JVM 或其他 Java 兼容平臺(例如Android);本文主要介紹使用如何使用它來操作 EMQX,文中所使用到的軟體版本:EMQX 4.2.2、Paho 1.2.5、Java 1.8.0_321,
1、引入依賴
<dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.5</version> </dependency>
2、同步方式收發訊息
2.1、發送訊息
public static void publish() { try { MqttClient mqttClient = new MqttClient(SERVER_URI, "client-publish", new MemoryPersistence()); MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); /*會話清除標識 * false:服務端必須使用與 Client ID 關聯的會話來恢復與客戶端的通信,如果不存在這樣的會話,服務器必須創建一個新會話,客戶端和服務器在斷開連接后必須存盤會話的狀態, * true:客戶端和服務器必須丟棄任何先前的會話并創建一個新的會話,該會話的生命周期將和網路連接保持一致,其會話狀態一定不能被之后的任何會話重用, */ mqttConnectOptions.setCleanSession(true); mqttConnectOptions.setUserName("admin"); mqttConnectOptions.setPassword("123456".toCharArray()); mqttConnectOptions.setKeepAliveInterval(10); mqttClient.setCallback(new MqttCallback() { @Override public void connectionLost(Throwable cause) { logger.info("連接斷開:{}", cause.getMessage()); } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { } @Override public void deliveryComplete(IMqttDeliveryToken token) { logger.info("發送完成:{}", token.isComplete()); } }); mqttClient.connect(mqttConnectOptions); for (int i = 0; i < 10; i++) { MqttMessage mqttMessage = new MqttMessage(("測驗訊息" + (i + 1)).getBytes()); mqttMessage.setQos(2); //是否保留訊息,只能保留最新的一份 mqttMessage.setRetained(false); mqttClient.publish(TOPIC_NAME, mqttMessage); } mqttClient.disconnect(); mqttClient.close(); } catch (Exception e) { e.printStackTrace(); } }
2.2、接受訊息
public static void subscribe(String clientId) { try { MqttClient mqttClient = new MqttClient(SERVER_URI, clientId, new MemoryPersistence()); MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); //設為 false, 該訂閱將被視為持久訂閱 mqttConnectOptions.setCleanSession(true); mqttConnectOptions.setUserName("admin"); mqttConnectOptions.setPassword("123456".toCharArray()); mqttClient.setCallback(new MqttCallback() { @Override public void connectionLost(Throwable cause) { logger.info("連接斷開:{}", cause.getMessage()); } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { logger.info("接受到訊息:clientId={},topic={},message={},isRetained={}", clientId, topic, message.toString(), message.isRetained()); } @Override public void deliveryComplete(IMqttDeliveryToken token) { } }); mqttClient.connect(mqttConnectOptions); mqttClient.subscribe(TOPIC_NAME); } catch (Exception e) { e.printStackTrace(); } }
2.3、完整例子
package com.abc.demo.emqx; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class MqttClientCase { private static Logger logger = LoggerFactory.getLogger(MqttClientCase.class.getName()); private static final String SERVER_URI = "tcp://10.49.196.10:1883"; private static final String TOPIC_NAME = "test-topic"; public static void main(String[] args) throws Exception { new Thread(() -> subscribe("client-subscribe-A")).start(); new Thread(() -> subscribe("client-subscribe-B")).start(); Thread.sleep(1000); new Thread(() -> publish()).start(); Thread.sleep(1000 * 60); } public static void publish() { try { MqttClient mqttClient = new MqttClient(SERVER_URI, "client-publish", new MemoryPersistence()); MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); /*會話清除標識 * false:服務端必須使用與 Client ID 關聯的會話來恢復與客戶端的通信,如果不存在這樣的會話,服務器必須創建一個新會話,客戶端和服務器在斷開連接后必須存盤會話的狀態, * true:客戶端和服務器必須丟棄任何先前的會話并創建一個新的會話,該會話的生命周期將和網路連接保持一致,其會話狀態一定不能被之后的任何會話重用, */ mqttConnectOptions.setCleanSession(true); mqttConnectOptions.setUserName("admin"); mqttConnectOptions.setPassword("123456".toCharArray()); mqttConnectOptions.setKeepAliveInterval(10); mqttClient.setCallback(new MqttCallback() { @Override public void connectionLost(Throwable cause) { logger.info("連接斷開:{}", cause.getMessage()); } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { } @Override public void deliveryComplete(IMqttDeliveryToken token) { logger.info("發送完成:{}", token.isComplete()); } }); mqttClient.connect(mqttConnectOptions); for (int i = 0; i < 10; i++) { MqttMessage mqttMessage = new MqttMessage(("測驗訊息" + (i + 1)).getBytes()); mqttMessage.setQos(2); //是否保留訊息,只能保留最新的一份 mqttMessage.setRetained(false); mqttClient.publish(TOPIC_NAME, mqttMessage); } mqttClient.disconnect(); mqttClient.close(); } catch (Exception e) { e.printStackTrace(); } } public static void subscribe(String clientId) { try { MqttClient mqttClient = new MqttClient(SERVER_URI, clientId, new MemoryPersistence()); MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); //設為 false, 該訂閱將被視為持久訂閱 mqttConnectOptions.setCleanSession(true); mqttConnectOptions.setUserName("admin"); mqttConnectOptions.setPassword("123456".toCharArray()); mqttClient.setCallback(new MqttCallback() { @Override public void connectionLost(Throwable cause) { logger.info("連接斷開:{}", cause.getMessage()); } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { logger.info("接受到訊息:clientId={},topic={},message={},isRetained={}", clientId, topic, message.toString(), message.isRetained()); } @Override public void deliveryComplete(IMqttDeliveryToken token) { } }); mqttClient.connect(mqttConnectOptions); mqttClient.subscribe(TOPIC_NAME); } catch (Exception e) { e.printStackTrace(); } } }MqttClientCase.java
3、異步方式收發訊息
3.1、發送訊息
public static void publish() { try { MqttAsyncClient mqttClient = new MqttAsyncClient(SERVER_URI, "client-publish", new MemoryPersistence()); MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); /*會話清除標識 * false:服務端必須使用與 Client ID 關聯的會話來恢復與客戶端的通信,如果不存在這樣的會話,服務器必須創建一個新會話,客戶端和服務器在斷開連接后必須存盤會話的狀態, * true:客戶端和服務器必須丟棄任何先前的會話并創建一個新的會話,該會話的生命周期將和網路連接保持一致,其會話狀態一定不能被之后的任何會話重用, */ mqttConnectOptions.setCleanSession(true); mqttConnectOptions.setUserName("admin"); mqttConnectOptions.setPassword("123456".toCharArray()); mqttConnectOptions.setKeepAliveInterval(10); mqttClient.setCallback(new MqttCallback() { @Override public void connectionLost(Throwable cause) { logger.info("連接斷開:{}", cause.getMessage()); } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { } @Override public void deliveryComplete(IMqttDeliveryToken token) { logger.info("發送完成:{}", token.isComplete()); } }); IMqttToken iMqttToken = mqttClient.connect(mqttConnectOptions); iMqttToken.waitForCompletion(); for (int i = 0; i < 10; i++) { MqttMessage mqttMessage = new MqttMessage(("測驗訊息" + (i + 1)).getBytes()); mqttMessage.setQos(2); //是否保留訊息,只能保留最新的一份 mqttMessage.setRetained(false); IMqttDeliveryToken iMqttDeliveryToken = mqttClient.publish(TOPIC_NAME, mqttMessage); iMqttDeliveryToken.waitForCompletion(); } mqttClient.disconnect().waitForCompletion(); mqttClient.close(); } catch (Exception e) { e.printStackTrace(); } }
3.2、接受訊息
public static void subscribe(String clientId) { try { MqttAsyncClient mqttClient = new MqttAsyncClient(SERVER_URI, clientId, new MemoryPersistence()); MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); //設為 false, 該訂閱將被視為持久訂閱 mqttConnectOptions.setCleanSession(true); mqttConnectOptions.setUserName("admin"); mqttConnectOptions.setPassword("123456".toCharArray()); mqttClient.setCallback(new MqttCallback() { @Override public void connectionLost(Throwable cause) { logger.info("連接斷開:{}", cause.getMessage()); } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { logger.info("接受到訊息:clientId={},topic={},message={},isRetained={}", clientId, topic, message.toString(), message.isRetained()); } @Override public void deliveryComplete(IMqttDeliveryToken token) { } }); IMqttToken iMqttToken = mqttClient.connect(mqttConnectOptions); iMqttToken.waitForCompletion(); mqttClient.subscribe(TOPIC_NAME, 2).waitForCompletion(); } catch (Exception e) { e.printStackTrace(); } }
3.2、完整例子
package com.abc.demo.emqx; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class MqttAsyncClientCase { private static Logger logger = LoggerFactory.getLogger(MqttAsyncClientCase.class.getName()); private static final String SERVER_URI = "tcp://10.49.196.10:1883"; private static final String TOPIC_NAME = "test-topic"; public static void main(String[] args) throws Exception { new Thread(() -> subscribe("client-subscribe-A")).start(); new Thread(() -> subscribe("client-subscribe-B")).start(); Thread.sleep(1000); new Thread(() -> publish()).start(); Thread.sleep(1000 * 60); } public static void publish() { try { MqttAsyncClient mqttClient = new MqttAsyncClient(SERVER_URI, "test-client-publish", new MemoryPersistence()); MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); /*會話清除標識 * false:服務端必須使用與 Client ID 關聯的會話來恢復與客戶端的通信,如果不存在這樣的會話,服務器必須創建一個新會話,客戶端和服務器在斷開連接后必須存盤會話的狀態, * true:客戶端和服務器必須丟棄任何先前的會話并創建一個新的會話,該會話的生命周期將和網路連接保持一致,其會話狀態一定不能被之后的任何會話重用, */ mqttConnectOptions.setCleanSession(true); mqttConnectOptions.setUserName("admin"); mqttConnectOptions.setPassword("123456".toCharArray()); mqttConnectOptions.setKeepAliveInterval(10); mqttClient.setCallback(new MqttCallback() { @Override public void connectionLost(Throwable cause) { logger.info("連接斷開:{}", cause.getMessage()); } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { } @Override public void deliveryComplete(IMqttDeliveryToken token) { logger.info("發送完成:{}", token.isComplete()); } }); IMqttToken iMqttToken = mqttClient.connect(mqttConnectOptions); iMqttToken.waitForCompletion(); for (int i = 0; i < 10; i++) { MqttMessage mqttMessage = new MqttMessage(("測驗訊息" + (i + 1)).getBytes()); mqttMessage.setQos(2); //是否保留訊息,只能保留最新的一份 mqttMessage.setRetained(false); IMqttDeliveryToken iMqttDeliveryToken = mqttClient.publish(TOPIC_NAME, mqttMessage); iMqttDeliveryToken.waitForCompletion(); } mqttClient.disconnect().waitForCompletion(); mqttClient.close(); } catch (Exception e) { e.printStackTrace(); } } public static void subscribe(String clientId) { try { MqttAsyncClient mqttClient = new MqttAsyncClient(SERVER_URI, clientId, new MemoryPersistence()); MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); //設為 false, 該訂閱將被視為持久訂閱 mqttConnectOptions.setCleanSession(true); mqttConnectOptions.setUserName("admin"); mqttConnectOptions.setPassword("123456".toCharArray()); mqttClient.setCallback(new MqttCallback() { @Override public void connectionLost(Throwable cause) { logger.info("連接斷開:{}", cause.getMessage()); } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { logger.info("接受到訊息:clientId={},topic={},message={},isRetained={}", clientId, topic, message.toString(), message.isRetained()); } @Override public void deliveryComplete(IMqttDeliveryToken token) { } }); IMqttToken iMqttToken = mqttClient.connect(mqttConnectOptions); iMqttToken.waitForCompletion(); mqttClient.subscribe(TOPIC_NAME, 2).waitForCompletion(); } catch (Exception e) { e.printStackTrace(); } } }MqttAsyncClientCase.java
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/492419.html
標籤:Java
