我正在嘗試通過std::atomic<std::shared_ptr>>對容器等非平凡物件進行操作來實作無鎖包裝器。我在這兩個主題中找到了一些相關的資訊:
- 記憶柵欄
- 原子使用
但這仍然不是我需要的。
舉個例子:
TEST_METHOD(FechAdd)
{
constexpr size_t loopCount = 5000000;
auto&& container = std::atomic<size_t>(0);
auto thread1 = std::jthread([&]()
{
for (size_t i = 0; i < loopCount; i )
container ;
});
auto thread2 = std::jthread([&]()
{
for (size_t i = 0; i < loopCount; i )
container ;
});
thread1.join();
thread2.join();
Assert::AreEqual(loopCount * 2, container.load());
}
此函式作業正常,因為后增量運算子使用內部fetch_add()原子操作。
另一方面:
TEST_METHOD(LoadStore)
{
constexpr size_t loopCount = 5000000;
auto&& container = std::atomic<size_t>(0);
auto thread1 = std::jthread([&]()
{
for (size_t i = 0; i < loopCount; i )
{
auto value = container.load();
value ;
container.store(value);
}
});
auto thread2 = std::jthread([&]()
{
for (size_t i = 0; i < loopCount; i )
{
auto value = container.load();
value ;
container.store(value);
}
});
thread1.join();
thread2.join();
Assert::AreEqual(loopCount * 2, container.load());
}
而如果我用.load()和.store()操作和這兩個操作之間的增量替換它,結果就不一樣了。這是兩個原子操作,因此無法在這些操作之間進行同步。
我的最終目標是通過std::atomic<std::shared_ptr>加載物件的實際狀態,執行一些非常量操作,并再次通過存盤操作保存它。
TEST_METHOD(AtomicSharedPtr)
{
constexpr size_t loopCount = 5000000;
auto&& container = std::atomic(std::make_shared<std::unordered_set<int>>());
auto thread1 = std::jthread([&]([[maybe_unused]] std::stop_token token)
{
for (size_t i = 0; i < loopCount; i )
{
// some other lock-free synchronization primitives as barrier, conditions or?
auto reader = container.load();
reader->emplace(5);
container.store(reader);
}
});
auto thread2 = std::jthread([&]([[maybe_unused]] std::stop_token token)
{
for (size_t i = 0; i < loopCount; i )
{
// some other lock-free synchronization primitives as barrier, conditions or?
auto reader = container.load();
reader->erase(5);
container.store(reader);
}
});
}
I knew that the second thread also has only shared_ptr from atomic and non-const operations on shared_ptr, which can only cause data race.
So any hint on how to implement a lock-free wrapper that will work with non-const operations of the object stored in std::atomic<std::shared_ptr>?
uj5u.com熱心網友回復:
首先,旁注。 std::atomic<std::shared_ptr<T>>提供對指標的原子訪問,并且不為提供任何同步T。在這里要注意這一點非常重要。并且您的代碼顯示您正在嘗試同步T,而不是指標,因此 沒有按照atomic您的想法進行操作。為了使用std::atomic<std::shared_ptr<T>>,您必須將指向的物件T視為const。
有兩種方法可以以執行緒安全的方式處理任意資料的讀取-修改-寫入。第一個顯然是使用鎖。這通常執行起來更快,并且由于它的簡單性,通常錯誤更少,因此強烈建議。如果你真的想用原子操作來做到這一點,這很困難,而且執行速度較慢。
它通常看起來像這樣,您對指向的資料進行深度復制,對副本進行變異,然后嘗試用新資料替換舊資料。如果其他人在此期間更改了資料,則您將其全部丟棄并重新開始整個突變。
template<class T, class F>
bool readModifyWrite(std::atomic<std::shared_ptr<T>>& container, F&& function) {
do {
const auto&& oldT = container.load();
//first a deep copy, to enforce immutability
auto&& newT = std::make_shared(oldT.get());
//then mutate the T
if (!function(*newT))
return false; //function aborted
//then attempt to save the modified T.
//if someone else changed the container during our modification, start over
} while(container.compare_exchange_strong(oldT, newT) == false);
//Note that this may take MANY tries to eventually succeed.
return true;
}
然后用法類似于您所擁有的:
auto&& container = std::atomic(std::make_shared<std::unordered_set<int>>());
auto thread1 = std::jthread([&]([[maybe_unused]] std::stop_token token)
{
for (size_t i = 0; i < loopCount; i )
{
readModifyWrite(container, [](auto& reader) {
reader.emplace(5);
return true;
});
}
});
auto thread2 = std::jthread([&]([[maybe_unused]] std::stop_token token)
{
for (size_t i = 0; i < loopCount; i )
{
readModifyWrite(container, [](auto& reader) {
reader.erase(5);
return true;
});
}
});
}
請注意,由于一個執行緒是插入5 loopCount次數,另一個是擦除5 loopCount次數,但它們之間不同步,因此第一個執行緒可能會連續寫入多次(對于集合而言是空操作),然后是第二個執行緒可能會連續擦除多次(這是一組無操作),因此您無法真正保證此處的最終結果,但我假設您知道這一點。
但是,如果您想使用突變進行同步,那會變得相當復雜。如果成功或中止,變異函式必須回傳,然后呼叫者readModifyWrite必須處理修改中止的情況。(注意,readModifyWrite有效地從函式回傳值,所以它從修改步驟回傳值。寫步驟不影響回傳值)
auto thread1 = std::jthread([&]([[maybe_unused]] std::stop_token token)
{
for (size_t i = 0; i < loopCount; )
{
bool did_emplace = readModifyWrite(container, [](auto& reader) {
return reader.emplace(5);
});
if (did_emplace) i ;
}
});
auto thread2 = std::jthread([&]([[maybe_unused]] std::stop_token token)
{
for (size_t i = 0; i < loopCount; )
{
bool did_erase = readModifyWrite(container, [](auto& reader) {
return reader.erase(5);
});
if (did_erase) i ;
}
});
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/391057.html
標籤:c multithreading c 20 stdatomic
上一篇:Parallel.ForEach:Break和ParallelLoopState.LowestBreakIteration。該怎么辦?
