物聯網海量設備心跳注冊,脫網清除——多執行緒高并發互斥鎖落地
目錄
- 物聯網海量設備心跳注冊,脫網清除——多執行緒高并發互斥鎖落地
- 1.應用背景
- 2.整體框架
- 2.1.心跳注冊框架
- 2.1.1.海量設備
- 2.1.2.心跳上報Handler流程
- 2.2.脫網清理框架
- 2.2.1.激活字典表清理脫網設備方法
- 2.2.2.脫網清理流程圖
- 2.1.心跳注冊框架
- 3.多執行緒與高并發說明
- 3.1.多執行緒說明
- 3.2.高并發說明
- 4.多執行緒高并發造成的例外現象
- 4.1.空參考
- 4.2.字典表里元素賦值不成功
- 4.3.統計設備總數不正確
- 5.分析例外原因
- 5.1.造成空參考的原因
- 5.2.設備IP賦值不成功原因
- 5.3.統計設備總數不正確原因
- 6.解決思路
- 7.代碼實作
- 8.小結
1.應用背景
在物聯網應用場景中,需要維護很多個設備的連接,比如基于TCP socket通信的長連接,目的是為了獲取設備采集的資訊,反向控制設備的數字開關或者模擬量,我們把這些TCP長連接都放入了基于執行緒安全的ConcurrentDictionary激活字典表中,IP地址作為key,設備箱領域模型作為value,我們需要把激活設備箱的字典表維護好,需要將超時沒有心跳的設備,我們可以稱之為脫網設備,給清理出激活字典表,寫入到脫網告警字典表中去,當脫網設備下次再有心跳時,可以再次移入到激活字典表中,從而再產生恢復告警,進行一系列其他動作,
2.整體框架
2.1.心跳注冊框架

2.1.1.海量設備
因為要模擬海量設備的TCP場景,我們利用模擬器生成了12000臺模擬設備,8臺真實設備,
2.1.2.心跳上報Handler流程
詳細心跳上報流程詳見上述框架圖
- 第一次建立TCP長連接,并且上報心跳報文;
- socket快取會先處理TCP中存在的粘包,具體方法可參見此篇博文 TCP粘包處理現象及其解決方案——基于NewLife.Net網路庫的管道式幀長粘包處理方法
- 然后會觸發OnReceive中的e事件,從而傳入粘包處理后的message;
- 判斷包有效性,因為這方面比較簡單,根據不同協議寫一個類來處理即可,這里不再展開;
- 包有效載荷的CRC判斷,具體實作可參見此篇博文 基于Modbus三種CRC16校驗方法的性能對比;
- 包型別決議(這里特指決議出心跳包);
- 心跳包決議,具體可參見這兩篇博文,深入淺出C#結構體——封裝以太網心跳包的結構為例, 類與結構體性能對比測驗——以封裝網路心跳包為例
- 最終將設備新增激活字典表(第一次心跳)或者在激活字典表重繪心跳時間(非第一次心跳),
突然發現我可以寫一個物聯網的采集系統的系列了,組織一個目錄,希望自己堅持下去吧,
2.2.脫網清理框架
2.2.1.激活字典表清理脫網設備方法
原理很簡單,遍歷字典表中超過設定的檢測周期,篩選到一個字典的IEnumerable中去,然后在激活字典表中洗掉對應超時key(這里就是指IP地址)即可,當然這里的_internal周期可以*N,多個周期,自行在組態檔中設定即可,組態檔如下:
"ipboxNumStaticInternal": 12
public static void DeleteDeadBoxFromActiveBox(in _internal)
{
{
var outTime = DateTime.Now.AddSeconds(-_internal);
var iboxTimeOutList = iboxActiveDictionary.Where(q => (outTime > q.Value.UpdateTime));//.Select(x=> iboxActiveDictionary[x.Key]) ;
foreach (var item in iboxTimeOutList)
{
iboxActiveDictionary.Remove(item.Key);
}
}
}
2.2.2.脫網清理流程圖

這里主要開啟了一個系統定時器,主動會去呼叫清理脫網設備方法,呼叫時間間隔即ipboxNumStaticInternal,代碼如下:
public void systemTimerStart()
{
var interval = ReadTheInternalFromSetting();
_systemTimer = new Timer(state =>
{
IBoxActiveDicManager.DeleteDeadBoxFromActiveBo(_internal);
Console.WriteLine("{1},激活設備數量:{0}\n",IBoxActiveDicManager.iboxActiveDictionary.Count,DateTime.Now);
}, null, interval, interval);
Console.WriteLine("PemsCom采集系統時鐘已經開啟");
LoggerHelper.Info("PemsCom采集系統時鐘已經開啟");
}
/// <summary>
/// 組態檔讀入時間間隔方法
/// </summary>
/// <returns></returns>
private int ReadTheInternalFromSetting()
{
_internal = int.Parse(Appsettings.app(new string[] {"ipboxNumStaticInternal" }));
Console.WriteLine("PemsCom采集系統時鐘配置引數已經讀");
LoggerHelper.Info("PemsCom采集系統時鐘配置引數已經讀");
return Convert.ToInt32(TimeSpan.FromSecond(_internal).TotalMilliseconds);
}
3.多執行緒與高并發說明
3.1.多執行緒說明
這里會有很多的執行緒讓CPU來輪片執行,比如:
- 12008個Receive事件觸發執行緒;
- 定時清除脫網設備執行緒;
- 主執行緒,監控命令列輸入,并執行對應的命令;
舉個實際的例子,以圖為證

12008臺設備,每秒處理接受網路包的峰峰值是9218個包,就是在某一秒,CPU共輪片執行了9218個執行緒,比如是雙核4執行緒的,則9218/4=2304.5,即CPU在1秒輪片執行了2305次,即0.43毫秒就輪片執行一次,
3.2.高并發說明
其實3.1已經解釋了高并發,在某一秒,需要處理的接收事件有接近1萬件,而這一時刻的執行順序是無序的,9218里的這么多執行緒,我們不知道哪個先執行,哪個后執行,如果不認為地加一些邏輯控制,比如我們今天要介紹的互斥鎖,就會出現一些例外現象,
4.多執行緒高并發造成的例外現象
這里只描述現象,原因會在下面5.分析例外原因 做具體描述,
4.1.空參考

例外所在的位置:心跳處理類如下,
public class HeartHandler
{
static string _deviceIndex = Appsettings.app(new string[] { "DeviceIndex" });
private static IBoxActive iboxActive;
public static void Register(TcpHeartPacket heartPacket,int sessId)
{
UInt32 IP;
UInt64 mac;
if (_deviceIndex == "IP")
{
IP =(UInt32)BitConverter.ToUInt32(heartPacket.IP, 0);
if (IBoxActiveDicManager.GetBoxActive(IP, out iboxActive) != true)
{
IBoxActiveDicManager.iboxActiveDictionary.TryAdd(IP, iboxActive);
iboxActive.SessID = sessId;
}
}
else
{
mac = (UInt64)BitConverter.ToUInt64(heartPacket.Mac, 0);
if (IBoxActiveDicManager.GetBoxActive(mac, out iboxActive) != true)
{
IBoxActiveDicManager.iboxActiveDictionary.TryAdd(mac, iboxActive);
iboxActive.SessID = sessId;
}
}
//參考型別,智能指標,使用方便
iboxActive.UpdateTime = DateTime.Now;
}
}
4.2.字典表里元素賦值不成功

/// <summary>
/// 查詢激活設備箱字典中是否有存在上報的設備箱,
/// 存在回傳true,不存在回傳false,并且新建好設備箱模型
/// </summary>
/// <param name="mac"></param>
/// <param name="iboxActive"></param>
/// <returns></returns>
public static bool GetBoxActive(UInt32 IP, out IBoxActive iboxActive)
{
if (iboxActiveDictionary.TryGetValue(IP, outiboxActive))
{
return true;
}
iboxActive = new IBoxActive();
iboxActive.IP = IP;
if (iboxActive.IP != IP)
{
LoggerHelper.Error(string.Format("實體化賦值不成功.iboxActive.IP:{0};IP{1}", iboxActive.IP, IP));
}
return false;
}
有沒有感覺很奇怪,上一句都賦值了,下一句對比就不相等,但是在多執行緒大并發里就是有這種可能,下面會詳細分析,
4.3.統計設備總數不正確

因為12008臺大并發時很容易出錯,所以改成了1000臺,如下統計資料會有出錯情況,這同樣也是因為多執行緒高并發引起的錯誤,
5.分析例外原因
5.1.造成空參考的原因
其實第4的三點原因都是同一個原因造成,所以在5.1會詳細闡述,5.2,,5.3只做簡單闡述,這里敲下黑板,分析多執行緒高并發的例外問題,程式運行的特點就是見縫就插,就像個老司機一樣,概括起來就是執行緒與執行緒之間的無序性,比如我們設備心跳執行緒正在更新設備心跳時間的時候,脫網清理執行緒就把該設備給清理掉了,如此一來,時間沒法賦值給空物件(已被脫網執行緒給清理),因此只能報空參考例外,對沒錯,就是這么簡單,耗費了我很長時間去debug跟思考這個例外,
5.2.設備IP賦值不成功原因
同樣,在創建了設備實體之后,IP賦值完成,剛好脫網清除設備執行緒運行清除了設備,當對比的時候,參考原來的地址,字典的原來地址已經存了其他設備箱的IP,所以IP地址不相等,
5.3.統計設備總數不正確原因
原因其實是5.2造成的,沒法成功注冊,當然數量就不對啦,
6.解決思路
就是當我在創建激活設備實體(第一次心跳注冊)或者更新心跳時間的時候(非第一次注冊),不要讓無序的脫網清除執行緒運行,敲黑板:就是保證心跳處理注冊程序的原子性,對,其實這里很像關系型資料庫的事務,原子性,原子性就是對抗程式無序造成例外的有力武器,我們可以在注冊心跳處理方法上加個互斥鎖,讓編譯器跟運行時去安排更加合理的執行順序,
7.代碼實作
代碼很簡單,
//定義一把鎖
public static Mutex activeIpboxDicMutex = new Mutex();
//設備箱注冊加鎖,例外全部消除
IBoxActiveDicManager.activeIpboxDicMWaitOne();
HeartHandler.Register(tcpHeartPacsessionId);
IBoxActiveDicManager.activeIpboxDicMReleaseMutex();
這里插入一下事務的使用,也是很類似的,把我們的主業務加中中間,類比方便大家理解記憶,就像夾心餅干(瞎扯),
unitOfWork.BeginTransaction();
// Adds new device
unitOfWork.DeviceRepository.Add(device);
// Commit transaction
unitOfWork.Commit();
當然也可以給設備箱脫網清除執行緒加鎖,
IBoxActiveDicManager.activeIpboxDicMutex.WaitOne();
IBoxActiveDicManager.DeleteDeadBoxFromActiveBox(_internal);
IBoxActiveDicManager.activeIpboxDicMutex.ReleaseMutex();
考慮到脫網清除執行緒會損耗部分性能,我也測驗了去掉該鎖的情況,也不會有第4的3個例外,至此問題全部解決,
8.小結
-
模擬設備數量小測不出這個問題,如此看出海量設備的重要性,因為現實情況肯定會出現以上三個問題,而且都是很嚴重很致命的問題,好的測驗方法可以把問題扼殺在搖籃中;
-
多執行緒高并發時容易出現這樣那樣的例外,要懷著敬畏之心去思考,去解決問題;
著作權宣告:本文為博主原創文章,遵循 CC 4.0 BY-SA 著作權協議,轉載請附上原文出處鏈接和本宣告,
本文鏈接:https://www.cnblogs.com/JerryMouseLi/p/12709048.html
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/4520.html
標籤:架構設計
