之前寫過一篇如使用阿里云上部署.NET 3.1自定義運行時的文章,吐槽一下,雖然現在已經2022年了,但是阿里云函式計算的支持依然停留在.NET Core 2.1,更新緩慢,由于程式解包大小的限制,也不能放太復雜的東西的上去,雖然現在.NET 6裁剪包能挺好地解決這個問題,但是心里還是不爽,
需求
言歸正傳,有這么一個情景:發送資料想接入阿里云的IoT平臺,然后直接插入postgresQL資料庫中,正常來說,只要資料發送到了IoT平臺,然后定義轉發到RDS就可以了,不過阿里云有幾個限制:
- 資料流轉到RDS資料庫,只能支持
mysql與sql server - 資料流轉只支持
json形式的資料流轉,如果發送的是透傳的資料,那么發送不了(更新:現在新版的資料流轉已經支持了,)
思前想后,可能只能掏出阿里云的函式計算服務了,運用函式計算作為中轉,將透傳的資料流轉給函式計算,然后在函式計算中執行sql陳述句,
IoT平臺接收設定
阿里云的物聯網平臺,設定了基本的產品和設備之后,如果是物模型的話,那么自行設定好對應的物模型,對于透傳就比較簡單了,支持MQTT的設備方只需要定義:
- 透傳的訊息發送到/{productKey}/{deviceName}/user/update
- 訂閱阿里云的/{productKey}/{deviceName}/user/get
- 設定阿里云的Mqtt IoT實體終端節點:({YourProductKey}.iot-as-mqtt.{YourRegionId}.aliyuncs.com:1883
- 設定設備的ProductKey和ProductSecret
設定好之后,即可傳輸資料到阿里云IoT端,資料傳輸過來,看下日志,如果能看到:

那說明就已經發送OK了,接收到的是普通的字串(不是json),需要進行進一步決議,
IoT流轉設定
在云產品流轉中,新建決議器,設定好資料源,資料目的選擇函式計算:

決議器腳本比較簡單:
var data = https://www.cnblogs.com/podolski/p/payload();
writeFc(1000, data);
注意,payload函式payload(textEncoding)是內建的函式:
- 不傳入引數:默認按照UTF-8編碼轉換為字串,即payload()等價于payload('utf-8'),
- 'json':將payload資料轉換成Map格式變數,如果payload不是JSON格式,則回傳例外,
- 'binary':將payload資料轉換成二進制變數進行透傳,
這里我使用文本透傳的形式,將資料轉成UTF8文本傳輸,writeFc指將轉換的內容傳遞給1000編號的目標函式,詳情見檔案,
當然還可以使用更為復雜的腳本,實作對腳本資料的初步處理,由于我這里后面還有函式計算,我就直接將資料轉到下一個節點,
函式計算配置
按照官方檔案新建函式,請注意不需要新建觸發器!我們這里的函式使用python語言,通過psycopg2實作資料插入到postgres資料庫中,
由于函式計算中,默認并沒有該包,需要手動添加參考,官方建議使用Serverless Devs工具安裝部署,這個玩意非常不好用,嗯,我不接受他的建議,推薦大家使用vscode,安裝阿里云serverless的插件,這樣其實更加方便,
按照插件的檔案,自己建立好服務與函式,默認會給一個函式入口:
# To enable the initializer feature (https://help.aliyun.com/document_detail/158208.html)
# please implement the initializer function as below:
# def initializer(context):
# logger = logging.getLogger()
# logger.info('initializing')
def handler(event, context):
logger = logging.getLogger()
logger.info(event)
return 'hello world'
我們首先在函式上右鍵,然后選擇Install Package,選擇pip安裝psycopg2,依賴就自動被安裝上了,這個非常方便,
請注意,通過IOT流轉過來的字串,是b'data'這樣的形式的形式,需要先decode一下,然后在處理,修改函式為:
# -*- coding: utf-8 -*-
import logging
import psycopg2
import uuid
import time
def insert_database(device_id,data):
timest = int(time.time()*1000)
guid = str(uuid.uuid1())
conn = psycopg2.connect(database="", user="", password="", host="", port="")
cur = conn.cursor()
sql = 'INSERT INTO "data"("Id","DeviceId","Timestamp", "DataArray") VALUES (\'{id}\', \'{deviceid}\', \'{timestamp}\', array{data})'
sql = sql.format(id= guid, deviceid= device_id, timestamp= timest, data= https://www.cnblogs.com/podolski/p/data)
cur.execute(sql)
conn.commit()
print(" Records inserted successfully")
conn.close()
def extract_string_array(data: bytes):
arr = data.decode().strip().split(' ')
# 寫自己的邏輯
return deviceid, resu
def handler(event, context):
logger = logging.getLogger()
logger.info(event)
device_id, result = extract_string_array(event)
insert_database(device_id, result)
return 'OK'
保存,然后在vscode中deploy即可,
提示:vscode中也可以進行本地的debug,還是比較方便的,不過這些功能依賴docker,所以還是提前裝好比較好,
弄完了之后,應該是能看見這樣的畫面:

至此,資料就正常流轉成功,
要點
- 不要設定觸發器,當時為了配置這個觸發器弄了非常長的時間
- 函式計算與資料庫的VPC應該相同,并且賦予權限,否則無法訪問,
- 函式計算默認無法保持狀態,如果有這個需求,最好試試別的方案,或者看下函式計算的預留實體(常駐實體)
- 提前在本地安裝好Docker,要不會有各種各樣的問題出現,
- Postgresql插入陣列格式的資料,需要注意格式,可以參考這篇檔案
- 如果長時間不用docker,導致docker無法啟動,可以參考這篇文章
轉載請註明出處,本文鏈接:https://www.uj5u.com/net/461801.html
標籤:.NET Core
