我有一個執行緒處理每 10 秒接收一次的訊息,并讓另一個執行緒每分鐘將這些訊息寫入資料庫。每條訊息都有一個不同的發件人,serialNumber在我的例子中被命名。
因此,我創建了一個如下所示的 ConcurrentDictionary。
public ConcurrentDictionary<string, ConcurrentQueue<PacketModel>> _dicAllPackets;
字典的鍵是serialNumber,值是 1 分鐘訊息的集合。我想收集一分鐘資料的原因不是每 10 秒去一次資料庫,而是每分鐘去一次,這樣我可以將這個程序減少 1/6 倍。
public class ShotManager
{
private const int SLEEP_THREAD_FOR_FILE_LIST_DB_SHOOTER = 25000;
private bool ACTIVE_FILE_DB_SHOOT_THREAD = false;
private List<Devices> _devices = new List<Devices>();
public ConcurrentDictionary<string, ConcurrentQueue<PacketModel>> _dicAllPackets;
public ShotManager()
{
ACTIVE_FILE_DB_SHOOT_THREAD = Utility.GetAppSettings("AppConfig", "0", "ACTIVE_LIST_DB_SHOOT") == "1";
init();
}
private void init()
{
using (iotemplaridbContext dbContext = new iotemplaridbContext())
_devices = (from d in dbContext.Devices select d).ToList();
if (_dicAllPackets is null)
_dicAllPackets = new ConcurrentDictionary<string, ConcurrentQueue<PacketModel>>();
foreach (var device in _devices)
{
if(!_dicAllPackets.ContainsKey(device.SerialNumber))
_dicAllPackets.TryAdd(device.SerialNumber, new ConcurrentQueue<PacketModel> { });
}
}
public void Spinner()
{
while (ACTIVE_FILE_DB_SHOOT_THREAD)
{
try
{
Parallel.ForEach(_dicAllPackets, devicePacket =>
{
Thread.Sleep(100);
readAndShot(devicePacket);
});
Thread.Sleep(SLEEP_THREAD_FOR_FILE_LIST_DB_SHOOTER);
//init();
}
catch (Exception ex)
{
//init();
tLogger.EXC("Spinner exception for write...", ex);
}
}
}
public void EnqueueObjectToQueue(string serialNumber, PacketModel model)
{
if (_dicAllPackets != null)
{
if (!_dicAllPackets.ContainsKey(serialNumber))
_dicAllPackets.TryAdd(serialNumber, new ConcurrentQueue<PacketModel> { });
else
_dicAllPackets[serialNumber].Enqueue(model);
}
}
private void readAndShot(KeyValuePair<string, ConcurrentQueue<PacketModel>> keyValuePair)
{
StringBuilder sb = new StringBuilder();
if (keyValuePair.Value.Count() <= 0)
{
return;
}
sb.AppendLine($"INSERT INTO ......) VALUES(");
//the reason why I don't use while(TryDequeue(out ..)){..} is there's constantly enqueue to this dictionary, so the thread will be occupied with a single device for so long
for (int i = 0; i < 10; i )
{
keyValuePair.Value.TryDequeue(out PacketModel packet);
if (packet != null)
{
/*
*** do something and fill the sb...
*/
}
else
{
Console.WriteLine("No packet found! For Device: " keyValuePair.Key);
break;
}
}
insertIntoDB(sb.ToString()[..(sb.Length - 5)] ";");
}
}
EnqueueObjectToQueue來電者來自不同的班級,如下所示。
private void packetToDictionary(string serialNumber, string jsonPacket, string messageTimeStamp)
{
PacketModel model = new PacketModel {
MachineData = jsonPacket,
DataInsertedAt = messageTimeStamp
};
_shotManager.EnqueueObjectToQueue(serialNumber, model);
}
我如何呼叫上述函式來自處理函式本身。
private void messageReceiveHandler(object sender, MessageReceviedEventArgs e){
//do something...parse from e and call the func
string jsonPacket = ""; //something parsed from e
string serialNumber = ""; //something parsed from e
string message_timestamp = DateTime.Now().ToString("yyyy-MM-dd HH:mm:ss");
ThreadPool.QueueUserWorkItem(state => packetToDictionary(serialNumber, str, message_timestamp));
}
問題是有時某些資料包被錯誤地排入佇列serialNumber或重復自身(重復條目)。
ConcurrentQueue像這樣使用它聰明ConcurrentDictionary嗎?
uj5u.com熱心網友回復:
不,使用ConcurrentDictionary帶有嵌套ConcurrentQueues 的 a 作為值不是一個好主意。原子地更新這個結構是不可能的。以此為例:
if (!_dicAllPackets.ContainsKey(serialNumber))
_dicAllPackets.TryAdd(serialNumber, new ConcurrentQueue<PacketModel> { });
else
_dicAllPackets[serialNumber].Enqueue(model);
這段小代碼充滿了競爭條件。ContainsKey運行此代碼的執行緒可以在、TryAdd、[]索引器和呼叫之間的任何點被另一個執行緒攔截Enqueue,從而改變結構的狀態,并使當前執行緒作業的正確性所依據的條件無效。
當您有一個包含不可變值的簡單物件時, AConcurrentDictionary是一個好主意,您希望同時使用它,并且在每次訪問周圍使用 a 可能會產生嚴重的爭用。您可以在此處閱讀有關此內容的更多資訊:我應該何時使用 ConcurrentDictionary 和 Dictionary?Dictionarylock
我的建議是切換到簡單的Dictionary<string, Queue<PacketModel>>,并將其與lock. 如果你小心并且在持有鎖的時候避免做任何不相關的事情,鎖會很快被釋放,很少有其他執行緒會被它阻塞。使用鎖只是為了保護結構的特定條目的讀取和更新,沒有別的。
替代設計
在ConcurrentDictionary<string, Queue<PacketModel>>您從未從字典中洗掉佇列的情況下,結構可能是一個不錯的選擇。否則仍有競爭條件發生的空間。您應該專門使用GetOrAdd方法在字典中自動獲取或添加佇列,并且queue在對其進行任何操作(讀取或寫入)之前始終將其本身用作儲物柜:
Queue<PacketModel> queue = _dicAllPackets
.GetOrAdd(serialNumber, _ => new Queue<PacketModel>());
lock (queue)
{
queue.Enqueue(model);
}
使用 aConcurrentDictionary<string, ImmutableQueue<PacketModel>>也是可能的,因為在這種情況下,值ConcurrentDictionary是不可變的,你不需要lock任何東西。您需要始終使用該AddOrUpdate方法,以便通過單個呼叫更新字典,作為原子操作。
_dicAllPackets.AddOrUpdate
(
serialNumber,
key => ImmutableQueue.Create<PacketModel>(model),
(key, queue) => queue.Enqueue(model)
);
委托內部的queue.Enqueue(model)呼叫updateValueFactory不會改變佇列。相反,它會創建一個新的ImmutableQueue<PacketModel>并丟棄前一個。不可變集合通常不是很有效。但是,如果您的目標是盡量減少執行緒之間的爭用,以增加每個執行緒必須完成的作業為代價,那么您可能會發現它們很有用。
轉載請註明出處,本文鏈接:https://www.uj5u.com/qukuanlian/444730.html
