github地址:https://github.com/wxzz/CSharpFlink
gitee地址:https://gitee.com/wxzz/CSharpFlink
1 概述及背景
我們有一個全國性質的面向工業的公有云平臺,通過專線或4G的鏈路方式實時向平臺傳輸資料,每天處理1億條左右的資料量,為現場用戶提供實時的在線服務和離線資料分析服務,現在已經上線穩定運行有將近3年的時間,同時也為工業企業提供私有云建設服務,
我們計劃使用Flink作為云平臺后臺的實時計算部分,基本實作資料點的聚合計算、運算式規則計算等業務,進一步實作機器學習或自定義復雜演算法的需求,
我們經過將近一年左右時間的研究及開發,已經基本實作了聚合和邏輯等業務,但是感覺Flink比較重,并且應用和運維的水平要求比較高,
基于上述情況,我們自主使用NET 5.0開發一套CSharpFlink實時計算組件,支持自定義資料源、計算和存盤的基本要求,
2 應用場景
主要面向物聯網、工業互聯網私有云或公有云平臺建設程序中的資料點實時聚合和運算式計算,應用場景包括:
(1)資料點的實時時間視窗范圍內聚合計算,例如:最大值、最小值、平均值、和值、眾數、方差、中位數等,可以自定義二次開發,
(2)資料點的歷史延遲視窗的一段時間范圍內資料補充或更新的重新計算,
(3)資料點的運算式計算,支持自定義C#腳本進行編輯,實時預警或資料深度加工處理,
(4)主從結構的分布式部署,主節點負責計算任務分發,作業節點負責任務計算及結果存盤,
3 框架特點
主要特點主要是根據我們多年的物聯網、工業專案經驗的提煉和總結,滿足實作應用場景,特點包括:
(1)使用最新的NET 5.0進行開發,完全跨平臺,
(2)實時資料視窗范圍外的資料補發或更新的重新計算,例如:當前5秒的實時資料視窗,支持5秒以前的資料補充和更新,并且進行重新計算及更新到資料存盤單元,
(3)實時資料運算式計算支持定時計算或資料值改事件變觸發計算,滿足實時運算式或周期性計算,
(4)C#語言的二次開發,對接多種資料源,自定義算子和多種方式資料存盤等,
(5)單節點或分布式部署,
4 框架結構
框架結構組件的基本示意,如下圖:
5 代碼目錄說明
使用VS2019進行工程開發,工程解決方案檔案為:CSharpFlink.sln,代碼目錄說明如下:
(1)Cache:主節點和作業節點計算任務本地快取管理,
(2)Calculate:計算任務輸入、程序、輸出操作及管理,
(3)Channel:主節點和作業節點分布式部署模式的IO通訊操作,
(4)Common:操作公眾類別庫,
(5)Config:全域組態檔操作,
(6)Execution:全域工程的執行環境入口,
(7)Expression:運算式計算任務操作,
(8)Log:日志操作及管理,
(9)Model:資料點元資料資訊,
(10)Node:主節點和作業節點管理,
(11)Protocol:主節點和作業節點之間分布式部署之間互動的協議,
(12)Sink:計算任務計算結果存盤介面,
(13)Source:對接多種資料源介面,例如:mqtt、kafka、rabbitmq、資料庫等,
(14)Task:視窗或表達任務介面,主節點和作業節點任務操作及管理,
(15)Window:資料視窗任務操作,
(16)Worker:作業節點介面,
6 組態檔說明
組態檔默認為:cfg\global.cfg,可以自定義指定組態檔,參見:命令列操作說明,組態檔說明,如下:
(1)MaxDegreeOfParallelism:任務并行度,主節點生成任務、作業節點處理任務依賴這個引數,
(2)MasterListenPort:主節點偵聽埠,用于作業節點主動連接,
(3)MasterIp:主節點IP,用于作業節點主動連接,
(4)NodeType:節點運行模式,包括:Master、Slave和Both,
(5)RemoteInvokeInterval:遠程呼叫作業節點間隔時間,單位:毫秒,
(6)RepeatRemoteInvokeInterval:呼叫作業節點失敗后,重新呼叫作業節點間隔時間,單位:毫秒,
(7)SlaveExcuteCalculateInterval:作業節點執行計算任務間隔時間,單位:毫秒,
(8)MaxFrameLength:主節點和作業節點之間傳輸資料最大資料偵,單位:位元組,
(9)WorkerPower:作業節點能力系數,大于1,會連續發送多個任務,
7 任務部署說明
二次開發參見:二次開發說明,開發好的任務,測驗通過后,把程式集(.dll)復制到“tasks”目錄下,例如工程TestTask專案測驗、編譯通過后,可以部署到“tasks”目錄下,運行“CSharpFlink”主程式會自動加載和呼叫,
可以自定義指定任務程式集,參見:命令列操作說明,
8 命令列操作說明
命令列運行“CSharpFlink”程式,支持自定義指定組態檔或任務程式集,說明如下:
-h 顯示命令列幫助,
-c 加載指定組態檔, 例如:CSharpFlink -c c:/my.cfg
-t 加載任務程式集, 例如:CSharpFlink -t c:/mytask.dll
例如:
dotnet CSharpFlink.dll -c c:/master.cfg -t c:/mytask.dll
9 部署說明
“release”目錄下是編譯好的程式,把“CSharpFlink v1.0”分別復制到不同的路徑下,分別修改“cfg\global.cfg”組態檔中“NodeType”引數為:Master和Slave,修改主節點程式“tasks\tasks.cfg”檔案中的任務數,分別運行不同目錄下的“dotnet CSharpFlink.dll”,
“TestTask.dll”源代碼,參見:二次開發說明,
10 二次開發說明
二次開發主要針對資料源、計算程序和資料計算結果存盤,大致程序如下:
(1) 資料源對接,可以自定義對接mqtt、kafka、rabbitmq、資料庫等,需要繼承SourceFunction介面,參見:RandomSourceFunction.cs類,
(2) 資料計算程序,可以自定義資料處理或加工,需要繼承Calculate.Calculate介面,參見:聚合計算Avg.cs、運算式計算ExpressionCalculate.cs,通過AddWindowTask或AddExpressionTask函式引數進行實體化,
(3) 資料計算結果存盤,可以自定義存盤任何介質上,需要繼承SinkFunction介面,參見:SinkFunction.cs類,
11 應用事例展示
同一臺電腦,CPU:4核 I5-7400 3.0GHz,記憶體:16G,1個主節點,5個作業節點,生成1000個資料點任務,隨機資料點時間視窗和計算算子,CPU使用率為:20%-30%,記憶體使用率:30%-40%,主節點CPU和記憶體使用情況:3%-5%、100MB-300MB, 作業節點CPU和記憶體使用情況:0.1%-2%、25MB-60MB,運行效果,如下圖:

物聯網&大資料技術 QQ群:54256083
物聯網&大資料合作 QQ群:727664080
網站:http://www.ineuos.net
聯系QQ:504547114
合作微信:wxzz0151
官方博客:https://www.cnblogs.com/lsjwq
iNeuOS工業互聯網作業系統 公眾號

轉載請註明出處,本文鏈接:https://www.uj5u.com/net/222966.html
標籤:.NET技术
