我有一個命令佇列。其中一些具有相同的屬性,例如“檔案 ID”。我需要能夠并行處理它們,但有一個限制:具有相同功能的命令應該按照它們在佇列中出現的順序進行處理。
例如:我的佇列是[n, a, s, j, a, l, v, g, a, f, f],其中字母是 DocumentIds。我需要并行處理,但是 'a's 的處理應該按照它們出現在佇列中的順序,即[1, 4, 8],其中數字是佇列中字母的 ID。因此,處理這些元素的順序無關緊要,只要[8]在[4]之后,也就是[1]之后(它們之間有任意數量的中間項)。
首先,我嘗試對 DocumentId 進行 SemaphoreSlim 鎖定。這意味著,如果我們要處理一個專案,如果其他執行緒應該處理相同的專案,我們就會阻塞它們。這不起作用,因為 SemaphoreSlim 不保證解鎖的 FIFO 順序。
然后,我在 SemaphoreSlim 周圍做了一個包裝器,以強制 FIFO 解除阻塞:
public class FifoAsyncLock : IDisposable
{
private readonly SemaphoreSlim _sem = new (1, 1);
private readonly ConcurrentQueue<TaskCompletionSource> _queue = new ();
public async Task WaitAsync()
{
var tcsE = new TaskCompletionSource();
_queue.Enqueue(tcsE);
await _sem.WaitAsync();
if (_queue.TryDequeue(out var tcsD))
tcsD.SetResult();
await tcsE.Task;
}
public void Release()
{
_sem.Release();
}
public void Dispose()
{
_sem.Dispose();
}
}
我在一個類中使用它,我為每個 DocumentId 存盤了一個信號量,并且還記錄了有多少鎖定用戶正在等待解鎖。如果最后一個用戶釋放鎖,它會被洗掉(因為記憶體):
public class DocIdLocker : IDisposable
{
private readonly ConcurrentDictionary<Guid, FifoAsyncLock> _docIdLocks = new ();
private readonly ConcurrentDictionary<Guid, int> _users = new ();
private bool _disposed;
public async Task<IAsyncDisposable> AquireLockAsync(Guid docId)
{
var userCount = _users.AddOrUpdate(docId, 1, (_, o) => o 1);
await _docIdLocks.GetOrAdd(docId, new FifoAsyncLock()).WaitAsync();
return new Lock(this, docId);
}
private async Task Release(Guid docId)
{
if (!_docIdLocks.ContainsKey(docId))
throw new KeyNotFoundException($"Key not found: '{docId}'");
_docIdLocks[docId].Release();
if (!_users.ContainsKey(docId))
throw new KeyNotFoundException($"Key not found: '{docId}'");
if (--_users[docId] == 0)
{
_docIdLocks.TryRemove(docId, out _);
_users.TryRemove(docId, out _);
}
}
private class Lock : IAsyncDisposable
{
private readonly DocIdLocker _parent;
private readonly Guid _docId;
public Lock(DocIdLocker parent, Guid docId)
{
_parent = parent;
_docId = docId;
}
public ValueTask DisposeAsync() => new (_parent.Release(_docId));
}
public void Dispose()
{
if (_disposed)
return;
foreach (var item in _docIdLocks.Values)
item.Dispose();
_users.Clear();
_disposed = true;
}
}
但是我的測驗仍然表明沒有保留'a'的順序。
我想,也許有些執行緒在鎖仍然不存在時獲取元素,并無序處理它們。好吧,這一切都變得非常難以推理,現在一切都在我腦海中混雜在一起。
有沒有一種簡單而優雅的方式來實作我想要實作的目標?
uj5u.com熱心網友回復:
解決此問題的最簡單方法可能是按相關屬性對命令進行分組,然后并行處理組而不是單個檔案。然后對每個組執行一個順序foreach回圈,并逐個處理相關檔案。例子:
string[] documents = new[] { 'n', 'a', 's', 'j', 'a', 'l', 'v', 'g', 'a', 'f', 'f' }
.Select((item, index) => $"{item}-{index}")
.ToArray();
Console.WriteLine($"Documents: [{String.Join(", ", documents)}]");
var grouped = documents.GroupBy(item => item[0]); // Group by the first char
ParallelOptions options = new()
{
MaxDegreeOfParallelism = Environment.ProcessorCount
};
Parallel.ForEach(grouped, options, grouping =>
{
foreach (var document in grouping)
{
Console.WriteLine($"Processing: {document}");
Thread.Sleep(500); // Simulate a CPU-bound or blocking operation
}
});
輸出:
Documents: [n-0, a-1, s-2, j-3, a-4, l-5, v-6, g-7, a-8, f-9, f-10]
Processing: a-1
Processing: n-0
Processing: s-2
Processing: j-3
Processing: a-4
Processing: l-5
Processing: v-6
Processing: g-7
Processing: f-9
Processing: a-8
Processing: f-10
現場演示。
GroupByLINQ 運算子的排序行為是明確定義的。根據檔案:
物件的
IGrouping<TKey,TElement>生成順序基于 source 中生成 each 的第一個鍵的元素的順序IGrouping<TKey,TElement>。分組中的元素按照產生它們的元素出現在源代碼中的順序產生。
這種方法有一些缺點:
- 在開始并行處理之前,必須完全列舉源序列。如果源序列是延遲可列舉的,例如
BlockingCollection<T>包含來自并行生產者的實時專案,這可能是一個問題。 - 處理順序由第一個唯一鍵出現在源序列中的順序決定,而不是由專案本身的順序決定。所以例如如果源是(A, B, A, A, A, A, A) 并且并行度是1,那么B項將被最后處理。
- 由此產生的磁區方案可能不會很好地平衡。如果存在具有大量元素的鍵,并且這些鍵在源序列中出現較晚,則并行處理可能會在操作結束時降低并行化程度。為了緩解此問題,最好根據組包含的專案數以降序對組重新排序。
下面是一個自定義 LINQ 運算子ToConsumableGroupings,它可能比標準運算子更適合這種情況GroupBy。它解決了大多數前面提到的問題,因為它會延遲列舉源序列,并在運行時發出分組。它與操作員具有相同的簽名GroupBy:
/// <summary>
/// Groups the elements of a sequence into consumable groupings, according to
/// a specified key selector function.
/// </summary>
/// <remarks>
/// For each key, more than one groupings can be emitted. A new grouping can be emitted
/// if the previously emitted grouping for the same key has been fully consumed.
/// </remarks>
public static IEnumerable<IGrouping<TKey, TSource>>
ToConsumableGroupings<TKey, TSource>(
this IEnumerable<TSource> source,
Func<TSource, TKey> keySelector,
IEqualityComparer<TKey> keyComparer = default)
{
var perKey = new Dictionary<TKey, Queue<TSource>>(keyComparer);
foreach (var item in source)
{
var key = keySelector(item);
lock (perKey)
{
if (perKey.TryGetValue(key, out var queue))
{
queue.Enqueue(item); continue;
}
queue = perKey[key] = new Queue<TSource>();
queue.Enqueue(item);
}
yield return new Grouping<TKey, TSource>(key, GetGroup(key));
}
IEnumerable<TSource> GetGroup(TKey key)
{
while (true)
{
TSource item;
lock (perKey)
{
var queue = perKey[key];
if (queue.Count == 0) { perKey.Remove(key); break; }
item = queue.Dequeue();
}
yield return item;
}
}
}
private class Grouping<TKey, TSource> : IGrouping<TKey, TSource>
{
private readonly TKey _key;
private readonly IEnumerable<TSource> _sequence;
public Grouping(TKey key, IEnumerable<TSource> sequence)
{
_key = key;
_sequence = sequence;
}
public TKey Key => _key;
public IEnumerator<TSource> GetEnumerator() => _sequence.GetEnumerator();
IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}
使用示例:
var grouped = documents.ToConsumableGroupings(item => item[0]);
與GroupBy運算子不同,運算ToConsumableGroupings符發出非物化分組,預計只會列舉(使用)一次。
轉載請註明出處,本文鏈接:https://www.uj5u.com/net/482952.html
