又到了
顯擺分享技術的時候了沒有看過上一篇文章的可以先看一下,這篇是在上一篇基礎上接著添加功能
SpringBoot2整合mqtt服務器EMQ實作訊息訂閱發布入庫(一)
這篇文章的流程為:
1.Springboot將訂閱的資料入庫
2.開發實時訂閱/發布展示頁面
這篇文章是整合mqtt服務器的訊息訂閱發布的最后一篇,在掌握這兩篇文章后可以搭建自己的mqtt業務處理服務器,強調一下,做這些的目的就是為了搭建自己的物聯網服務器,自己開發資料接入邏輯,自己開發資料處理邏輯,自己開發頁面展示,而不是直接依賴無腦的傻瓜式阿里云的物聯網整合方案,這篇文章就是為了自己搭建出相同功能的專案, 不要再問我能不能接入阿里云的資料處理展示服務了,如果沒看懂我寫的文章,我很抱歉,右上角有個叉,歡迎點擊
接著上一篇文章繼續寫一下訂閱的資料如何入庫
目錄
一.Springboot將訂閱的資料入庫
1.使用ApplicationContextAware
2.重寫PushCallback類
二.開發實時訂閱/發布展示頁面
1.頁面最終展示效果
2.實作 訂閱主題 列出已訂閱主題 取消訂閱 資料發布 功能
3.實時訂閱展示 使用WebSocket實作
三.總結
后記:
預告:
一.Springboot將訂閱的資料入庫
由于直接使用注解注入方法會拋出例外報空,我的理解就是因為spring的容器加載順序的原因,用于訂閱的PushCallback類實作的MqttCallback介面包括具體方法已經注入到spring的容器中,而@Autowired注解的入庫方法是后注入容器的結果導致實作MqttCallback介面的方法時讀取不到才拋出空指標
1.使用ApplicationContextAware
使用該類實體化后就可以手動獲取Bean的注入物件,首先創建一個類來實作這個介面,具體代碼如下
后臺SpringUtil 類 用于手動注入其他類,用于解決使用@Autowired注入報空指標問題
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
@Component
public class SpringUtil implements ApplicationContextAware {
private static ApplicationContext applicationContext = null;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
if(SpringUtil.applicationContext == null){
SpringUtil.applicationContext = applicationContext;
}
}
public static ApplicationContext getApplicationContext() {
return applicationContext;
}
public static Object getBean(String name){
return getApplicationContext().getBean(name);
}
public static <T> T getBean(Class<T> clazz){
return getApplicationContext().getBean(clazz);
}
public static <T> T getBean(String name,Class<T> clazz){
return getApplicationContext().getBean(name, clazz);
}
}
這樣我們就通過實體化呼叫getBean()方法就可以在接收到訂閱資料后進行入庫邏輯
XXXServiceImpl XXX = SpringUtil.getBean(XXXServiceImpl.class);
需要修改PushCallback類,就可以呼叫無法注入的方法
2.重寫PushCallback類
由于解決了將入庫方法注入到容器中的問題,接下來就需要改寫PushCallback類的messageArrived方法
修改后的代碼如下
后臺PushCallback類
@Slf4j
@Component
public class PushCallback implements MqttCallback {
private MqttPushClient client;
private MqttConfiguration mqttConfiguration;
public PushCallback(MqttPushClient client ,MqttConfiguration mqttConfiguration) {
this.client = client;
this.mqttConfiguration = mqttConfiguration;
}
@Override
public void connectionLost(Throwable cause) {
/** 連接丟失后,一般在這里面進行重連 **/
if(client != null) {
while (true) {
try {
log.info("==============》》》[MQTT] 連接斷開,5S之后嘗試重連...");
Thread.sleep(5000);
MqttPushClient mqttPushClient = new MqttPushClient();
mqttPushClient.connect(mqttConfiguration);
if(MqttPushClient.getClient().isConnected()){
log.info("=============>>重連成功");
}
break;
} catch (Exception e) {
log.error("=============>>>[MQTT] 連接斷開,重連失敗!<<=============");
continue;
}
}
}
log.info(cause.getMessage());
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
//publish后會執行到這里
log.info("publish后會執行到這里");
log.info("pushComplete==============>>>" + token.isComplete());
}
/**
* 監聽對應的主題訊息
* @param topic
* @param message
* @throws Exception
*/
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
// subscribe后得到的訊息會執行到這里面
String Payload = new String(message.getPayload());
log.info("============》》接收訊息主題 : " + topic);
log.info("============》》接收訊息Qos : " + message.getQos());
log.info("============》》接收訊息內容 : " + Payload);
log.info("============》》接收ID : " + message.getId());
log.info("接收資料結束 下面可以執行資料處理操作");
//將json轉map,方便讀取資料
JSONObject json = JSONObject.parseObject(Payload);
Map<String,Object> MapJson = json.getInnerMap();
//實體化入庫方法 這里就用到SpringUtil類 來手動的注入
WeatherServiceImpl mqttDataService = SpringUtil.getBean(WeatherServiceImpl.class);
//呼叫入庫方法
mqttDataService.WeatherStorage(MapJson);
//與頁面實時通信使用,下面會講
WebSocketServiceImpl socketService = SpringUtil.getBean(WebSocketServiceImpl.class);
socketService.SendRealTimeData(topic , json.toString() );
}
}
其實就是在引入SpringUtil 類后直接呼叫入庫方法即可
接下來的入庫方法想必大家都會撰寫,這里就不在贅述了
到目前為止已經可以滿足一個中小型的物聯網服務器資料互動使用,資料入庫等操作已經可以滿足,不要在跟我講有沒有想過接入阿里云的物聯網了,你都阿里云的物聯網還看我文章干什么
二.開發實時訂閱/發布展示頁面
到現在為止一個擁有完整的mqtt資料發布訂閱入庫功能的物聯網服務器已經開發的差不多了,目前已經可以實作物聯網服務器的基礎功能,可能有的人會認為實作的功能太過簡單,對沒錯是很簡單,我說的是實作的技術很簡單,一個簡單的技識訓需要依賴別人給你提供解決方案嗎,作為開發人員當有了必要的基礎功能時,就可以根據自己的需求構思開發,開發自己的程式
那么接下來就是開發一個方便除錯的頁面將mqtt的發布訂閱功能整合到頁面中
功能清單:
1.訂閱主題
2.列出已訂閱主題
3.取消訂閱
4.資料發布
5.實時訂閱展示
1.頁面最終展示效果

2.實作 訂閱主題 列出已訂閱主題 取消訂閱 資料發布 功能
前四點都是上一篇文章里寫好了呼叫方法的,這里直接寫Controller方法,來呼叫這些方法
有一點需要注意,也是我當時偷懶和學習不足導致的,原本我的訂閱與取消訂閱用的是get方法接收通過url傳遞的引數,結果沒想到mqtt的主題不單單是一個名稱那么簡單
mqtt有分隔符與通配符
主題層級分隔符 "/"
單層通配符 "+"
多層通配符 "#"
常見的主題寫法為 : test1
但也有部分使用這些分隔符通配符就會出現 如 test1/names
結果導致get的url傳參收到影響,所以已經將方法改為post+json方式傳遞主題
后臺MqttController類
import com.alibaba.fastjson.JSONObject;
import com.zdr.ahairteeter.mod_MQTT.MQTTTOOL.MqttPushClient;
import com.zdr.ahairteeter.mod_MQTT.MQTTTOOL.MqttSender;
import com.zdr.ahairteeter.mod_Tools.Tool;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.ModelAndView;
import javax.servlet.http.HttpSession;
import java.util.Map;
/**
* ZDR
* 說明:
* 作用:
* 目的:
*
* @DATE: 2021/3/3
* @TIME: 12:26
*/
@Controller
@Slf4j
@ResponseBody
@RequestMapping("/mqtt")
public class MqttController {
//發送邏輯
@Autowired
private MqttSender mqttSender;
//訂閱邏輯
@Autowired
private MqttPushClient mqttPushClient;
/**
* 向指定主題發布資料
* @param typelist
* @param session
* @return
*/
@PostMapping("/sendmqtt")
public String sendmqttTexts(@RequestBody Map<String,String> typelist , HttpSession session){
if (whatJson(typelist.get("json"))){
log.info(" 本機主題:"+typelist.get("topic")+" 發送資料為:"+JSONObject.toJSONString(typelist.get("json")));
mqttSender.send(typelist.get("topic"), typelist.get("json"));
} else {
log.info("發送的資料非JSON格式");
return "非JSON格式發送失敗";
}
return "發送結束";
}
/**
* 訂閱主題
* @param
* @param session
* @return
*/
@RequestMapping("/getsubscribetopic")
public Object getsubscribetopic( @RequestBody Map<String,String> typelist , HttpSession session ){
int Qos=1;
String key = "";
if(typelist!=null && typelist.size() > 0){
key = typelist.get("topic");
log.info("訂閱主題為:"+key);
String[] topics={key};
int[] qos={Qos};
mqttPushClient.subscribe(topics,qos);
return key;
}else{
return "主題不能為空";
}
}
/**
* 取消訂閱
* @param
* @param session
* @return
*/
@RequestMapping("/getcallsubscribe")
public Object getcallsubscribe( @RequestBody Map<String,String> typelist , HttpSession session ){
String key = "";
if(typelist!=null && typelist.size() > 0){
key = typelist.get("topic");
mqttPushClient.cleanTopic(key);
}
return "";
}
/**
* 因為只是一個簡單demo就把方法放到這里了,不分包了
* 驗證字串是否嚴格按照JSON格式撰寫
* @param StrJson 待驗證的JSON串
* @return
*/
public boolean whatJson(String StrJson) {
if(StringUtils.isEmpty(StrJson)){
return false;
}
boolean isJsonObject = true;
boolean isJsonArray = true;
try {
com.alibaba.fastjson.JSONObject.parseObject(StrJson);
} catch (Exception e) {
isJsonObject = false;
}
try {
com.alibaba.fastjson.JSONObject.parseArray(StrJson);
} catch (Exception e) {
isJsonArray = false;
}
if(!isJsonObject && !isJsonArray){ //不是json格式
return false;
}
return true;
}
}
3.實時訂閱展示 使用WebSocket實作
但想要做到在前端頁面實時展示訂閱的資料就不太好實作
簡單的辦法是前端頁面定時查詢訂閱到的資料并展示在前端,能做到頁面加載資料,但做不到實時展示
有些資料的發送間隔會很短,就會出現定時的時間比發送間隔要長,很不推薦
這里使用WebSocket方法實作資料的實時加載
首先在pom.xml里添加依賴
組態檔pom.xml
<!--webSocket實時通信 socket依賴-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
開發實時通信的程式
后臺WebSocketServiceImpl
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Service;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
/**
* ZDR
* 說明:即時通訊實作方法
* 作用:
* 目的:
*
* @DATE: 2021/3/12
* @TIME: 22:44
*/
@Service
public class WebSocketServiceImpl {
@Autowired
private SimpMessagingTemplate messagingTemplate;
/**
* 與指定地址創建連接 , 當有需要實時加載到頁面的資料時呼叫該方法就會向指定地址發送資料
* /topic/mqttreal 要與前端定義的地址一致 , 就會接收到資料
*/
public void SendRealTimeData(String topic , String texts){
SimpleDateFormat DT7 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSSSS");
String DateTime = "";
Date date = new Date();
DateTime = DT7.format(date);
messagingTemplate.convertAndSend("/topic/mqttreal", DateTime+" 主題 :"+topic+" 引數 : "+texts );
}
}
后臺WebSocketConfig類
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
/**
* ZDR
* 說明:websocket配置類
* 作用:
* 目的:
*
* @DATE: 2021/3/11
* @TIME: 15:31
*/
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
// 設定socket連接
@Override
public void registerStompEndpoints(StompEndpointRegistry stompEndpointRegistry) {
stompEndpointRegistry.addEndpoint("/simple").withSockJS();
//.setAllowedOrigins("*") //解決跨域問題
}
// 設定發布訂閱的主題
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableSimpleBroker("/topic", "/top");
}
}
后臺的代碼就是這些,一切準別就緒就可以寫前端的頁面了,由于本人前端很垃圾,只用了簡單的html css與js,不會高大上的vue
HTML代碼
注意的是匯入css與js的地方要寫自己放檔案的地方,不要直接復制啟動一看沒有加載來問我為什么,我只知道我的沒有問題
html注意這里從外部引入了兩個js,是前端的WebSocket需要引入的
前端HTML檔案
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
<script src="http://cdn.bootcss.com/sockjs-client/1.1.1/sockjs.min.js"></script>
<script src="http://cdn.bootcss.com/stomp.js/2.3.3/stomp.js"></script>
<script th:src="@{/static/js/jquery-3.4.1.min.js}"></script>
<script th:src="@{/static/mod_MQTT/js/browsesubscribe.js}"></script>
<link rel="stylesheet" th:href="@{/static/mod_MQTT/css/browsesubscribe.css}"/>
</head>
<body>
<h2>MQTT訂閱瀏覽與取消訂閱</h2>
<div id='left_side_div'>
<input type="text" id="topicname" />
<button id="subscribebutt" onclick="subscribe();">訂閱主題</button>
<p id="subscribelist"></p>
<input type="text" id="calltopicname" />
<button id="callsubscribebutt" onclick="callsubscribe();">取消訂閱</button>
<br>
<br>
<span>主題</span><input type="text" id="topic" /><br>
<textarea rows="15" cols="40" type="text" id="testjson">編輯JSON資料</textarea><br>
<button id="sendmqtt" onclick="sendmqtt();">模擬mqtt協議發送資料</button>
</div>
<div id='right_side_div'>
<button id="eliminate" onclick="eliminate();">清屏</button>
<p id="callback"></p>
</div>
</body>
</html>
前端JS檔案
var stompClient = null;
$(function(){
connect();
$("#subscribelist").html("");
});
function eliminate(message) {
$("#callback").html("");
}
//實時通信 這里的/topic/mqttreal 就是后臺定義的實時通信地址
function connect() {
var socket = new SockJS("/simple");
stompClient = Stomp.over(socket);
stompClient.connect({}, function(frame) {
console.log('Connected: ' + frame);
stompClient.subscribe('/topic/mqttreal', function(frame){
showcallback(frame.body);
});
});
}
function showcallback(message) {
$("#callback").append("<tr><td>" + message + "</td></tr>");
}
function sendmqtt() {
var topic = $("#topic").val();
var testjson = $("#testjson").val();
var json = {
'topic':topic,
'json':testjson
}
$.ajax({
type:"POST",
url:"/mqtt/sendmqtt/",
contentType:"application/json", //發送資訊至服務器時內容編碼型別,
dataType:"json", // 預期服務器回傳的資料型別,如果不指定,jQuery 將自動根據 HTTP 包 MIME 資訊來智能判斷,比如XML MIME型別就被識別為XML,
data:JSON.stringify(json),
success:function(retdata){
}
});
}
function callsubscribe() {
$("#subscribelist").html("");
var topic = $("#calltopicname").val();
var json = {
'topic':topic
}
$.ajax({
type:"POST",
url:"/mqtt/getcallsubscribe",
contentType:"application/json", //發送資訊至服務器時內容編碼型別,
dataType:"json", // 預期服務器回傳的資料型別,如果不指定,jQuery 將自動根據 HTTP 包 MIME 資訊來智能判斷,比如XML MIME型別就被識別為XML,
data:JSON.stringify(json),
error: function(data){
appsubscribelist(data.responseText);
}
});
}
function subscribe() {
var topic = $("#topicname").val();
var json = {
'topic':topic
}
$.ajax({
type:"POST",
url:"/mqtt/getsubscribetopic",
contentType:"application/json", //發送資訊至服務器時內容編碼型別,
dataType:"json", // 預期服務器回傳的資料型別,如果不指定,jQuery 將自動根據 HTTP 包 MIME 資訊來智能判斷,比如XML MIME型別就被識別為XML,
data:JSON.stringify(json),
error: function(data){
appsubscribelist(data.responseText);
}
});
}
function appsubscribelist(message) {
$("#subscribelist").append("<div>" + message + "</div>");
}
css
#testjson{
font-size: 20px;
}
#left_side_div{
float:left;
display: inline-block;
width: 30%;
height: 500px;
}
#right_side_div{
float:left;
display: inline-block;
width: 50%;
height: 500px;
}
前端就是這樣了,所有的準備功能做已經做好了,可以啟動專案測驗一下,首先要將搭建的開源mqtt服務器啟動,其次啟動Springboot專案,在瀏覽器輸入專案地址以及頁面的路由地址
最后頁面就像

可以看右側實時訂閱展示,因為使用的是實時通信的方式接受服務器資料,時間都在毫秒級別,如果使用定時重繪肯定不能及時反饋
三.總結
mqtt物聯網服務器的搭建已經到這里就結束了,只要滿足資料入庫,接下來的功能就可以自己撰寫了,測驗的頁面也有了方便測驗物聯網設備除錯資料發送
我也休息休息,短時間內不會有新的單片機的開發想法,主要是沒什么好玩的點子,如果有有趣的點子可以分享一下,好玩的話我也會做一做玩玩
后記:
寫不下去了,好累啊,大好的周末時間竟然還在值班,我去,受不了了
給你看們看看我寫這段話的時間
跟人事提提,點給他,問問單位發不發物件,不發我就不干了跳槽,****的****的我要跳槽,真受不了**********,啊啊啊啊啊*******
************
就是這樣,文章寫得不錯,原始碼什么的如果有需要就先和我聊聊天,聊好了我會給原碼的,就不上傳浪費大家的積分了
還有想做伸手黨的注意了,以后再有上來就理直氣壯只要原碼不聊天的一律裝死不管
預告:
沒想到三月就寫差不多的文章拖到四月中旬才發布,有點懶了,下一篇文章我要顯擺一下我的爬蟲技術了
預告 : 使用爬蟲做到需要登錄的網站每日自動簽到
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/276683.html
標籤:其他
