本文記錄如何在施耐德 EAE 開法環境下,如何構建一個 MQTT 功能塊,通過外部的一個Soft Gateway 實作MQTT 通信,
概述
任何一個控制系統的開發提供的功能塊或者程式庫都是有限的,在實際應用中經常會感覺少了一些庫和功能塊,開放性系統的好處在于提供了一整套工具,讓用戶,或者第三方開發者參與開發功能塊和支撐服務,基于IEC61499 功能塊技術的施耐德EAE 就是這樣的系統,用戶可以撰寫自己的功能塊,EAE 撰寫功能塊特性如下:
用戶可撰寫的功能塊
- IEC61499基本功能塊(BASIC FUNCTION BLOCK)
- 復合功能塊(Conposite Function Block)
- 子應用 (Sub Application)
用戶使用IEC61131-3 的ST語言(Structured Text) 撰寫功能塊內部的演算法,
用戶撰寫的功能塊可以在EAE 中直接編譯,下載到運行時中運行,
用戶不需要任何干預,使用起來比較方便,至于編譯是在EAE 中完成的,還是在運行時中完成的,目前不得而知,
MQTT 網關實作
施耐德EAE 沒有提供MQTT 協議的功能塊,我們嘗試使用EAE 現有的功能塊和撰寫功能塊的方法,自己來構建一個MQTT 軟體網關(Soft Gateway),具體的思路如下
使用已有的NETIO 功能塊實作與外部程式的TCP 通信,外部程式(Soft Gateway)將TCP 資料轉化成為 MQTT 訊息發布,另外一個外部程式(GatewayClient訂閱該主題的訊息,
為了實作IEC61499 功能塊網中的資料打包成TCP 資料塊,我們撰寫了一個基本功能塊(package),整個實驗的結構如下:

MQTT 代理使用了公網上的一個公開的代理 broker.emqx.io ,你也可以在自己的電腦上安裝一個Mosqquitto MQTT Broker,Soft Gateway和MQTT Client 都是為了本次測驗臨時撰寫的,使用Go 語言撰寫,
功能塊package 的設計
功能塊package 是一個IEC61499 基本功能塊,實作將MQTT Topic 和counter 資料組成一個TCP 包,由于NETIO 是完成資料的透明TCP 傳輸,沒有定義更高級的資料格式,我們在這里簡單地使用IEC61499 標準推薦的ASN.1 資料格式,有關ASN.1 的更多資訊可以參閱我的博文-關于IEC61499 的資料交換資訊抽象語法ASN.1
建立基本功能塊
在EAE 左邊專案樹中選擇Basic 擊右鍵,選擇 New Item

出現如下畫面,添加一個topic 和VAL1 輸入兩個資料輸入腳,點擊REQ 的With 框,選擇將這兩個輸入與REQ 關聯,類似方式添加OUT和OUT_LEN 兩個資料 輸出,并且與CNF 關聯

REQ 的演算法(ST)
點擊作業區中的Algorithms,選擇REQ 事件的演算法,使用ST語言撰寫,

ALGORITHM REQ IN ST:
(* Add your comment (as per IEC 61131-3) here
Normally executed algorithm
*)
VAR
BUF:ARRAY [64] OF BYTE;
INDEX:UINT;
i:INT;
n:DINT;
END_VAR
INDEX:=0;
n:=LEN(TOPIC);
BUF[INDEX]:=86;
INDEX:=INDEX+1;
BUF[INDEX]:=0;
INDEX:=INDEX+1;
BUF[INDEX]:=DINT_TO_BYTE(n);
INDEX:=INDEX+1;
i:=1;
REPEAT
BUF[INDEX]:= CHAR_TO_BYTE(TOPIC[i]);
INDEX:=INDEX+1;
i:=i+1;
UNTIL i > n
END_REPEAT;
BUF[INDEX]:=70;
INDEX:=INDEX+1;
BUF[INDEX]:=UINT_TO_BYTE(VAL1/256);
INDEX:=INDEX+1;
BUF[INDEX]:=UINT_TO_BYTE(VAL1);
INDEX:=INDEX+1;
OUT_LEN:=INDEX;
i:=0;
OUT:='';
REPEAT
OUT:=CONCAT (OUT,CHAR_TO_STRING(BYTE_TO_CHAR(BUF[i])));
i:=i+1;
UNTIL i>=OUT_LEN
END_REPEAT;
END_ALGORITHM
這個演算法實作在OUT 資料中輸出ASN.1 格式的字串,有兩個ASN.1 資料項,一個是STRING型別的Topic ,另一個是UINT 型別的Counter,
SoftGateway.go
package main
import (
"fmt"
"net"
"os"
// "strings"
"strconv"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
const (
CONN_HOST = "localhost"
CONN_PORT = "9201"
CONN_TYPE = "tcp"
)
var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
}
var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
fmt.Println("Connected to MQTT Broker")
}
var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
fmt.Printf("Connect lost: %v", err)
}
func main() {
var broker = "broker.emqx.io"
var port = 1883
opts := mqtt.NewClientOptions()
opts.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port))
opts.SetClientID("go_mqtt_client")
opts.SetUsername("emqx")
opts.SetPassword("public")
opts.SetDefaultPublishHandler(messagePubHandler)
opts.OnConnect = connectHandler
opts.OnConnectionLost = connectLostHandler
client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
}
//subscribe(client,"dPACWrite")
// Listen for incoming connections.
l, err := net.Listen(CONN_TYPE, CONN_HOST+":"+CONN_PORT)
if err != nil {
fmt.Println("Error listening:", err.Error())
os.Exit(1)
}
// Close the listener when the application closes.
defer l.Close()
fmt.Println("Listening on " + CONN_HOST + ":" + CONN_PORT)
for {
// Listen for an incoming connection.
conn, err := l.Accept()
if err != nil {
fmt.Println("Error accepting: ", err.Error())
os.Exit(1)
}
// Handle connections in a new goroutine.
go handleRequest(conn,client)
}
}
// Handles incoming requests.
func handleRequest(conn net.Conn ,client mqtt.Client) {
// Make a buffer to hold incoming data.
buf := make([]byte, 1024)
//index:=0
for {
// Read the incoming connection into the buffer.
cnt, err := conn.Read(buf)
if err != nil {
fmt.Println("Error reading:", err.Error())
}
fmt.Printf("message len=%d\n",cnt)
// line := strings.TrimSpace(string(buf[0:cnt]))
// fmt.Println(line)
len:=int(buf[1]<<8)|int(buf[2])
fmt.Printf("topic len=%d",len)
topicArray := make([]byte, len)
for index:=0;index<len;index++ {
topicArray[index]=buf[index+3]
}
topic:=string(topicArray)
fmt.Println(topic)
counter:=int(buf[len+4]<<8)|int(buf[len+5])
fmt.Printf("Counter=%d\n",counter)
client.Publish(topic, 0, false, strconv.Itoa(counter))
}
conn.Close()
}
MQTTClient.go
package main
import (
"fmt"
mqtt "github.com/eclipse/paho.mqtt.golang"
"time"
)
var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
}
var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
fmt.Println("Connected")
}
var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
fmt.Printf("Connect lost: %v", err)
}
func main() {
var broker = "broker.emqx.io"
var port = 1883
opts := mqtt.NewClientOptions()
opts.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port))
opts.SetClientID("Gateway_client")
opts.SetUsername("client")
opts.SetPassword("public")
opts.SetDefaultPublishHandler(messagePubHandler)
opts.OnConnect = connectHandler
opts.OnConnectionLost = connectLostHandler
client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
}
topic:="dPACWrite"
token:=client.Subscribe("dPACWrite", 0, nil)
token.Wait()
fmt.Printf("Subscribed to topic: %s\n", topic)
for {
time.Sleep(time.Second)
}
}
運行
EAE 的Soft dPAC 在windows 下運行,而SoftGateway 和MQTTClient 在同一臺PC 中windows 10 下的ubuntu wsl 下運行,
本例子涉及的內容比較多,有問題,就添加在評論區中吧,如果沒有特別明白,也沒有關系,可以自己先嘗試建立一些小的基本功能塊,學習復雜系統最好的方法就是嘗試,
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/240456.html
標籤:其他
