有時候我們的專案中會用到即時通訊功能,比如電商系統中的客服聊天功能,還有在支付程序中,當用戶支付成功后,第三方支付服務會回呼我們的回呼介面,此時我們需要通知前端支付成功,最近發現RabbitMQ可以很方便的實作即時通訊功能,如果你沒有特殊的業務需求,甚至可以不寫后端代碼,今天給大家講講如何使用RabbitMQ來實作即時通訊!
MQTT協議
MQTT(Message Queuing Telemetry Transport,訊息佇列遙測傳輸協議),是一種基于發布/訂閱(publish/subscribe)模式的輕量級通訊協議,該協議構建于TCP/IP協議上,MQTT最大優點在于,可以以極少的代碼和有限的帶寬,為連接遠程設備提供實時可靠的訊息服務,

MQTT相關概念
- Publisher(發布者):訊息的發出者,負責發送訊息,
- Subscriber(訂閱者):訊息的訂閱者,負責接收并處理訊息,
- Broker(代理):訊息代理,位于訊息發布者和訂閱者之間,各類支持MQTT協議的訊息中間件都可以充當,
- Topic(主題):可以理解為訊息佇列中的路由,訂閱者訂閱了主題之后,就可以收到發送到該主題的訊息,
- Payload(負載);可以理解為發送訊息的內容,
- QoS(訊息質量):全稱Quality of Service,即訊息的發送質量,主要有QoS 0、QoS 1、QoS 2三個等級,下面分別介紹下: QoS 0(Almost Once):至多一次,只發送一次,會發生訊息丟失或重復; QoS 1(Atleast Once):至少一次,確保訊息到達,但訊息重復可能會發生; QoS 2(Exactly Once):只有一次,確保訊息只到達一次,
RabbitMQ啟用MQTT功能
RabbitMQ啟用MQTT功能,需要先安裝然RabbitMQ然后再啟用MQTT插件,
- 首先我們需要安裝并啟動RabbitMQ,對RabbitMQ不了解的朋友可以參考《花了3天總結的RabbitMQ實用技巧,有點東西!》;
- 接下來就是啟用RabbitMQ的MQTT插件了,默認是不啟用的,使用如下命令開啟即可;
rabbitmq-plugins enable rabbitmq_mqtt
- 開啟成功后,查看管理控制臺,我們可以發現MQTT服務運行在1883埠上了,

MQTT客戶端
我們可以使用MQTT客戶端來測驗MQTT的即時通訊功能,這里使用的是MQTTBox這個客戶端工具,
- 首先下載并安裝好MQTTBox,下載地址:http://workswithweb.com/mqttbox.html

- 點擊Create MQTT Client按鈕來創建一個MQTT客戶端;

- 接下來對MQTT客戶端進行配置,主要是配置好協議埠、連接用戶名密碼和QoS即可;

- 再配置一個訂閱者,訂閱者訂閱testTopicA這個主題,我們會向這個主題發送訊息;

- 發布者向主題中發布訊息,訂閱者可以實時接收到,

前端直接實作即時通訊
既然MQTTBox客戶端可以直接通過RabbitMQ實作即時通訊,那我們是不是直接使用前端技術也可以實作即時通訊?答案是肯定的!下面我們將通過html+javascript實作一個簡單的聊天功能,真正不寫一行后端代碼實作即時通訊!
- 由于RabbitMQ與Web端互動底層使用的是WebSocket,所以我們需要開啟RabbitMQ的MQTT WEB支持,使用如下命令開啟即可;
rabbitmq-plugins enable rabbitmq_web_mqtt
- 開啟成功后,查看管理控制臺,我們可以發現MQTT的WEB服務運行在15675埠上了;

- WEB端與MQTT服務進行通訊需要使用一個叫MQTT.js的庫,專案地址:https://github.com/mqttjs/MQTT.js

- 實作的功能非常簡單,一個單聊功能,需要注意的是配置好MQTT服務的訪問地址為:ws://localhost:15675/ws
Title
目標Topic:發送訊息:發送 清空
``
- 接下來我們訂閱不同的主題開啟兩個頁面測驗下功能(頁面放在了SpringBoot應用的resource目錄下了,需要先啟動應用再訪問): 第一個訂閱主題testTopicA,訪問地址:http://localhost:8088/page/index?topic=testTopicA 第二個訂閱主題testTopicB,訪問地址:http://localhost:8088/page/index?topic=testTopicB
- 之后互相發送訊息,讓我們來看看效果吧!

在SpringBoot中使用
沒有特殊業務需求的時候,前端可以直接和RabbitMQ對接實作即時通訊,但是有時候我們需要通過服務端去通知前端,此時就需要在應用中集成MQTT了,接下來我們來講講如何在SpringBoot應用中使用MQTT,
- 首先我們需要在pom.xml中添加MQTT相關依賴;
org.springframework.integration spring-integration-mqtt
- 在application.yml中添加MQTT相關配置,主要是訪問地址、用戶名密碼、默認主題資訊;
rabbitmq: mqtt: url: tcp://localhost:1883 username: guest password: guest defaultTopic: testTopic
- 撰寫一個Java配置類從組態檔中讀取配置便于使用;
/** * MQTT相關配置 * Created by macro on 2020/9/15. */@Data@EqualsAndHashCode(callSuper = false)@Component@ConfigurationProperties(prefix = "rabbitmq.mqtt")public class MqttConfig { /** * RabbitMQ連接用戶名 */ private String username; /** * RabbitMQ連接密碼 */ private String password; /** * RabbitMQ的MQTT默認topic */ private String defaultTopic; /** * RabbitMQ的MQTT連接地址 */ private String url;}
- 添加MQTT訊息訂閱者相關配置,使用@ServiceActivator注解宣告一個服務激活器,通過MessageHandler來處理訂閱訊息;
/** * MQTT訊息訂閱者相關配置 * Created by macro on 2020/9/15. */@Slf4j@Configurationpublic class MqttInboundConfig { @Autowired private MqttConfig mqttConfig; @Bean public MessageChannel mqttInputChannel() { return new DirectChannel(); } @Bean public MessageProducer inbound() { MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getUrl(), "subscriberClient", mqttConfig.getDefaultTopic()); adapter.setCompletionTimeout(5000); adapter.setConverter(new DefaultPahoMessageConverter()); //設定訊息質量:0->至多一次;1->至少一次;2->只有一次 adapter.setQos(1); adapter.setOutputChannel(mqttInputChannel()); return adapter; } @Bean @ServiceActivator(inputChannel = "mqttInputChannel") public MessageHandler handler() { return new MessageHandler() { @Override public void handleMessage(Message> message) throws MessagingException { //處理訂閱訊息 log.info("handleMessage : {}",message.getPayload()); } }; }}
- 添加MQTT訊息發布者相關配置;
/** * MQTT訊息發布者相關配置 * Created by macro on 2020/9/15. */@Configurationpublic class MqttOutboundConfig { @Autowired private MqttConfig mqttConfig; @Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); MqttConnectOptions options = new MqttConnectOptions(); options.setServerURIs(new String[] { mqttConfig.getUrl()}); options.setUserName(mqttConfig.getUsername()); options.setPassword(mqttConfig.getPassword().toCharArray()); factory.setConnectionOptions(options); return factory; } @Bean @ServiceActivator(inputChannel = "mqttOutboundChannel") public MessageHandler mqttOutbound() { MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler("publisherClient", mqttClientFactory()); messageHandler.setAsync(true); messageHandler.setDefaultTopic(mqttConfig.getDefaultTopic()); return messageHandler; } @Bean public MessageChannel mqttOutboundChannel() { return new DirectChannel(); }}
- 添加MQTT網關,用于向主題中發送訊息;
/** * MQTT網關,通過介面將資料傳遞到集成流 * Created by macro on 2020/9/15. */@Component@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")public interface MqttGateway { /** * 發送訊息到默認topic */ void sendToMqtt(String payload); /** * 發送訊息到指定topic */ void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic); /** * 發送訊息到指定topic并設定QOS */ void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);}
- 添加MQTT測驗介面,使用MQTT網關向特定主題中發送訊息;
/** * MQTT測驗介面 * Created by macro on 2020/9/15. */@Api(tags = "MqttController", description = "MQTT測驗介面")@RestController@RequestMapping("/mqtt")public class MqttController { @Autowired private MqttGateway mqttGateway; @PostMapping("/sendToDefaultTopic") @ApiOperation("向默認主題發送訊息") public CommonResult sendToDefaultTopic(String payload) { mqttGateway.sendToMqtt(payload); return CommonResult.success(null); } @PostMapping("/sendToTopic") @ApiOperation("向指定主題發送訊息") public CommonResult sendToTopic(String payload, String topic) { mqttGateway.sendToMqtt(payload, topic); return CommonResult.success(null); }}
- 呼叫介面向主題中發送訊息進行測驗;

- 后臺成功接收到訊息并進行列印,
2020-09-17 14:29:01.689 INFO 11192 --- [ubscriberClient] c.m.mall.tiny.config.MqttInboundConfig : handleMessage : 來自網頁上的訊息2020-09-17 14:29:06.101 INFO 11192 --- [ubscriberClient] c.m.mall.tiny.config.MqttInboundConfig : handleMessage : 來自網頁上的訊息2020-09-17 14:29:07.384 INFO 11192 --- [ubscriberClient] c.m.mall.tiny.config.MqttInboundConfig : handleMessage : 來自網頁上的訊息
總結
訊息中間件應用越來越廣泛,不僅可以實作可靠的異步通信,還可以實作即時通訊,掌握一個訊息中間件還是很有必要的,如果沒有特殊業務需求,客戶端或者前端直接使用MQTT對接訊息中間件即可實作即時通訊,有特殊需求的時候也可以使用SpringBoot集成MQTT的方式來實作,總之訊息中間件是實作即時通訊的一個好選擇!
關注公眾號:java寶典
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/265557.html
標籤:其他
上一篇:Python Tkinter 視窗的管理與設定(五):三種標準對話框模塊
下一篇:例外處理(面試題)

