使用邊緣計算網關分析CAN報文
- 硬體介紹
- 傳感器資訊收集
- 使用python處理CAN 報文
- 將獲取到的CAN資料通過UDP協議發送到PC端實作資料可視化
- 最終結果
介紹一個使用邊緣網關分析CAN報文,并將其進行資料可視化的應用案例,
硬體介紹
在此應用案例中,使用的傳感器是一個加速度傳感器,可以輸出x、y、z軸三周的加速度, 傳感器的通信協議是CAN Open,
使用的網關是虹科Dynagate 10-12
傳感器資訊收集
首先,使用網關Dynagate 10-12 收集CAN報文,在此之前,需要設定好網關的CAN介面,這里開放我們的can0介面,并且設定好波特率,與我們的傳感器匹配,本次案例的傳感器波特率是 1000Mpbs
ip link set can0 down
ip link set can0 type can bitrate 1000000
ip link set can0 up
設定好之后,使用命令獲取can報文
candump can0
就會得到原始的 raw_data

首先是CAN id, 其次是dlc, 最后是CAN報文內容,
使用python處理CAN 報文
有了 raw data, 還需要有DBC檔案對CAN報文進行決議,我們這里使用python 匯入dbc檔案,并且進行決議
import datetime
# 匯入兩個python處理can總線的包
import can
import cantools
# dbc檔案路徑
DBC_DIR = "/home/hkaco/demo.dbc"
# 加載dbc檔案 并創建can總線實體
db = cantools.database.load_file(DBC_DIR)
can_bus = can.interface.Bus('can0', bustype = 'socketcan')
def main():
while (1):
# 回圈獲取can總線的資料
msg = can_bus.recv()
# 獲取訊息并解碼
recv = db.decode_message(msg.arbitration_id, msg.data)
timestamp = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
print(timestamp)
print(recv)
if __name__ == "__main__":
main()
通過我們的決議,得到的結果如下圖所示:

可以看到,我們成功的使用dbc檔案把獲取到的raw data決議成了我們能夠理解的CAN報文,
將獲取到的CAN資料通過UDP協議發送到PC端實作資料可視化
由于邊緣網關通常是不帶顯示設備的,所以我們如果想要直觀的看到這個資料產生的結果,可以通過udp協議將網關決議的結果發送到pc端,然后pc端使用matplotlib等資料處理、資料可視化庫即可完成,
那么首先是在網關中寫好發送的腳本,我們只需要在剛剛的腳本中,添加udp發送的相關代碼即可,
import socket
import json
import time
# 匯入兩個python處理can總線的包
import can
import cantools
# dbc檔案路徑
DBC_DIR = "/home/hkaco/demo.dbc"
# 目標ip地址和目標埠,ip地址
DEST_IP = "192.168.189.201"
DEST_PORT = 6789
# 定義一個通過udp協議發送函式
def send_msg(udp_socket, message):
udp_socket.sendto(message, (DEST_IP, DEST_PORT))
# 加載dbc檔案 并創建can總線實體
db = cantools.database.load_file(DBC_DIR)
can_bus = can.interface.Bus('can0', bustype = 'socketcan')
def main():
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
while (1):
# 回圈獲取can總線的資料
msg = can_bus.recv()
# 獲取訊息并解碼
recv = db.decode_message(msg.arbitration_id, msg.data)
# 專為字典格式
recv = json.dumps(recv)
send_msg(udp_socket, bytes(recv, encoding="utf-8"))
time.sleep(0.5)
udp_socket.close()
if __name__ == "__main__":
main()
發送端寫好了之后,我們在PC端寫好還要寫好接收端代碼,并完善資料可視化代碼
import socket
import matplotlib.pyplot as plt
import time
import datetime
import json
from datetime import datetime
def recv_msg(udp_socket):
msg = udp_socket.recvfrom(1024)
# 解碼
recv_ip = msg[1]
recv_message = msg[0].decode('utf-8')
recv_dict = json.loads(recv_message)
# recv_dict是字典,可以使用鍵取值
x = recv_dict['Acc_X']
y = recv_dict['Acc_Y']
z = recv_dict['Acc_Z']
t = recv_dict['T']
return x, y, z, t
def main():
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
udp_socket.bind(('', 6789))
font1 = {'family' : 'Times New Roman',
'weight' : 'normal',
'size' : 9,
}
plt.rcParams['font.sans-serif']=['SimHei']
plt.rcParams['axes.unicode_minus']=False
plt.ion()
timel = []
xl = []
yl = []
zl = []
tl = []
while True:
nowtime = datetime.utcnow().strftime("%H:%M:%S")
x, y, z, t = recv_msg(udp_socket)
xl.append(x)
yl.append(y)
zl.append(z)
tl.append(t)
timel.append(nowtime)
plt.clf()
plt.subplots_adjust(hspace=0.5)
ax1 = plt.subplot(211)
ax1.plot(timel, xl, c="b", label='x')
ax1.plot(timel, yl, c="r", label='y')
ax1.plot(timel, zl, c= "y", label='z')
plt.legend(loc='upper right', prop=font1, frameon=False)
plt.xticks(rotation = 270)
plt.xlabel('時間戳')
plt.ylabel('加速度值')
ax2 = plt.subplot(212, sharex = ax1)
ax2.plot(timel, tl, c="c", label='T')
plt.xticks(rotation = 270)
plt.xlabel('時間戳')
plt.ylabel('T')
plt.legend(loc='upper right', prop=font1, frameon=False)
plt.pause(0.001)
plt.ioff()
socket.close()
if __name__ == "__main__":
main()
最終結果
首先運行接收端,然后在我們的邊緣網關Dynagate 10-12中運行發送端,結果如圖所示:



轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/249911.html
標籤:python
