python實作 MQTT訂閱、接收以及MySQL資料庫存盤
簡單物聯網應用——基于老人居家聲音監測系統

(1)資料接收程式使用 python 撰寫, 首先連接 MQTT 服務器, 訂閱硬體資料發送的主題
“esp/test” , 將資料轉換成 json 格式, 呼叫資料庫存盤函式 sqlsave(msgjson), MQTT 資料
接收程式如下:
# 連接 MQTT 服務器
def on_mqtt_connect():
mqttClient=mqtt.Client("pythontest")
mqttClient.connect(MQTTHOST, MQTTPORT,6)
mqttClient.loop_start()
def on_message_come(lient, userdata, msg):
get_data=msg.payload #bytes b'[s]
string=get_data.decode() #string
msgjson=json.loads(string)
print(msgjson)
sqlsave(msgjson)
#subscribe 訊息
def on_subscribe():
mqttClient.subscribe(subscribe, qos=0)
mqttClient.on_message = on_message_come # 訊息到來處理函式
(2)撰寫資料庫保存函式 sqlsave(jsonData), 連接資料庫, 并向 data_voice_sensor 表中插
入 MQTT 發送的資料:
def sqlsave(jsonData):
# 打開資料庫連接
db = pymysql.connect(host="192.168.174.128",
user="root",password="password",database="test",charset='utf8')
cursor = db.cursor() # 使用 cursor()方法獲取操作游標
# SQL 插入陳述句
sql = "INSERT INTO data_voice_sensor (get_time,sensorType,device,get_data,get_value)
VALUES ('%s','%s','%s','%s','%s');"\
%(jsonData['get_time'],jsonData['sensorType'],
jsonData['get_data'],jsonData['device'],jsonData['get_value'],)
cursor.execute(sql)
db.commit()
db.close()
上述代碼在我的電腦里面無緣無故不能訂閱MQTT主題了,目前沒找到原因,但是換一臺電腦是可以的
下面這個完整的有兩個,第二個是目前在我電腦上面可以跑起來的,請高手指教!
一、
import paho.mqtt.client as mqtt
import json
import pymysql
MQTTHOST = "192.168.43.188"
MQTTPORT = 1883
mqttClient = mqtt.Client()
subscribe ="esp/test"
#MySQL保存
def sqlsave(jsonData):
# 打開資料庫連接
db = pymysql.connect(host="192.168.174.128",user="root",password="password",database="test",charset='utf8')
# 使用cursor()方法獲取操作游標
cursor = db.cursor()
# SQL 插入陳述句
sql = "INSERT INTO data_voice_sensor (get_time,sensorType,device,get_data,get_value) \
VALUES ('%s','%s','%s','%s','%s');"\
%(jsonData['get_time'],jsonData['sensorType'],jsonData['get_data'],jsonData['device'],jsonData['get_value'],)
cursor.execute(sql)
db.commit()
print("資料庫保存成功!")
# 關閉資料庫連接
db.close()
# 連接MQTT服務器
def on_mqtt_connect():
mqttClient=mqtt.Client("pythontest")
mqttClient.connect(MQTTHOST, MQTTPORT,60)
mqttClient.loop_start()
def on_message_come(lient, userdata, msg):
get_data=msg.payload #bytes b'[s]
print(get_data)
string=get_data.decode() #string
print(string)
msgjson=json.loads(string)
print(msgjson)
sqlsave(msgjson)
# subscribe 訊息
def on_subscribe():
mqttClient.subscribe(subscribe, qos=0)
mqttClient.on_message = on_message_come # 訊息到來處理函式
def main():
on_mqtt_connect()
while True:
on_subscribe()
if __name__ == '__main__':
main()
二、
#!/usr/bin/python
# -*- coding: utf-8 -*
import paho.mqtt.client as mqtt
import json
import pymysql
import time
def gettime():
time1=time.strftime("%Y-%m-%d %H:%M:%S",time.localtime())
return time1
# 服務器地址
host = '192.168.43.188'
# 通信埠 默認埠1883
port = 1883
username = ''
password = ''
# 訂閱主題名
topic = 'esp/test'
# 連接后事件
def on_connect(client, userdata, flags, respons_code):
if respons_code == 0:
# 連接成功
print('Connection Succeed!')
else:
# 連接失敗并顯示錯誤代碼
print('Connect Error status {0}'.format(respons_code))
# 訂閱資訊
client.subscribe(topic)
# 接收到資料后事件
def on_message(client, userdata, msg):
# 列印訂閱訊息主題
# print("topic", msg.topic)
# 列印訊息資料
jsondata=json.loads(msg.payload)
print("msg payload", jsondata)
sqlsave(jsondata)
def main_demo():
client = mqtt.Client()
# 注冊事件
client.on_connect = on_connect
client.on_message = on_message
# 設定賬號密碼(如果需要的話)
client.username_pw_set(username, password=password)
# 連接到服務器
client.connect(host, port=port, keepalive=60)
# 守護連接狀態
client.loop_forever()
#MySQL保存
def sqlsave(jsonData):
# 打開資料庫連接
db = pymysql.connect(host="192.168.174.128",user="root",password="password",database="test",charset='utf8')
# 使用cursor()方法獲取操作游標
cursor = db.cursor()
# SQL 插入陳述句
sql = "INSERT INTO data_voice (get_time,sensorType,device,get_data,get_value) \
VALUES ('%s','%s','%s','%s','%s');"\
%(gettime(),jsonData['sensorType'],jsonData['get_data'],jsonData['device'],jsonData['get_value'],)
cursor.execute(sql)
db.commit()
print("資料庫保存成功!")
# 關閉資料庫連接
db.close()
if __name__ == '__main__':
main_demo()

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/290163.html
標籤:其他
上一篇:初識5G 揭開5G的神秘面紗
