基于MQTT協議實作微信小程式控制樹莓派
在我的github上有原始碼,大家可以直接下載來用
https://github.com/yjc-123/-MQTT- ,這里給大家說一下實作的程序,
小程式端:
需要將我的github檔案引入這三個檔案

這是github上的原始碼,具體網站https://github.com/mqttjs/MQTT.js,我們需要講上述檔案引入才可已使用mqtt,
- 原始碼
你只需要修改opt的資訊就行了,test是你在EMQ創建的用戶和密碼,另外就是你服務器的域名,這里需要ssl,所以你在這個程序中需要講域名安裝證書,
import mqtt from '../../utils/mqtt.js';
//連接的服務器域名,注意格式!!!
const host = 'wxs://www.iot-yjc.cn/mqtt';
Page({
data: {
client: null,
//記錄重連的次數
reconnectCounts: 0,
//MQTT連接的配置
options: {
protocolVersion: 4, //MQTT連接協議版本c
clientId: 'wx_' + parseInt(Math.random() * 100 + 800, 10),
clean: false,
password: 'test',
username: 'test',
reconnectPeriod: 1000, //1000毫秒,兩次重新連接之間的間隔
connectTimeout: 30 * 1000, //1000毫秒,兩次重新連接之間的間隔
resubscribe: true //如果連接斷開并重新連接,則會再次自動訂閱已訂閱的主題(默認true)
}
},
onClick_connect: function() {
var that = this;
//開始連接
this.data.client = mqtt.connect(host, this.data.options);
this.data.client.on('connect', function(connack) {
wx.showToast({
title: '連接成功'
})
})
//服務器下發訊息的回呼
that.data.client.on("message", function(topic, payload) {
console.log(" 收到 topic:" + topic + " , payload :" + payload)
wx.showModal({
content: " 收到topic:[" + topic + "], payload :[" + payload + "]",
showCancel: false,
});
})
//服務器連接例外的回呼
that.data.client.on("error", function(error) {
console.log(" 服務器 error 的回呼" + error)
})
//服務器重連連接例外的回呼
that.data.client.on("reconnect", function() {
console.log(" 服務器 reconnect的回呼")
})
//服務器連接例外的回呼
that.data.client.on("offline", function(errr) {
console.log(" 服務器offline的回呼")
})
},
onClick_SubOne: function() {
if (this.data.client && this.data.client.connected) {
//僅訂閱單個主題
this.data.client.subscribe('Topic0', function(err, granted) {
if (!err) {
wx.showToast({
title: '訂閱主題成功'
})
} else {
wx.showToast({
title: '訂閱主題失敗',
icon: 'fail',
duration: 2000
})
}
})
} else {
wx.showToast({
title: '請先連接服務器',
icon: 'none',
duration: 2000
})
}
},
onClick_SubMany: function() {
if (this.data.client && this.data.client.connected) {
//僅訂閱多個主題
this.data.client.subscribe({
'Topic1': {
qos: 0
},
'Topic2': {
qos: 1
}
}, function(err, granted) {
if (!err) {
wx.showToast({
title: '訂閱多主題成功'
})
} else {
wx.showToast({
title: '訂閱多主題失敗',
icon: 'fail',
duration: 2000
})
}
})
} else {
wx.showToast({
title: '請先連接服務器',
icon: 'none',
duration: 2000
})
}
},
//發布訊息
listenerSwitch: function(e) {
if (this.data.client && this.data.client.connected){
if( e.detail.value == true){
this.data.client.publish('test', '打開');
wx.showToast({
title: '發布成功'
})
}
else if(e.detail.value == false){
this.data.client.publish('test', '關閉');
wx.showToast({
title: '發布成功'
})
}
else{}
}
console.log('switch型別開關當前狀態-----', e.detail.value);},
onClick_PubMsg: function() {
if (this.data.client && this.data.client.connected) {
this.data.client.publish('test', '打開');
wx.showToast({
title: '發布成功'
})
} else {
wx.showToast({
title: '請先連接服務器',
icon: 'none',
duration: 2000
})
}
},
onClick_unSubOne: function() {
if (this.data.client && this.data.client.connected) {
this.data.client.unsubscribe('Topic1');
} else {
wx.showToast({
title: '請先連接服務器',
icon: 'none',
duration: 2000
})
}
},
onClick_unSubMany: function() {
if (this.data.client && this.data.client.connected) {
this.data.client.unsubscribe(['Topic1', 'Topic2']);
} else {
wx.showToast({
title: '請先連接服務器',
icon: 'none',
duration: 2000
})
}
},
onl oad: function() {
wx.setNavigationBarTitle({
title: '簡單服務器Mqtt連接'
})
}
})
- 域名安裝證書
1、首先在阿里云下載ssl證書,然后再服務器下載寶塔面板
2、在寶塔中下載nignx,然后添加站點

3、點擊設定–>ssl,輸入密鑰和證書,點擊保存,

4、添加nignx反向代理


location = /mqtt {
# 8083就是我們的emq的websocket的埠號
proxy_pass http://www.xxxxxxx.com:8083;
proxy_redirect off;
proxy_set_header Host www.xxxxxxx.com:8083;
proxy_set_header Sec-WebSocket-Protocol mqtt;
# 這個是與你的 js客戶端的庫有關系,本博文的不需要,為了兼顧以后小伙伴,我這里注釋了下!
#more_clear_headers Sec-WebSocket-Protocol;
# 這些都是 websocket必須要配置的
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
}
}
樹莓派:
這邊只需要實作訂閱跟發布就行了,
- 發布
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "MQTTClient.h"
#define ADDRESS "59.110.42.24:1883"
#define CLIENTID "0bd981c5-a055-4196-8b7f-efb9f7a4d6ac"
#define TOPIC "test"
#define PAYLOAD "Hello World!"
#define QOS 1
#define TIMEOUT 10000L
int main(int argc, char* argv[])
{
//宣告mqtt客戶端
MQTTClient client;
//初始化客戶端選項 conn_opts
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
//訊息初始化 pubmsg
MQTTClient_message pubmsg = MQTTClient_message_initializer;
MQTTClient_deliveryToken token;
int rc;
char *username = "test";
char *password = "test";
if ((rc = MQTTClient_create(&client, ADDRESS, CLIENTID,
MQTTCLIENT_PERSISTENCE_NONE, NULL)) != MQTTCLIENT_SUCCESS)
{
printf("Failed to create client, return code %d\n", rc);
exit(EXIT_FAILURE);
}
//保持心跳20
conn_opts.keepAliveInterval = 20;
//清理會話
conn_opts.cleansession = 1;
//客戶端的用戶名和密碼
conn_opts.username = username;
conn_opts.password = password;
//創建連接
if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS)
{
printf("Failed to connect, return code %d\n", rc);
exit(EXIT_FAILURE);
}
//訊息負載(內容)
pubmsg.payload = PAYLOAD;
//訊息長度
pubmsg.payloadlen = (int)strlen(PAYLOAD);
//訊息質量分為0:不重要的訊息比如溫度,可以多次上傳丟失一次沒事,1:可能會丟失1次 2:永遠不會丟失
pubmsg.qos = QOS;
//有true和false 判斷訊息是否保留
pubmsg.retained = 0;
//發布訊息 token是訊息發布后,傳遞令牌將回傳客戶端檢查令牌是否已成功傳遞到其目的地
if ((rc = MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token)) != MQTTCLIENT_SUCCESS)
{
printf("Failed to publish message, return code %d\n", rc);
exit(EXIT_FAILURE);
}
printf("Waiting for up to %d seconds for publication of %s\n"
"on topic %s for client with ClientID: %s\n",
(int)(TIMEOUT/1000), PAYLOAD, TOPIC, CLIENTID);
//阻塞函式,等待訊息發布
rc = MQTTClient_waitForCompletion(client, token, TIMEOUT);
printf("Message with delivery token %d delivered\n", token);
if ((rc = MQTTClient_disconnect(client, 10000)) != MQTTCLIENT_SUCCESS)
printf("Failed to disconnect, return code %d\n", rc);
//斷開連接
MQTTClient_destroy(&client);
return rc;
}
- 訂閱
#coding=utf-8
import paho.mqtt.client as mqtt
import time
import RPi.GPIO as GPIO
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
HOST = "59.110.42.24"
PORT = 1883
GPIO_PIN = 17
GPIO.setmode(GPIO.BCM)
GPIO.setup(GPIO_PIN, GPIO.OUT)
def client_loop():
client_id = "0bd981c5-a055-4196-8b7f-efb9f7a4d6ac"
client = mqtt.Client(client_id) # ClientId不能重復,所以使用當前時間
client.username_pw_set("test", "test") # 必須設定,否則會回傳「Connected with result code 4」
client.on_connect = on_connect
client.on_message = on_message
client.connect(HOST, PORT, 60)
client.loop_forever()
def on_connect(client, userdata, flags, rc):
client.subscribe("test")
def trun(a):
if(a == "打開"):
GPIO.output(GPIO_PIN,GPIO.HIGH)
else:
GPIO.output(GPIO_PIN,GPIO.LOW)
def on_message(client, userdata, msg):
print(msg.topic+" "+msg.payload.decode("utf-8"))
trun(msg.payload.decode("utf-8"))
if __name__ == '__main__':
global a
a = -1
client_loop()
- 發布
#!/usr/bin/env python
# encoding: utf-8
import json
import sys
import os
import paho.mqtt.client as mqtt
import time
sys.path.append(os.path.abspath(os.path.dirname(__file__) + '/' + '..'))
sys.path.append("..")
TASK_TOPIC = 'test' # 客戶端發布訊息主題
client_id = time.strftime('%Y%m%d%H%M%S',time.localtime(time.time()))
client = mqtt.Client(client_id, transport='tcp')
client.connect("59.110.42.24", 1883, 60) # 此處埠默認為1883,通信埠期keepalive默認60
client.loop_start()
def clicent_main(message: str):
time_now = time.strftime('%Y-%m-%d %H-%M-%S', time.localtime(time.time()))
payload = {"msg": "%s" % message, "data": "%s" % time_now}
# publish(主題:Topic; 訊息內容)
client.publish(TASK_TOPIC, json.dumps(payload, ensure_ascii=False))
client.publish(TASK_TOPIC,message)
print("Successful send message!")
return True
if __name__ == '__main__':
msg = "我是一條測驗資料!"
clicent_main(msg)
emq服務器
在我之前的博客中有emq服務器怎么安裝,這是最終效果,我小程式只要點擊打開就會發送打開,點擊關閉就會關閉

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/220962.html
標籤:其他
下一篇:C語言環境搭建
