Eclipse Paho MQTT Python Client 使用手冊
原文地址:https://www.cooooder.com/archives/20210303
目錄
- 介紹
- 環境
- 準備
- 快速開始
- 常用API
- Client
- 回呼函式
- 示例
介紹
paho.mqtt.python 是一個MQTT客戶端python庫,能夠讓應用程式簡單方便的連接到MQTT代理進行訊息發布、訂閱主題和訊息接收,
目前 paho.mqtt.python-1.5.1 版本支持5.0、3.1.1和3.1 MQTT協議,同時支持Python 2.7.9+或3.5+,
環境
- MQTT代理:EMQ X Broker 4.2.6
- Python 3.9.0
- paho-mqtt 1.5.1
準備
-
參照 EMQ X Broker安裝啟動教程
成功啟動EMQ后,可通過瀏覽器訪問 http://localhost:18083 admin/public 進入EMQ控制臺,在【工具 > Websocket】模塊可方便進行客戶端連接、訂閱、訊息接收、發布等測驗和除錯作業 -
Python 安裝省略
-
paho-mqtt 安裝
pip install paho-mqtt
快速開始
Python快速實作MQTT主題訂閱和訊息接收
import paho.mqtt.client as mq_tt
def on_connect(client, userdata, flags, rc):
"""
回呼函式:當MQTT代理回應客戶端連接請求時觸發
:param client: 回呼回傳的客戶端實體
:param userdata: Client()或user_data_set()中設定的私有用戶資料
:param flags: MQTT代理發送的回應標識
:param rc: 連接結果
0:連接成功
1:連接被拒絕 - 協議版本
2: 連接被拒絕 - 客戶端識別符號無效
3:連接被拒絕 - 服務器不可用
4:連接被拒絕 - 用戶名或密碼錯誤
5:連接被拒絕 - 未授權6-255:當前未使用
:return:
"""
print("Connected with result code "+str(rc))
# 在on_connect()中進行訊息訂閱,是因為如果丟失連接進行重連,主題也會重新被訂閱
client.subscribe("testTopic/#")
def on_message(client, userdata, message):
"""
回呼函式:當接收到MQTT代理發布的訊息時觸發
:param client: 回呼回傳的客戶端實體
:param userdata: Client()或user_data_set()中設定的私有用戶資料
:param message: MQTTMessage的一個實體,這是一個包含主題,有效負載,qos,retain的類
:return:
"""
print(message.topic+" "+str(message.payload))
mq_client = mq_tt.Client(client_id='www.cooooder.com')
mq_client.on_connect = on_connect
mq_client.on_message = on_message
# 連接到EMQX Broker MQTT代理
mq_client.connect("127.0.0.1", 1883, 60)
# 阻塞式自動處理收發資料、自動處理重新連接,所有的資料處理邏輯都在預先設定好的回呼函式中進行的
mq_client.loop_forever()
在 EMQ X Broker - 【客戶端】可以看到客戶端已連接
查看原圖
在 EMQ X Broker - 【Websocket】發布testTopic主題訊息
查看原圖
Python程式列印出接收到的訊息
Connected with result code 0
testTopic b'{ "msg": "Hello, World!" }'
testTopic b'{ "msg": "Hello, World2!" }'
常用API
Client
Client類實體常規用法流程如下:
- 創建一個客戶端實體
- 使用任一 connect*() 方法連接到MQTT代理
- 呼叫任一 loop*() 方法保持與MQTT代理通訊
- 使用 subscribe() 方法訂閱一個主題并接收訊息
- 使用 publish() 方法向MQTT代理發布訊息
- 使用 disconnect() 中斷與MQTT代理的連接
client()
# 構造方法
Client(client_id="", clean_session=True, userdata=None, protocol=MQTTv311, transport="tcp")
- client_id
- 連接到MQTT代理時使用的唯一客戶端ID字串,如果為0或者為None,將隨機生成分配一個,這種情況下clean_session引數必須為True
- clean_session
- 布林值型別,用來確定客戶端型別,如果為True,當斷開連接時,MQTT代理將移除該客戶端的所有資訊;如果為False,客戶端則為持久客戶端,當斷開連接時,訂閱資訊和訊息佇列將被MQTT保存
- 當斷開連接時,客戶端不會丟棄自己發送的訊息,呼叫 connect() 或者 reconnect() 將導致重新發送訊息,只有使用 reinitialise() 可以將客戶端重置為初始狀態
- userdata
- 用戶定義的任意型別資料作為 userdata 引數傳遞給回呼函式,可以通過呼叫user_data_set() 方法進行更新,不過會有點延遲
- protocol
- 客戶端使用的MQTT協議版本,可以是 MQTTv31 或 MQTTv311
- transport
- 傳輸形式,設定為websockets,則會通過websockets發送給MQTT,默認tcp
- 示例
import paho.mqtt.client as mqtt
mqttc = mqtt.Client()
connect()
connect(host, port=1883, keepalive=60, bind_address="")
客戶端連接MQTT代理,這是一個阻塞函式
- host
- 代理的主機名或者IP地址
- port
- 連接服務的埠,默認1883
- keepalive
- 心跳檢測時長
- bind_address
- 系結此客戶端本地網路的IP地址
- 回呼函式
- on_connect()
connect_async()
connect_async(host, port=1883, keepalive=60, bind_address="")
與 loop_start() 結合使用以非阻塞的形式進行連接,在呼叫 loop_start() 之前,連接不會完成
disconnect()
disconnect()
徹底與MQTT代理斷開,使用該方法斷開連接不會讓代理發送遺囑訊息
- 回呼函式
- on_disconnect()
enable_logger()
enable_logger(logger=None)
使用標準的Python日志包啟用日志記錄,可以與on_log回呼方法同時使用
reconnect()
reconnect()
使用之前的資訊配置重新連接代理,在呼叫之前必須先呼叫 connect*() 方法
reinitialise()
重置客戶端為初始化狀態,引數與 client() 一致
- 示例
mqttc.reinitialise()
loop()
loop(timeout=1.0, max_packets=1)
定期呼叫處理事件
- timeout
- 最大阻塞的秒數
- max_packets
- 已過期,不設定
- 示例
while True:
mqttc.loop()
loop_start() / loop_stop()
loop_start()
loop_stop(force=False)
這些函式實作了網路回圈的執行緒介面,在執行connect*()之前或者之后呼叫一次 loop_start() ,后臺會自動運行一個執行緒呼叫 loop() ,這樣就釋放了主執行緒去執行其它作業,避免發生阻塞,這個呼叫也處理重新連接到代理,呼叫 loop_stop() 停止后臺執行緒
mqttc.connect("127.0.0.1")
mqttc.loop_start()
while True:
mqttc.publish("topicTest", 'test')
loop_forever()
阻塞式網路回圈處理事件,直到客戶端呼叫 disconnect() 才會回傳,它會自動重連
publish()
publish(topic, payload=None, qos=0, retain=False)
客戶端向MQTT代理發送一條訊息
- topic
- 訊息發布的主題,不能為None或者空字符
- payload
- 發送的訊息內容,如果沒有賦值或者賦值為None,則將使用零長度的訊息,傳遞int或者float將會被轉換為該數字的字串, 如果想發送真正的int或者float資料,使用 struct.pack() 去創建
- qos
- 訊息的服務質量等級,必須為0 or 1 or 2
- retain
- 設定為True,MQTT代理保留最后一條訊息,以便分發給訊息發布后的訂閱者
- 回呼函式
- on_publish()
Return MQTTMessageInfo物件
reconnect_delay_set()
reconnect_delay_set(min_delay=1, max_delay=120)
斷開連接后,客戶端將自動嘗試連接,每次嘗試間隔 [min_delay, max_delay] 秒,從min_delay開始逐漸加倍至max_delay,連接成功后,延遲重置為min_delay
subscribe()
subscribe(topic, qos=0)
訂閱一個或多個主題,該方法有三種不同的呼叫方式:
# 1. 字串和整數
subscribe("my/topic", 2)
# 2. 字串和整數元組
subscribe(("my/topic", 1))
# 3. 字串和整數元組的串列
# 單次呼叫多個主題,比多次呼叫subscribe更有效
subscribe([("my/topic", 0), ("another/topic", 2)])
Return 一個元組 (result, mid)
- result
- 成功:MQTT_ERR_SUCCESS
- 失敗:(MQTT_ERR_NO_CONN, None)
- mid
- 訊息ID
- 回呼函式
- on_subscribe()
unsubscribe()
unsubscribe(topic)
取消一個或多個主題
- topic
- 主題字串或者字串串列
Return 一個元組 (result, mid)
- 主題字串或者字串串列
- result
- 成功:MQTT_ERR_SUCCESS
- 失敗:(MQTT_ERR_NO_CONN, None)
- mid
- 訊息ID
- 回呼函式
- on_unsubscribe()
user_data_set()
user_data_set(userdata)
設定傳遞給回呼函式的用戶私有資料
username_pw_set()
username_pw_set(username,password = None)
設定用戶名和密碼(可選)供MQTT代理驗證,必須在 connect*() 之前呼叫
will_set()
will_set(topic, payload=None, qos=0, retain=False)
設定遺囑發送給MQTT代理,如果客戶端在沒有呼叫 disconnect() 的情況下斷開連接,則MQTT代理將會代表它發送該訊息
- topic
- 遺囑訊息發布的主題,不能為None或者空字符
- payload
- 遺囑發送的訊息內容,如果沒有賦值或者賦值為None,則將使用零長度的訊息作為遺囑,傳遞int或者float將會被轉換為該數字的字串, 如果想發送真正的int或者float資料,使用 struct.pack() 去創建
- qos
- 遺囑訊息的服務質量等級,必須為0 or 1 or 2
- retain
- 設定為True,MQTT代理保留最后一條訊息,以便分發給訊息發布后的訂閱者
回呼函式
on_connect()
on_connect(client, userdata, flags, rc)
MQTT代理回應客戶端連接請求時( connect*() )呼叫
- client
- 回呼回傳的客戶端實體
- userdata
- Client() 或 user_data_set() 中設定的私有用戶資料
- flags
- MQTT代理發送的回應標識
- rc
- 連接結果
- 0:連接成功
- 1:連接被拒絕 - 協議版本
- 2:連接被拒絕 - 客戶端識別符號無效
- 3:連接被拒絕 - 服務器不可用
- 4:連接被拒絕 - 用戶名或密碼錯誤
- 5:連接被拒絕 - 未授權6-255:當前未使用
- 連接結果
- 示例
def on_connect(client, userdata, flags, rc):
print("Connected with result code "+str(rc))
mqttc.on_connect = on_connect
on_disconnect()
當客戶端與MQTT代理斷開連接時 (disconnect()) 呼叫
on_disconnect(client, userdata, rc)
- client
- 回呼回傳的客戶端實體
- userdata
- Client() 或 user_data_set() 中設定的私有用戶資料
- rc
- 斷開結果,如果是 MQTT_ERR_SUCCESS(0),則是回應disconnect()呼叫
- 如果是其它值,則是意外關閉
- 示例
def on_disconnect(client, userdata, rc):
if rc != 0:
print("Unexpected disconnection.")
mqttc.on_disconnect = on_disconnect
on_message()
on_message(client, userdata, message)
在客戶端收到已訂閱主題的訊息,并且該訊息沒有被主題過濾器 message_callback_add() 匹配時呼叫
- client
- 回呼回傳的客戶端實體
- userdata
- Client() 或 user_data_set() 中設定的私有用戶資料
- message
- MQTTMessage實體,包含 topic、payload、qos、retain
- 示例
def on_message(client, userdata, message):
print("Received message '" + str(message.payload) + "' on topic '"
+ message.topic + "' with QoS " + str(message.qos))
mqttc.on_message = on_message
message_callback_add()
message_callback_add(sub, callback)
定義特定訂閱主題傳入的訊息回呼,包括通配符,比如:客戶端訂閱了 sensor/#主題,一個回呼處理 sensor/temperature,另一個回呼處理 sensor/humidity
- sub
- 待過濾的主題,只能定義一個回呼
- callback
- 回呼函式,與 on_message() 相同形式
- 示例
# 處理溫度訊息回呼
def temperature_callback(client, userdata, message):
print(message.topic+" "+str(message.payload))
# 處理濕度訊息回呼
def humidity_callback(client, userdata, message):
print(message.topic+" "+str(message.payload))
mqttc.subscribe('sensor/#')
mqttc.message_callback_add('sensor/temperature', temperature_callback)
mqttc.message_callback_add('sensor/humidity', humidity_callback)
message_callback_remove()
message_callback_remove(sub)
洗掉先前注冊的主題/訂閱特定回呼
on_publish()
on_publish(client, userdata, mid)
當客戶端呼叫 publish() 發布一條訊息至MQTT代理后呼叫,Qos=1或2時,意味著客戶端和代理完成握手,Qos=0時,僅表示訊息離開客戶端,
- mid
- mid變數與從相應的 publish() 回傳的mid變數匹配,以允許跟蹤傳出的訊息,
即使 publish() 呼叫回傳,也不總意味著訊息已發送
on_subscribe()
on_subscribe(client, userdata, mid, granted_qos)
當MQTT代理回應訂閱請求時被呼叫
- mid
- mid變數匹配從相應的 subscribe() 回傳的mid變數
- granted_qos
- 整數串列,它提供了代理為每個不同的訂閱請求授予的QoS級別
on_unsubscribe()
on_unsubscribe(client, userdata, mid)
當代理回應取消訂閱請求時呼叫
- mid
- mid匹配從相應的 unsubscribe() 回傳的mid變數
on_log()
on_log(client, userdata, level, buf)
當客戶端有日志資訊時呼叫
- level
- 訊息嚴重性
- MQTT_LOG_INFO
- MQTT_LOG_NOTICE
- MQTT_LOG_WARNING
- MQTT_LOG_ERR
- MQTT_LOG_DEBUG
- 訊息嚴重性
- buf
- 該訊息本身就在buf里
可以與標準的Python logging同時使用,通過enable_logger()方法啟用
- 該訊息本身就在buf里
示例
import paho.mqtt.client as mq_tt
import logging
logging.basicConfig(level='DEBUG', format='%(asctime)s [%(name)s:%(lineno)d] [%(levelname)s]- %(message)s')
def on_connect(client, userdata, flags, rc):
print("Connected with result code "+str(rc))
client.subscribe("topicTest/#")
def topic_one_callback(client, userdata, message):
print(message.topic+" "+str(message.payload))
def topic_two_callback(client, userdata, message):
print(message.topic+" "+str(message.payload))
mq_client = mq_tt.Client(client_id='www.cooooder.com')
mq_client.enable_logger()
mq_client.on_connect = on_connect
mq_client.message_callback_add("topicTest/one", topic_one_callback)
mq_client.message_callback_add("topicTest/two", topic_two_callback)
# 連接到EMQX Broker MQTT代理
mq_client.connect("127.0.0.1", 1883, 60)
mq_client.loop_start()
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/265995.html
標籤:其他
