我是 python 的初學者,我有一個使用時間觸發器運行的 Azure 函式。此函式從 Azure 服務總線中讀取一批字串格式的原始 JSON 資料。
這是兩行資料。在現實中,我收到了大約 50 條這樣的訊息是連續的。現在我想逐行拆分此訊息,然后將其存檔到 Azure 存盤。
該訊息類似于以下示例(行 1 和行 2 的連接):
{"Name":"","Seri":21000000,"SiName":"","As":"","PId":21070101,"ICheck":0,"SeeNum":405097041391424,"Type":0,"Counter":33,"PaId":0,"MeType":30,"RecTime":"2021-10-21T09:04:41.0151Z","ReaTime":null,"Cape":"2021-10-21T09:04:40.644","Status":0,"text":"{\"TYPE_TAG\":\"00\",\"ENSORAG\":{\"date_time\":\"2021-10-21 09:04:40.644\",\"seber\":10,\"seqmber\":405097041391424,\"lo_name\":\"\",\"accati\":{\"0\":0.0,\"1\":-0.037665367,\"2\":-0.033863068,\"3\":-0.026795387,\"4\":-0.03757,\"5\":-0.02809906,\"6\":-0.016090393,\"7\":-0.040496826,\"8\":-0.05318451,\"9\":-0.025012016,\"10\":-0.057872772}},\"ATTACHED_DEVICE_SERIAL_NUMBER_TAG\":\"21000000\",\"error\":{}}","CerId":null,"Id":null,"Asse":null,"Id":0,"id":"075f0a38-2816-42c7-b95c-66c425b8ba9d","t":-1}{"Name":"","Seri":21000000,"SiName":"","As":"","PId":21070101,"ICheck":0,"SeeNum":405097041391424,"Type":0,"Counter":33,"PaId":0,"MeType":30,"RecTime":"2021-10-21T09:04:41.0151Z","ReaTime":null,"Cape":"2021-10-21T09:04:40.644","Status":0,"text":"{\"TYPE_TAG\":\"00\",\"ENSORAG\":{\"date_time\":\"2021-10-21 09:04:40.644\",\"seber\":10,\"seqmber\":405097041391424,\"lo_name\":\"\",\"accati\":{\"0\":0.0,\"1\":-0.037665367,\"2\":-0.033863068,\"3\":-0.026795387,\"4\":-0.03757,\"5\":-0.02809906,\"6\":-0.016090393,\"7\":-0.040496826,\"8\":-0.05318451,\"9\":-0.025012016,\"10\":-0.057872772}},\"NUMBER_TAG\":\"21000000\",\"error\":{}}","CerId":null,"Id":null,"Asse":null,"Id":0,"id":"075f0a38-2816-42c7-b95c-66c425b8ba9d","t":-1}{"Name":"","Seri":4560000,"SiName":"","As":"","PId":2107401,"ICheck":0,"SeeNum":40509704561424,"Type":0,"Counter":34,"PaId":0,"MeType":31,"RecTime":"2021-10-21T09:04:41.0151Z","ReaTime":null,"Cape":"2021-10-21T09:04:40.644","Status":0,"text":"{\"TYPE_TAG\":\"00\",\"ENSORAG\":{\"date_time\":\"2021-10-21 09:04:40.644\",\"seber\":10,\"seqmber\":405097041391424,\"lo_name\":\"\",\"accati\":{\"0\":0.0,\"1\":-0.037665367,\"2\":-0.033863068,\"3\":-0.026795387,\"4\":-0.03757,\"5\":-0.02809906,\"6\":-0.016090393,\"7\":-0.040496826,\"8\":-0.05318451,\"9\":-0.025012016,\"10\":-0.057872772}},\"ATTACHED_DEVICE_SERIAL_NUMBER_TAG\":\"21000000\",\"error\":{}}","CerId":null,"Id":null,"Asse":null,"Id":0,"id":"075f0a38-2816-42c7-b95c-66c425b8ba9d","t":-1}{"Name":"","Seri":21000000,"SiName":"","As":"","PId":21070101,"ICheck":0,"SeeNum":405097041391424,"Type":0,"Counter":33,"PaId":0,"MeType":30,"RecTime":"2021-10-21T09:04:41.0151Z","ReaTime":null,"Cape":"2021-10-21T09:04:40.644","Status":0,"text":"{\"TYPE_TAG\":\"00\",\"ENSORAG\":{\"date_time\":\"2021-10-21 09:04:40.644\",\"seber\":10,\"seqmber\":405097041391424,\"lo_name\":\"\",\"accati\":{\"0\":0.0,\"1\":-0.037665367,\"2\":-0.033863068,\"3\":-0.026795387,\"4\":-0.03757,\"5\":-0.02809906,\"6\":-0.016090393,\"7\":-0.040496826,\"8\":-0.05318451,\"9\":-0.0254566,\"10\":-0.054562772}},\"NUMBER_TAG\":\"2145600\",\"error\":{}}","CerId":null,"Id":null,"Asse":null,"Id":1,"id":"074222a38-2816-42c7-b95c-6644448ba9d","t":-2}
第 1 行是:
{"Name":"","Seri":21000000,"SiName":"","As":"","PId":21070101,"ICheck":0,"SeeNum":405097041391424,"Type":0,"Counter":33,"PaId":0,"MeType":30,"RecTime":"2021-10-21T09:04:41.0151Z","ReaTime":null,"Cape":"2021-10-21T09:04:40.644","Status":0,"text":"{\"TYPE_TAG\":\"00\",\"ENSORAG\":{\"date_time\":\"2021-10-21 09:04:40.644\",\"seber\":10,\"seqmber\":405097041391424,\"lo_name\":\"\",\"accati\":{\"0\":0.0,\"1\":-0.037665367,\"2\":-0.033863068,\"3\":-0.026795387,\"4\":-0.03757,\"5\":-0.02809906,\"6\":-0.016090393,\"7\":-0.040496826,\"8\":-0.05318451,\"9\":-0.025012016,\"10\":-0.057872772}},\"ATTACHED_DEVICE_SERIAL_NUMBER_TAG\":\"21000000\",\"error\":{}}","CerId":null,"Id":null,"Asse":null,"Id":0,"id":"075f0a38-2816-42c7-b95c-66c425b8ba9d","t":-1}
第 2 行是:
{"Name":"","Seri":4560000,"SiName":"","As":"","PId":2107401,"ICheck":0,"SeeNum":40509704561424,"Type":0,"Counter":34,"PaId":0,"MeType":31,"RecTime":"2021-10-21T09:04:41.0151Z","ReaTime":null,"Cape":"2021-10-21T09:04:40.644","Status":0,"text":"{\"TYPE_TAG\":\"00\",\"ENSORAG\":{\"date_time\":\"2021-10-21 09:04:40.644\",\"seber\":10,\"seqmber\":405097041391424,\"lo_name\":\"\",\"accati\":{\"0\":0.0,\"1\":-0.037665367,\"2\":-0.033863068,\"3\":-0.026795387,\"4\":-0.03757,\"5\":-0.02809906,\"6\":-0.016090393,\"7\":-0.040496826,\"8\":-0.05318451,\"9\":-0.025012016,\"10\":-0.057872772}},\"ATTTAG\":\"21000000\",\"error\":{}}","CerId":null,"Id":null,"Asse":null,"Id":0,"id":"075f0a38-2816-42c7-b95c-66c425b8ba9d","t":-2}
一行的結構如下圖所示:

在我看來,首先我應該拆分每一行,然后創建一個資料框并在相關列中插入每個值。之后,我附加到一個 blob。這樣對嗎?
我能怎么做?您建議的解決方案是什么?
編輯: 我從服務總線讀取的代碼:
from azure.servicebus import ServiceBusClient, ServiceBusMessage
connection_str = "**"
topic_name = "***"
subscription_name = "***"
servicebus_client = ServiceBusClient.from_connection_string(
conn_str=connection_str, logging_enable=True)
with servicebus_client:
# get the Subscription Receiver object for the subscription
receiver = servicebus_client.get_subscription_receiver(
topic_name=topic_name, subscription_name=subscription_name, )
with receiver:
for msg in receiver:
print("Received: " str(msg))
# complete the message so that the message is removed from the subscription
receiver.complete_message(msg)
uj5u.com熱心網友回復:
考慮具有三行的示例資料:
data = '{"Name": "Hassan", "code":"12"}{"Name": "Jack", "code":"345"}{"Name": "Jack", "code":"345"}'
以下是從這些資料中獲取資料框的方法:
from ast import literal_eval
data = [literal_eval(d '}')for d in data.split('}')[0:-1]]
df = pd.DataFrame.from_records(data)
Output:
Name code
0 Hassan 12
1 Jack 345
2 Jack 345
uj5u.com熱心網友回復:
由于訊息是單獨發送的,因此您可以單獨處理它們。不需要連接成一個字串。只需將它們附加到資料框中即可。下面的示例用于佇列,但您可以擴展到主題/訂閱。我還附上了結果以向您展示輸出的樣子。
from azure.servicebus import ServiceBusClient
import pandas as pd
import json
from pandas import json_normalize
CONNECTION_STR = 'Endpoint=sb://xxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
QUEUE_NAME = 'xxxxxxxxxxx'
servicebus_client = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STR)
with servicebus_client:
receiver = servicebus_client.get_queue_receiver(queue_name=QUEUE_NAME)
# create an Empty DataFrame object
df = pd.DataFrame()
msg_concat = ""
dfs = []
with receiver:
received_msgs = receiver.receive_messages(max_message_count=10, max_wait_time=5)
for msg in received_msgs:
msg_dict = json.loads(str(msg))
df2 = json_normalize(msg_dict)
df = df.append(df2, ignore_index = True)
receiver.complete_message(msg)
print(df)
print("Receive is done.")
Name Seri SiName As ... Id Asse id t
0 21000000 ... 0 None 075f0a38-2816-42c7-b95c-66c425b8ba9d -1
1 4560000 ... 0 None 075f0a38-2816-42c7-b95c-66c425b8ba9d -2
[2 rows x 21 columns]
Receive is done.
轉載請註明出處,本文鏈接:https://www.uj5u.com/qukuanlian/334197.html
標籤:Python 熊猫 天蓝色 天蓝色函数 azure-servicebus-topics
