大家好 這是我的第一篇文章 因為有很多小伙伴想參考于是我發到了這里
首先了解一下整體專案架構, 我在一家外包公司上班, 這個專案是一個外包專案, 公司只給了我3個星期的時間,所以我當初制作這個專案的時候我也很緊張, 從頭到尾只有我自己一個人,
本人技術背景:
我只是一個負責WebApi開發的普通程式員,平時也就寫寫簡單的業務介面,這個專案的實作難度對我而言還是很難的,不過還是成功做了出來 相信有不少人會用得上其中的一些技術希望大家開發順利從中獲得幫助,
專案的主要功能為:
通過物聯網設備對農業土壤資訊和種植資訊進行采集并且保存到云端提供呼叫
他的功能架構是:
1. 構建一套資料平臺
2. 對接智能網關設備
3. 獲取傳感器資料
4. 對資料進行清洗和展示
5. 開發對公介面提供第三方呼叫
專案整體架構

啟動
-
當天開完會回來 得知需求后 我腦子還一片空白,沒過幾天以后 我在客戶提供的基地去調研,看看怎么布電和設施,要采集的資訊是那些,


看了兩個地點以后 發現很多問題 比方說強電無法進入菜地等各種問題 最終選擇了右邊的種植基地,
-
現場勘察過以后 開始采購設備,但是我覺得傳感器這種東西應該是很成熟的 于是我在淘寶上搜索專案所需要的傳感器設備, 后來真的讓我找到了合適的傳感器 就是有點貴 要500塊錢一個,
跟著傳感器的說明書 購買了rs485串口轉USB的線 看了非常多RS485的檔案 自己嘗試寫了串口指令 還是拿不到資料,后來問賣家 賣家寫的指令卻可以用 當時我還不知道為什么,不過能夠成功拿到資料就好,
但是拿到了傳感器的資料,還得想辦法怎么接收傳感器的資料和下發傳感器指令?


-
在傳感器的購買頁面下 我看到了一種推薦的商品 叫做DTU 智能網關, 便立刻買了一個回來研究 可惜研究了很久 都不知道如何使用python和該設備進行通訊,不斷地查閱說明書和設備的官方檔案后 發現該設備的作業原理是
通過tcp連接設備 → 轉發tcp指令給串口 → 接收串口回傳的資料 → 回傳服務器
于是我跟著設備的教程 使用了TCP測驗工具等 手動撰寫了串口指令 成功通過網關設備獲取到了傳感器的資料資訊, 那是不是用python連接設備 發送串口資料就可以拿到資料了呢?可我又從來沒寫過socket編程 就非常蛋疼了,

-
后來我在js上找到了一篇不錯的文章 并開始嘗試上手撰寫介面
https://www.jianshu.com/p/c0b13dd11c6e# -*- coding:utf-8 -*- import socket import threading from Config import TCP_SERVER_PORT from Main import client_handler from concurrent.futures import ThreadPoolExecutor def Activate(): """輸出服務器狀態到控制臺""" while True: time.sleep(5) # print("[活動中的執行緒]",threading.activeCount()) print("[DB]", App.db) if __name__ == '__main__': # 創建服務器, 埠不支持復用 server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # 系結埠, 列印運行資訊 server_address = ('', TCP_SERVER_PORT) server_sock.bind(server_address) print('TCP Server starting up on port {}'.format(server_address[1])) # 設定socket為被動連接, 默認為主動連接 server_sock.listen(5) # 執行緒池, 任務池 pool = ThreadPoolExecutor(max_workers=8) task_pool = [] # # 監聽服務器活動 # Activate = threading.Thread(target=Activate) # Activate.start() try: while True: # 主行程只用來負責監聽新的客戶連接 print('Main Thread [{}], 正在等待客戶端連接...'.format( threading.current_thread().name)) # client_sock是專為這個客戶端服務的socket,client_addr是包含客戶端IP和埠的元組 client_sock, client_addr = server_sock.accept() print('Main Thread [{}], client {} 已連接'.format( threading.current_thread().name, client_addr)) # 1.把新接收到的連接交由下一步處理 # client_handler(client_sock, client_addr) # 2.多執行緒 # client = threading.Thread(target=client_handler, args=(client_sock, client_addr)) # client.setDaemon(True) # client.start() # 3.可控執行緒池 pool.submit(client_handler, client_sock, client_addr) print("提交執行緒池") except Exception as e: # 例外結束 print(e) server_sock.close() finally: # 關閉監聽socket,不再回應其它客戶端連接 server_sock.close() print("關閉服務器")上面的代碼中定義了一個socket服務器, 監聽到的socket連接 通過 pool.submit(client_handler, client_sock, client_addr) 把連接提交到執行緒池 通過下面的方法進行校驗網關設備是否合法連接
在網關設備內 可以設定網關握手時發送什么字串作 作為注冊包 所以我設計了一個表 用來儲存網關設備的密鑰
相當于 密鑰是網關設備的身份證ID
下面的方法會把注冊包的內容 通過請求服務器進行判斷 密鑰是否合法 如果合法
就會開啟一個新的執行緒 threading.Thread(target=client_thread, args=(client_sock, client_addr, gatewaydata)) 專門負責保持該網關設備與服務器的通信def client_handler(client_sock, client_addr): """對連接進行校驗, 處理注冊包, 合法連接新建執行緒""" print("[收到新的客戶端連接]處理連接", "--"*40) data = client_sock.recv(1024) print("注冊包內容: ", data) # 轉譯bytes字串 data = str(data, encoding='utf-8') type, gatewaydata = AuthGateway(data) # 請求主服務器檢驗注冊包 if type: # 檢驗成功 # 為每個新的客戶連接創建一個執行緒,用來處理客戶資料 client = threading.Thread(target=client_thread, args=(client_sock, client_addr, gatewaydata)) client.start() print("[注冊包核驗成功]合法連接, 新增子執行緒進行跟蹤") else: # 檢驗失敗 client_sock.close() print("[注冊包資訊例外]非法連接, 服務器主動斷開連接")圖中新增的執行緒函式 client_thread 主要負責的作業是 獲取該網關需要詢問的指令 并且發送指令給串口
因為上面說到的 網關的作業方式是 負責發送資料給串口和接收串口回傳的資料
以下代碼的作業流程為
1. 生成任務id 該任務主要負責每隔多久執行一次 詢問服務器獲取指令并且獲取資料發送給服務器
2. 創建任務調度器def client_thread(client_sock, client_addr, gatewaydata): '''處理下發任務''' try: # 生成任務ID TaskName = RandomStr() print("[生成定時任務ID]",TaskName) except Exception as e: print("[發生錯誤:地址1]",e) client_sock.close() try: # 任務引數 Args = [{ "client_sock":client_sock, "gatewayID":gatewaydata, "taskID":TaskName }] # 添加任務調度 scheduler.add_job( TaskCheduler, "interval", id=TaskName, args=Args, seconds=60, jobstore="default", executor="default", ) except Exception as e: print("[發生錯誤:地址2]",e) client_sock.close()任務調度器的作業:
1. GatAllGatewayInstructions(ChedulerData['gatewayID']) 通過當前連接的網關設備的網關ID 詢問服務器獲取該網關設備下的指令def TaskCheduler(ChedulerData): """任務調度器""" print("--"*40) print("執行任務") # 請求服務器獲取指令集 type, data = GatAllGatewayInstructions(ChedulerData['gatewayID']) list = [] for i in data['data']: # 限制每條指令執行時間間隔為1秒 time.sleep(1) # print() # print("指令:", i) s = i["data"] msg = s['busadd'] + " " + s['featurescode'] + " " + s['registeraddr_start'] + " " + s['registeraddr_end'] + " " + s['read_start'] + " " + s['read_end'] # print("串口指令:",msg) msg = GeneratorMsg(msg, s['crccheck']) # print("計算結果:",msg) clienttype, data = ModbusRTUIO(ChedulerData['client_sock'],i['equipmentId'], msg) print(clienttype, data) if clienttype == False: scheduler.remove_job(ChedulerData['taskID']) print("設備連接例外 自動斷開連接 銷毀任務") return False list.append(data) # 推送資料 PushData(list)def GatAllGatewayInstructions(gatewayID): """獲取該網關全部指令""" print("[SDK.py][GatAllGatewayInstructions]=>獲取該網關全部指令","GatewayID: ",gatewayID) request_url = ResultServer + '/iot/gateway/get/instructions' headers = { "Content-Type": "application/json" } data = { 'gatewayID':gatewayID } result = requests.post(request_url, json=data, headers=headers) if result.status_code == 200: jsondata = result.json() if jsondata["code"] == 200: # print(jsondata) return True, jsondata else: return False, None return False, None圖中為我做的后臺, 里面設定的是我要詢問的網關設備下的傳感器的資料時所需的串口指令 基于RS485協議

class Equipment(BaseModel, db.Model): """設備引數""" __tablename__ = 'equipment' gateway_id = db.Column(db.Integer) # 網關ID name = db.Column(db.String(255)) # 傳感器名 body = db.Column(db.Integer) # body ID paramtemplateid = db.Column(db.Integer) # 引數模板 # 總線資訊 modbus_busadd = db.Column(db.String(255)) # 從機地址(總線地址) modbus_featurescode = db.Column(db.String(255), default="4") # 功能碼(功能號) # 暫存器起始地址 modbus_registeraddr_start = db.Column(db.String(255)) # 起始位 modbus_registeraddr_end = db.Column(db.String(255)) # 結束位置 # 讀取暫存器個數 modbus_read_start = db.Column(db.String(255)) # 起始位 modbus_read_end = db.Column(db.String(255)) # 結束位置 # 包含crc校驗碼的計算完成的資料 modbus_crccheck = db.Column(db.String(255)) def toDict(self): paramTemplateitem = ParamTemplateItem.query.get(self.paramtemplateid) return dict( gateway_id = self.gateway_id, name = self.name, bodyid = self.body, modbus_busadd = self.modbus_busadd, modbus_featurescode = self.modbus_featurescode, modbus_registeraddr_start = self.modbus_registeraddr_start, modbus_registeraddr_end = self.modbus_registeraddr_end, modbus_read_start = self.modbus_read_start, modbus_read_end = self.modbus_read_end, modbus_crccheck = self.modbus_crccheck, paramtemplateid = self.paramtemplateid, paramtemplate = paramTemplateitem.toDict() if paramTemplateitem else {}, **self._base() ) def _toModbusRTU(self): return dict( busadd = self.modbus_busadd, featurescode = self.modbus_featurescode, registeraddr_start = self.modbus_registeraddr_start, registeraddr_end = self.modbus_registeraddr_end, read_start = self.modbus_read_start, read_end = self.modbus_read_end, crccheck = self.modbus_crccheck, )
在獲取到了串口資料后回圈所有的資料 對串口引數進行組合 并且通過ModbusRTUIO()方法 把指令發送給對應的網關設備 并等待串口回傳資料
def ModbusRTUIO(client_sock, equipmentId, msg): """單次指令IO操作""" # print() print("[ModbusRTU-IO]") # print("發出指令:",msg) try: client_sock.send(msg) except Exception as e: print(e) # print("0") return False, {} # 發送后等到1秒在監聽接收接收 time.sleep(1) # try: while True: data = client_sock.recv(1024) # print("接收到指令回傳的結果:",data) if not data or data == "": return False, {} return True, { "equipmentid":equipmentId, "serialization": ModbusTcpSerialization(data) }以下是生成CRC校驗碼的方法 和 切割回傳的串口資料的方法
def GeneratorMsg(data, crc): """生成串口訊息 并自動組合CRC校驗碼 轉為16進制回傳""" # crc = calc_crc16(str, 6) a = '%04x' % (int(crc)) # print(a,bytearray.fromhex(a)) # print(bytearray.fromhex(str + a)) # print("高低位計算結果",a) return bytearray.fromhex(data + a) def ModbusTcpSerialization(data): return { "busadd":data[0], "featurescode": data[1], "effectivebit": data[2], "bit0": data[3], "bit2": data[4], "crch": data[5], "crcl": data[6], "data":data[3]*256+data[4] }
平臺開發
主體 = 用戶, 網關 = DTU 智能網關設備, 設備 = 傳感器

class Gateway(BaseModel, db.Model):
"""網關"""
__tablename__ = 'gateway'
principal_id = db.Column(db.Integer) # 主體ID
name = db.Column(db.String(255)) # 網關設備名
accesskey = db.Column(db.Text) # 請求許可證
accesskeyhex = db.Column(db.Text) # 請求許可證16進制
def toDict(self):
return dict(
name = self.name,
principal_id = self.principal_id,
accesskey = self.accesskey,
hex = self.accesskeyhex,
**self._base()
)
平臺和TCPserver之間的通訊介面
TCPserver
import requests
from Config import ResultServer
import json
def AuthGateway(accesskey):
print("[SDK.py][AuthGateway]=>校驗注冊包",accesskey)
request_url = ResultServer + '/iot/gateway/auth'
headers = {
"Content-Type": "application/json"
}
data = {
'accesskey':accesskey
}
result = requests.post(request_url, json=data, headers=headers)
if result.status_code == 200:
jsondata = result.json()
if jsondata["code"] == 200:
# print(jsondata)
return True, jsondata["data"]['gatewayID']
else:
return False, None
return False, None
def GatAllGatewayInstructions(gatewayID):
"""獲取該網關全部指令"""
print("[SDK.py][GatAllGatewayInstructions]=>獲取該網關全部指令","GatewayID: ",gatewayID)
request_url = ResultServer + '/iot/gateway/get/instructions'
headers = {
"Content-Type": "application/json"
}
data = {
'gatewayID':gatewayID
}
result = requests.post(request_url, json=data, headers=headers)
if result.status_code == 200:
jsondata = result.json()
if jsondata["code"] == 200:
# print(jsondata)
return True, jsondata
else:
return False, None
return False, None
def PushData(data):
"""上報資料"""
print("[SDK.py][PushData]=>上報資料")
request_url = ResultServer + '/iot/push/data'
headers = {
"Content-Type": "application/json"
}
data = data
for i in data:
print(i)
result = requests.post(request_url, json=data, headers=headers)
print("request status : ", result, result.status_code)
if result.status_code == 200:
jsondata = result.json()
if jsondata["code"] == 200:
print("api status:", jsondata)
return True, jsondata
else:
return False, None
return False, None
平臺介面
from app.Models import Gateway, Equipment, Collection
from app.Extensions import db
def GatAllGatewayInstructions(request):
print("[取出指令]")
gatewayID = request.get('gatewayID',None)
print(gatewayID)
gateway = Gateway.query.get(gatewayID)
if not gateway:
return 9000, "網關不存在", {}
equipment = Equipment.query.filter(Equipment.gateway_id == gateway.id).all()
instructions = []
for i in equipment:
instructions.append({
"equipmentId": i.id,
"data": i._toModbusRTU()
})
return 200, "", instructions
def GatewayAuth(request):
print("[核驗連接的設備]")
accesskey = request.get('accesskey',None)
if not accesskey:
return 403, "引數有誤", {}
gateway = Gateway.query.filter(Gateway.accesskey == str(accesskey)).first()
if not gateway:
return 9000, "網關不存在", {}
return 200, "驗證成功", {
"gatewayID": gateway.id
}
def PushData(request):
print("[接收資料]")
for i in request:
print(i)
db.session.execute(
# Table_name為表名
Collection.__table__.insert(),
# 串列生成式,包含大量的字典
[{'equipment_id' : i['equipmentid'], 'data' : i['serialization']['data'], 'modbustcpdata': i['serialization']} for i in request],
)
db.session.commit()
return 200, "", {}
做夢都沒想到我一個程式員有下工地當苦力的一天
考慮到東西要長期在室外暴露
我在網上買了一個電箱 用來安裝設備
為了安全起見還是弄一套N+1P的漏開
開始安裝



淘寶上買的戶外防水接線盒
rs485接線

正常運行半年了 目前一切正常

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/280271.html
標籤:其他
下一篇:區間DP(石子合并及同類題)







