本文章為原創,轉載請注明出處!
登錄平臺:IOTOS?愛投斯物聯中臺
賬號:iotos_test 密碼:iotos123
代碼地址:IOTOSDK-Python: IOTOS Python版本SDK,自帶原生介面和采集引擎 (gitee.com)
目錄
前言
驅動目的
適用范圍
驅動代碼
驅動決議
使用示例
-
前言
Modbus協議是應用于電子控制器上的一種通用語言,通過此協議,控制器相互之間、控制器經由網路例如以太網)和其它設備之間可以通信,它已經成為一通用工業標準,有了它,不同廠商生產的控制設備可以連成工業網路,進行集中監控此協議定義了一個控制器能認識使用的訊息結構,而不管它們是經過何種網路進行通信的,它描述了一控制器請求訪問其它設備的程序,如果回應來自其它設備的請求,以及怎樣偵測錯誤并記錄,它制定了訊息域格局和內容的公共格式,
Modbus具有兩種串行傳輸模式:分別為ASCII和RTU,Modbus是一種單主站的主從通信模式,Modbus網路上只能有一個主站存在,主站在Modbus網路上沒有地址,每個從站必須有唯一的地址,從站的地址范圍為0 - 247,其中0為廣播地址,從站的實際地址范圍為1 - 247,
Modbus RTU通信以主從的方式進行資料傳輸,在傳輸的程序中Modbus RTU主站是主動方,即主站發送資料請求報文到從站,Modbus RTU從站回傳回應報文,
-
驅動目的
modbus RTU 該驅動是將中臺(愛投斯物聯中臺)作為服務端(上位機)向客戶端(下位機)/modbus 485通訊的前端設備發送詢問幀,客戶端(下位機)/modbus 485通訊的前端設備接收到詢問幀并回傳應答幀給到服務端(上位機)進行決議作業并展示資料
-
適用范圍
凡是走標準的modbus_rtu協議的設備,例如煙感、光感、PLC等,需要注意的是,若客戶端(下位機)/modbus 485通訊的前端設備只有485/232 等的串口通訊不具備上網功能,需增加一個外接模塊(如485 轉4g /485 轉wifi 模塊)與服務器(上位機進行網路通訊),使用者或用戶可在生產生活中使用串口通訊測驗軟體測驗設備是否通訊正常,
-
使用示例
- 以光感傳感器(威海晶合數字礦山技術有限公司的光照傳感器)為示例進行演示

- 首先,連接光感和485模塊,光感的485線的A,B級分別連接模塊的485介面AB級;其次光感的電源線連接符合光感正常運轉的電源(可用變壓器控制電源大小),然后模塊網口連接交換機,最后分別給模塊和光感通上電,具體連接方式如下圖:

- 進入模塊的IP,進行配置,將模式改為TCP Client格式,地址改為愛投斯中臺的IP地址(121.36.152.36),埠改為為被分配的埠,注冊包方式改為云轉發

- 進入愛投斯中臺,賬號為iotos_test,密碼為iotos123,創建網關

- 填好網關名稱后點擊確認

- 創建設備示例點擊【我的設備】 -> 【通信網關】 -> 【設備實體】

- 填寫【設備名稱】、選擇剛剛創建的【模板驅動】和【通信網關】,引數tcp

- 創建資料點,點擊【我的設備】 -> 【通信網關】 -> 【設備實體】 -> 【資料點】,并在【設備實體】下拉串列選擇剛剛創建的設備實體

點擊右邊的創建資料點,填寫名稱
并在高級配置中配置需要給光感發送的指令,以下為光感的詢問幀和中臺的配置:

中臺配置:

- 在【我的設備】 -> 【通信網關】中找到剛才創建的網關,點擊【高級】

- 開啟云網關,密碼為賬號密碼

- 點擊 【我的設備】 -> 【通信網關】 -> 【設備實體】->【資料點】,選擇剛才創建的設備實體

- 即可查看資料已經上報成功,light即為此時的光照強度

-
驅動代碼
#!coding:utf8
import json
import sys
sys.path.append("..")
from driver import *
import time
import modbus_tk
import modbus_tk.defines as cst
import modbus_tk.modbus as modbus
import modbus_tk.modbus_rtu as modbus_rtu
from modbus_tk.exceptions import ModbusInvalidResponseError
import serial
import signal
import traceback
from jcomm import *
import re
import struct
import math
#硬體心跳執行緒
class RunHardwareHeartbeatThread(threading.Thread,JLib):
def __init__(self, driver):
threading.Thread.__init__(self)
JLib.__init__(self)
self.driver = driver
def run(self):
statetmp = False
dataIdTmp = ''
recycletmp = 0
for dataId,attrs in self.driver.data2attrs.items():
if 'param' not in attrs['config']:
self.error(attrs['config'])
break
if 'hbt' in attrs['config']['param']:
dataIdTmp = dataId
recycletmp = attrs['config']['param']['hbt']
break
while True:
try:
if not self.driver.startHeartbeat:
return
#狀態反轉及延時
if statetmp == False:
statetmp = True
else:
statetmp = False
time.sleep(recycletmp)
# self.warn('HARDWARE HEATBEAT ' + dataIdTmp + u'硬體心跳:' + str(statetmp))
#控制執行
rettmp = ''
if statetmp:
rettmp = self.driver.Event_setData(dataIdTmp,'true')
else:
rettmp = self.driver.Event_setData(dataIdTmp,'false')
if json.loads(rettmp)["code"] == 0:
self.driver.setValue(self.driver.name(dataIdTmp), statetmp)
except Exception,e:
traceback.print_exc(e.message)
continue
class ModbusDriver(IOTOSDriverI):
def __init__(self):
IOTOSDriverI.__init__(self)
self.master = None
# 心跳開關
self.startHeartbeat = False
self.bitsState = [0,0,0,0,0,0,0,0]
self.sourceDataIn = []
# 1、通信初始化
def InitComm(self, attrs = None):
try:
#一、tcp埠監聽
self.__port = self.sysAttrs['config']['param']['tcp']
self.__tcpServer = TcpServerThread(self,self.__port)
self.__tcpServer.setDaemon(True)
self.__tcpServer.start()
self.debug(self.sysAttrs['name'] + u' TCP埠' + str(self.__port) + u"已啟動監聽!")
#二、創建串口1 <=> 串口2
serialtmp = self.sysAttrs['config']['param']['serial']
self.__serial = SerialDtu(serialtmp)
self.__serial.setCallback(self.serialCallback)
self.__serial.open()
#三、串口1 <=> modbus_tk
self.master = modbus_rtu.RtuMaster(self.__serial.serial)
self.master.set_timeout(5)
self.master.set_verbose(False)
self.debug(self.sysAttrs['name'] + u' 串口' + self.__serial.portName() + u'已打開!')
self.zm.pauseCollect = True
# 實體化硬體心跳執行緒
RunHardwareHeartbeatThread(self).start()
except Exception,e:
self.online(False)
traceback.print_exc(u'通信初始化失敗' + e.message)
#四、串口2 <=> tcp
#tcp => 串口2
def tcpCallback(self,data):
datastr = self.str2hex(data)
self.sourceDataIn = data
self.info("Master < < < < < < Device: " + datastr)
self.__serial.send(data)
#tcp <= 串口2
def serialCallback(self,data):
self.info("Master > > > > > > Device: " + self.str2hex(data))
self.__tcpServer.send(data)
#連接狀態回呼
def connectEvent(self,state):
self.online(state)
try:
if state == True:
self.warn('連接成功,啟動采集、心跳')
self.pauseCollect = False
#啟動軟體看門狗
self.startHeartbeat = True
else:
self.warn('連接斷開,將關閉采集和心跳!')
self.startHeartbeat = False
self.pauseCollect = True
except Exception,e:
self.error(u'硬體心跳錯誤, ' + e.message)
# 2、采集
def Collecting(self, dataId):
try:
rtu_ret = ()
cfgtmp = self.data2attrs[dataId]['config']
#added by lrq,過濾非modbus rtu配置的點
if not cfgtmp.has_key('param') or not cfgtmp.has_key('proxy'):
return ()
#當是新一組功能號時;當沒有proxy.pointer,或者有,但是值為null時,就進行采集!否則(有pointer且值不為null,表明設定了采集代理,那么自己自然就被略過了,因為被代理了)當前資料點遍歷輪詢會被略過!
if 'pointer' not in cfgtmp['proxy'] or cfgtmp['proxy']['pointer'] == None or cfgtmp['proxy']['pointer'] == '':
#added by lrq,某些過濾掉不采集,因為有的地址的設備不在線,只要在proxy下面配置disabled:true,這樣就不會輪訓到它!
if 'disabled' in cfgtmp['proxy'] and cfgtmp['proxy']['disabled'] == True:
return ()
else:
self.warn(self.name(dataId))
# added by lrq,過濾非modbus rtu配置的點
if not cfgtmp['param'].has_key('funid'):
return ()
funid = cfgtmp['param']['funid']
devid = cfgtmp['param']['devid']
regad = cfgtmp['param']['regad']
format = cfgtmp['param']['format']
quantity = re.findall(r"\d+\.?\d*", format)
if len(quantity):
quantity = int(quantity[0])
else:
quantity = 1
if format.lower().find('i') != -1: #I、i型別資料為4個位元組,所以n個資料,就是4n位元組,除一般應對modbus標準協議的2位元組一個資料的個數單位!
quantity *= 4/2
elif format.lower().find('h') != -1:
quantity *= 2/2
elif format.lower().find('b') != -1:
quantity *= 1/2
elif format.find('d') != -1:
quantity *= 8/2
elif format.find('f') != -1:
quantity *= 4/2
elif format.find('?') != -1: #對于功能號1、2的開關量讀,開關個數,對于這種bool開關型,個數就不是回傳位元組數的兩倍了!回傳的位元組個數是動態的,要位元組數對應的位數總和,能覆寫傳入的個數數值!
quantity *= 1
format = '' #實踐發現,對于bool開關型,傳入開關量個數就行,format保留為空!如果format設定為 "?"或"8?"、">?"等,都會決議不正確!!
self.debug('>>>>>>' + '(PORT-' + str(self.__port) + ')' + str(devid) + ' ' + str(funid) + ' ' + str(regad) + ' ' + str(quantity) + ' ' + str(format))
rtu_ret = self.master.execute(devid, funid, regad, quantity,data_format=format)
if funid == 3:
retlist = []
for i in range(len(rtu_ret)):
retlist.append(rtu_ret[i])
rtu_ret = tuple(retlist)
#周期查詢的開關量輸出狀態進行備份,用來給控制用
if funid == 1:
self.bitsState = list(rtu_ret)
self.debug(rtu_ret)
return rtu_ret
# 一組功能號內的資料點,不進行遍歷采集!跳過!
else:
return () #注意,這種情況下不是采集錯誤,如果回傳None,那么會當作采集錯誤處理,進行采集錯誤計數了!!
except ModbusInvalidResponseError, e:
self.error(u'MODBUS回應超時, ' + e.message)
return None
except Exception, e:
traceback.print_exc(e.message)
self.error(u'采集決議引數錯誤:' + e.message)
return None
# 3、控制 資料點配置
# 事件回呼介面,監測點操作訪問
def Event_getData(self, dataId, condition=''):
return json.dumps({'code': 0, 'msg': '', 'data': new_val})
# 事件回呼介面,監測點操作訪問
def Event_setData(self, dataId, value):
self.warn(value)
try:
if self.master == None:
self.InitComm()
data_config = self.data2attrs[dataId]['config']
bit = 0
if 'proxy' in data_config.keys() and 'pointer' in data_config['proxy'] and data_config['proxy']['pointer'] != None:
bit = data_config['proxy']['index']
if self.valueTyped(dataId,value) == True:
self.bitsState[bit] = 1
else:
self.bitsState[bit] = 0
self.warn(self.bitsState)
#注意,這里地址是1,但是再huaihua等用了3合一設備的,地址是2,接下來需要這里也做個區分,按照當前操作的資料點對應的實際資料點來!
ret = self.master.execute(1, cst.WRITE_MULTIPLE_COILS, 0, output_value=self.bitsState)
self.warn(ret)
return json.dumps({'code': 0, 'msg': u'操作成功!', 'data': list(ret)})
except Exception,e:
return json.dumps({'code': 501, 'msg': u'操作失敗,錯誤碼501,' + e.message, 'data': None})
-
驅動決議
- 撰寫環境為python2,首先需要匯入modbus、資料決議和愛投斯中臺驅動檔案(driver)的相關包
#!coding:utf8
import json
import sys
sys.path.append("..")
from driver import *
import time
import modbus_tk
import modbus_tk.defines as cst
import modbus_tk.modbus as modbus
import modbus_tk.modbus_rtu as modbus_rtu
from modbus_tk.exceptions import ModbusInvalidResponseError
import serial
import signal
import traceback
from jcomm import *
import re
import struct
import math
- 創建硬體心跳行程,判斷中臺的資料點是否含有必要的屬性,如果沒有則提示error,防止后續程序報錯,開啟心跳行程則啟動中臺的通訊
class RunHardwareHeartbeatThread(threading.Thread,JLib):
def __init__(self, driver):
threading.Thread.__init__(self)
JLib.__init__(self)
self.driver = driver
def run(self):
statetmp = False
dataIdTmp = ''
recycletmp = 0
for dataId,attrs in self.driver.data2attrs.items():
if 'param' not in attrs['config']:
self.error(attrs['config'])
break
if 'hbt' in attrs['config']['param']:
dataIdTmp = dataId
recycletmp = attrs['config']['param']['hbt']
break
while True:
try:
if not self.driver.startHeartbeat:
return
#狀態反轉及延時
if statetmp == False:
statetmp = True
else:
statetmp = False
time.sleep(recycletmp)
# self.warn('HARDWARE HEATBEAT ' + dataIdTmp + u'硬體心跳:' + str(statetmp))
#控制執行
rettmp = ''
if statetmp:
rettmp = self.driver.Event_setData(dataIdTmp,'true')
else:
rettmp = self.driver.Event_setData(dataIdTmp,'false')
if json.loads(rettmp)["code"] == 0:
self.driver.setValue(self.driver.name(dataIdTmp), statetmp)
except Exception,e:
traceback.print_exc(e.message)
continue
- 繼承IOTOSDriverI類,進行初始化,設定心跳開關
class ModbusDriver(IOTOSDriverI):
def __init__(self):
IOTOSDriverI.__init__(self)
self.master = None
# 心跳開關
self.startHeartbeat = False
self.bitsState = [0,0,0,0,0,0,0,0]
self.sourceDataIn = []
- 進行通訊初始化,獲取愛投斯中臺設備實體中配置的埠和serial屬性并且啟動tcp監聽,實體化心跳行程
# 1、通信初始化
def InitComm(self, attrs = None):
try:
#一、tcp埠監聽
self.__port = self.sysAttrs['config']['param']['tcp']
self.__tcpServer = TcpServerThread(self,self.__port)
self.__tcpServer.setDaemon(True)
self.__tcpServer.start()
self.debug(self.sysAttrs['name'] + u' TCP埠' + str(self.__port) + u"已啟動監聽!")
#二、創建串口1 <=> 串口2
serialtmp = self.sysAttrs['config']['param']['serial']
self.__serial = SerialDtu(serialtmp)
self.__serial.setCallback(self.serialCallback)
self.__serial.open()
#三、串口1 <=> modbus_tk
self.master = modbus_rtu.RtuMaster(self.__serial.serial)
self.master.set_timeout(5)
self.master.set_verbose(False)
self.debug(self.sysAttrs['name'] + u' 串口' + self.__serial.portName() + u'已打開!')
self.zm.pauseCollect = True
# 實體化硬體心跳執行緒
RunHardwareHeartbeatThread(self).start()
except Exception,e:
self.online(False)
traceback.print_exc(u'通信初始化失敗' + e.message)
- tcp回呼,可以查看設備是否與中臺以及連接成功
#四、串口2 <=> tcp
#tcp => 串口2
def tcpCallback(self,data):
datastr = self.str2hex(data)
self.sourceDataIn = data
self.info("Master < < < < < < Device: " + datastr)
self.__serial.send(data)
#tcp <= 串口2
def serialCallback(self,data):
self.info("Master > > > > > > Device: " + self.str2hex(data))
self.__tcpServer.send(data)
- 連接狀態回呼,連接成功則啟動硬體心跳行程并且設定中臺的網關狀態
#連接狀態回呼
def connectEvent(self,state):
self.online(state)
try:
if state == True:
self.warn('連接成功,啟動采集、心跳')
self.pauseCollect = False
#啟動軟體看門狗
self.startHeartbeat = True
else:
self.warn('連接斷開,將關閉采集和心跳!')
self.startHeartbeat = False
self.pauseCollect = True
except Exception,e:
self.error(u'硬體心跳錯誤, ' + e.message)
- 最后是采集函式,先過濾掉中臺非modbus rtu配置的點,再拿到資料點屬性中的引數,將引數進行處理后可以拿到需要給設備發送的指令,發送過去后對接收過來的資料進行進制轉換和處理后再上傳至中臺即可將設備的資料上云
# 2、采集
def Collecting(self, dataId):
try:
rtu_ret = ()
cfgtmp = self.data2attrs[dataId]['config']
#added by lrq,過濾非modbus rtu配置的點
if not cfgtmp.has_key('param') or not cfgtmp.has_key('proxy'):
return ()
#當是新一組功能號時;當沒有proxy.pointer,或者有,但是值為null時,就進行采集!否則(有pointer且值不為null,表明設定了采集代理,那么自己自然就被略過了,因為被代理了)當前資料點遍歷輪詢會被略過!
if 'pointer' not in cfgtmp['proxy'] or cfgtmp['proxy']['pointer'] == None or cfgtmp['proxy']['pointer'] == '':
#added by lrq,某些過濾掉不采集,因為有的地址的設備不在線,只要在proxy下面配置disabled:true,這樣就不會輪訓到它!
if 'disabled' in cfgtmp['proxy'] and cfgtmp['proxy']['disabled'] == True:
return ()
else:
self.warn(self.name(dataId))
# added by lrq,過濾非modbus rtu配置的點
if not cfgtmp['param'].has_key('funid'):
return ()
funid = cfgtmp['param']['funid']
devid = cfgtmp['param']['devid']
regad = cfgtmp['param']['regad']
format = cfgtmp['param']['format']
quantity = re.findall(r"\d+\.?\d*", format)
if len(quantity):
quantity = int(quantity[0])
else:
quantity = 1
if format.lower().find('i') != -1: #I、i型別資料為4個位元組,所以n個資料,就是4n位元組,除一般應對modbus標準協議的2位元組一個資料的個數單位!
quantity *= 4/2
elif format.lower().find('h') != -1:
quantity *= 2/2
elif format.lower().find('b') != -1:
quantity *= 1/2
elif format.find('d') != -1:
quantity *= 8/2
elif format.find('f') != -1:
quantity *= 4/2
elif format.find('?') != -1: #對于功能號1、2的開關量讀,開關個數,對于這種bool開關型,個數就不是回傳位元組數的兩倍了!回傳的位元組個數是動態的,要位元組數對應的位數總和,能覆寫傳入的個數數值!
quantity *= 1
format = '' #實踐發現,對于bool開關型,傳入開關量個數就行,format保留為空!如果format設定為 "?"或"8?"、">?"等,都會決議不正確!!
self.debug('>>>>>>' + '(PORT-' + str(self.__port) + ')' + str(devid) + ' ' + str(funid) + ' ' + str(regad) + ' ' + str(quantity) + ' ' + str(format))
rtu_ret = self.master.execute(devid, funid, regad, quantity,data_format=format)
if funid == 3:
retlist = []
for i in range(len(rtu_ret)):
retlist.append(rtu_ret[i])
rtu_ret = tuple(retlist)
#周期查詢的開關量輸出狀態進行備份,用來給控制用
if funid == 1:
self.bitsState = list(rtu_ret)
self.debug(rtu_ret)
return rtu_ret
# 一組功能號內的資料點,不進行遍歷采集!跳過!
else:
return () #注意,這種情況下不是采集錯誤,如果回傳None,那么會當作采集錯誤處理,進行采集錯誤計數了!!
except ModbusInvalidResponseError, e:
self.error(u'MODBUS回應超時, ' + e.message)
return None
except Exception, e:
traceback.print_exc(e.message)
self.error(u'采集決議引數錯誤:' + e.message)
return None
- 部分設備可以進行資料的下發來控制設備的狀態或者配置設備的引數,可以利用如下的函式
# 事件回呼介面,監測點操作訪問
def Event_setData(self, dataId, value):
self.warn(value)
try:
if self.master == None:
self.InitComm()
data_config = self.data2attrs[dataId]['config']
bit = 0
if 'proxy' in data_config.keys() and 'pointer' in data_config['proxy'] and data_config['proxy']['pointer'] != None:
bit = data_config['proxy']['index']
if self.valueTyped(dataId,value) == True:
self.bitsState[bit] = 1
else:
self.bitsState[bit] = 0
self.warn(self.bitsState)
#注意,這里地址是1,但是再huaihua等用了3合一設備的,地址是2,接下來需要這里也做個區分,按照當前操作的資料點對應的實際資料點來!
ret = self.master.execute(1, cst.WRITE_MULTIPLE_COILS, 0, output_value=self.bitsState)
self.warn(ret)
return json.dumps({'code': 0, 'msg': u'操作成功!', 'data': list(ret)})
except Exception,e:
return json.dumps({'code': 501, 'msg': u'操作失敗,錯誤碼501,' + e.message, 'data': None})
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/294302.html
標籤:其他
上一篇:Intel已四面楚歌,繼PC陣地失守后,服務器芯片陣地也被攻入
下一篇:1-Unity是什么
