在將aws iot 設備 sdk 用于 python v2 (v1.7.1)的 python 應用程式中,我遇到了無法更新設備影子的問題。
啟動程式后,DeviceShadowManager將嘗試獲取最新的影子狀態并在本地進行設定。如果存在delta狀態,DeviceShadowManager則將合并最后一個reported狀態和delta狀態并發布它。那個有效。但是,當管理器訂閱更新時,在初始設定后,我遇到了一個錯誤,當desired狀態改變時,管理器無法更新reported狀態。這是錯誤:
Exception ignored in: <class 'TypeError'>
Traceback (most recent call last):
File "/Users/tom/.../lib/python3.9/site-packages/awscrt/mqtt.py", line 506, in callback_wrapper
callback(topic=topic, payload=payload)
TypeError: callback_wrapper() missing 3 required positional arguments: 'dup', 'qos', and 'retain'
我查看了源代碼,但只是不明白為什么 aTypeError被提出,特別是因為這個確切的場景似乎是由tryandexcept塊處理的,還是我弄錯了?
錯誤的來源:
if callback:
def callback_wrapper(topic, payload, dup, qos, retain):
try:
callback(topic=topic, payload=payload, dup=dup, qos=QoS(qos), retain=retain)
except TypeError:
# This callback used to have fewer args.
# Try again, passing only those those args, to cover case where
# user function failed to take forward-compatibility **kwargs.
callback(topic=topic, payload=payload) # this is line 506
您可以在下面找到我的代碼和程式日志。
這個資料類代表陰影:
from dataclasses import dataclass
@dataclass
class DeviceShadow:
score_threshold: float = 0.6
minimum_distance: int = 150
陰影由DeviceShadowManager. 其中大部分是基于上述存盤庫中的影子樣本。
from dataclasses import asdict
from queue import Queue
from threading import Lock
from awscrt import mqtt
from awsiot import iotshadow
from awsiot.iotshadow import IotShadowClient
from app.device_shadow.device_shadow import DeviceShadow, from_json as device_shadow_from_json
from app.models import log
SHADOW_VALUE_DEFAULT = DeviceShadow()
class DeviceShadowManager:
_shadow_client: IotShadowClient
shadow_value: DeviceShadow = DeviceShadow()
_lock = Lock()
_thing_name: str
def __init__(self, thing_name: str, mqtt_connection: mqtt.Connection):
self._thing_name = thing_name
self._shadow_client = iotshadow.IotShadowClient(mqtt_connection)
update_accepted_subscribed_future, _ = self._shadow_client.subscribe_to_update_shadow_accepted(
request=iotshadow.UpdateShadowSubscriptionRequest(thing_name=self._thing_name),
qos=mqtt.QoS.AT_LEAST_ONCE,
callback=self.on_update_shadow_accepted # omitted
)
update_rejected_subscribed_future, _ = self._shadow_client.subscribe_to_update_shadow_rejected(
request=iotshadow.UpdateShadowSubscriptionRequest(thing_name=self._thing_name),
qos=mqtt.QoS.AT_LEAST_ONCE,
callback=self.on_update_shadow_rejected # omitted
)
# Wait for subscriptions to succeed
update_accepted_subscribed_future.result(60)
update_rejected_subscribed_future.result(60)
log.info("Subscribing to Get responses...")
get_accepted_subscribed_future, _ = self._shadow_client.subscribe_to_get_shadow_accepted(
request=iotshadow.GetShadowSubscriptionRequest(thing_name=self._thing_name),
qos=mqtt.QoS.AT_LEAST_ONCE,
callback=self.on_get_shadow_accepted)
get_rejected_subscribed_future, _ = self._shadow_client.subscribe_to_get_shadow_rejected(
request=iotshadow.GetShadowSubscriptionRequest(thing_name=self._thing_name),
qos=mqtt.QoS.AT_LEAST_ONCE,
callback=self.on_get_shadow_rejected) # omitted
# Wait for subscriptions to succeed
get_accepted_subscribed_future.result()
get_rejected_subscribed_future.result()
log.info("Subscribing to Delta events...")
delta_subscribed_future, _ = self._shadow_client.subscribe_to_shadow_delta_updated_events(
request=iotshadow.ShadowDeltaUpdatedSubscriptionRequest(
thing_name=self._thing_name
),
qos=mqtt.QoS.AT_LEAST_ONCE,
callback=self.on_shadow_delta_updated)
# Wait for subscription to succeed
delta_subscribed_future.result()
# From here on out the rest runs asynchronously.
# Issue request for shadow's current value.
# The response will be received by the on_get_accepted() callback
with self._lock:
publish_get_future = self._shadow_client.publish_get_shadow(
request=iotshadow.GetShadowRequest(
thing_name=self._thing_name,
),
qos=mqtt.QoS.AT_LEAST_ONCE
)
# Ensure that publish succeeds
publish_get_future.result()
def on_get_shadow_accepted(self, response: iotshadow.GetShadowResponse) -> None:
log.info("Finished getting initial shadow value.")
if response.state and response.state.delta:
if not response.state.reported:
response.state.reported = {}
merged_state = self.merge_states(response.state.delta, response.state.desired)
return self.set_desired(device_shadow_from_json(merged_state))
if response.state and response.state.reported:
return self.set_local(device_shadow_from_json(response.state.reported))
self.set_desired(SHADOW_VALUE_DEFAULT)
return
def on_shadow_delta_updated(self, delta: iotshadow.ShadowDeltaUpdatedEvent) -> None:
if delta.state:
if delta.state is None:
log.info("Delta reports that nothing is set. Setting defaults...")
self.set_desired(SHADOW_VALUE_DEFAULT)
return
log.info("Delta reports that desired shadow is '{}'. Changing local shadow...".format(delta.state))
self.set_desired(self.merge_states(delta.state, self.shadow_value))
else:
log.info("Delta did not report a change")
@staticmethod
def merge_states(delta: dict, reported: DeviceShadow):
for key, value in delta.items():
reported[key] = value
return reported
def set_local(self, value: DeviceShadow) -> None:
with self._lock:
self.shadow_value = value
def set_desired(self, new_value: DeviceShadow) -> None:
with self._lock:
if self.shadow_value == new_value:
log.debug("Local shadow is already '{}'.".format(new_value))
return
log.debug("Changing local shadow to '{}'.".format(new_value))
self.shadow_value = new_value
log.debug("Updating reported shadow to '{}'...".format(new_value))
request = iotshadow.UpdateShadowRequest(
thing_name=self._thing_name,
state=iotshadow.ShadowState(
desired=asdict(new_value),
reported=asdict(new_value),
),
)
self._shadow_client.publish_update_shadow(request, mqtt.QoS.AT_LEAST_ONCE)
您將在下面找到日志:
DEBUG:app.mqtt:Connecting to xxxxxxxxxxxxxx-ats.iot.eu-central-1.amazonaws.com with client ID '80d8bc54-971e-0e65-a537-37d14a3cb630'...
INFO:app.models:Subscribing to Get responses...
INFO:app.models:Subscribing to Delta events...
INFO:app.models:Finished getting initial shadow value.
DEBUG:app.models:Changed local shadow to 'DeviceShadow(score_threshold=0.7, minimum_distance=1503)'.
DEBUG:app.models:Updating reported shadow to 'DeviceShadow(score_threshold=0.7, minimum_distance=1503)'...
INFO:app.models:Update request published.
DEBUG:app.models:Finished updating reported shadow to '{'score_threshold': 0.7, 'minimum_distance': 1503}'.
INFO:app.models:Delta reports that desired shadow is '{'minimum_distance': 15035}'. Changing local shadow...
Exception ignored in: <class 'TypeError'>
Traceback (most recent call last):
File "/Users/tom/.../lib/python3.9/site-packages/awscrt/mqtt.py", line 506, in callback_wrapper
callback(topic=topic, payload=payload)
TypeError: callback_wrapper() missing 3 required positional arguments: 'dup', 'qos', and 'retain'
DEBUG:app.models:Finished updating reported shadow to '{'score_threshold': 0.7, 'minimum_distance': 1503}'.
正如您所看到的堆疊跟蹤非常短,有沒有辦法更好地除錯它?關于為什么給我這個特定錯誤的任何想法,也許如何解決它?感謝所有幫助!
uj5u.com熱心網友回復:
我很確定問題出在
@staticmethod
def merge_states(delta: dict, reported: DeviceShadow):
for key, value in delta.items():
reported[key] = value
return reported
其中__setitem__對reported引數的呼叫引發了 a,TypeError因為報告的引數是一個DeviceShadow不支持專案分配的資料類物件。
如果要設定具有欄位名稱字串的資料類的欄位,可以使用setattr(reported, key, value).
轉載請註明出處,本文鏈接:https://www.uj5u.com/caozuo/344620.html
