我已經從Stack Overflow 上的一個答案中成功實作了執行緒池,這幫助我加快了我的程式。它使用一個單一的在多個工人(s)之間std::queue分配作業( )。std::function<void()>std::thread
我想改進這一點。由于我只需要運行一組有限的函式,因此我計劃放棄佇列并改用變數。換句話說,n第 -th 個工人n將從std::vector<std::function<void()>>. 不幸的是,我的測驗應用程式崩潰了Segmentation fault (core dumped),到目前為止我還沒有意識到我的錯誤。
這是我的最小可重現代碼,用于計算向量中的奇數元素。(取自Scott Meyers 的想法:Cpu Caches 和 Why You Care。)
#include <algorithm>
#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <stdexcept> // std::invalid_argument
#include <thread>
#include <vector>
// Thread pool with a std::function for each worker.
class Pool {
public:
enum class Status {
idle,
working,
terminate
};
const int worker_count;
std::vector<Status> statuses;
std::vector<std::mutex> mutexes;
std::vector<std::condition_variable> conditions;
std::vector<std::thread> threads;
std::vector<std::function<void()>> jobs;
void thread_loop(int thread_id)
{
std::puts("Thread started");
auto &my_status = statuses[thread_id];
auto &my_mutex = mutexes[thread_id];
auto &my_condition = conditions[thread_id];
auto &my_job = jobs[thread_id];
while (true) {
std::unique_lock<std::mutex> lock(my_mutex);
my_condition.wait(lock, [this, &my_status] { return my_status != Status::idle; });
if (my_status == Status::terminate)
return;
my_job();
my_status = Status::idle;
lock.unlock();
my_condition.notify_one(); // Tell the main thread we are done
}
}
public:
Pool(int size) : worker_count(size), statuses(size, Status::idle), mutexes(size), conditions(size), threads(), jobs(size)
{
if (size < 0)
throw std::invalid_argument("Worker count needs to be a positive integer");
};
~Pool()
{
for (int i = 0; i < worker_count; i) {
std::unique_lock lock(mutexes[i]);
statuses[i] = Status::terminate;
lock.unlock(); // Unlock before notifying
conditions[i].notify_one();
}
for (auto &thread : threads)
thread.join();
threads.clear();
};
void start_threads()
{
threads.resize(worker_count);
jobs.resize(worker_count);
for (int i = 0; i < worker_count; i) {
statuses[i] = Status::idle;
jobs[i] = []() { std::puts("I am running"); };
threads[i] = std::thread(&Pool::thread_loop, this, i);
}
}
void set_and_start_job(const std::function<void(int)> &job)
{
for (int i = 0; i < worker_count; i) {
std::unique_lock lock(mutexes[i]);
jobs[i] = [&job, i]() { job(i); };
statuses[i] = Status::working;
lock.unlock();
conditions[i].notify_one();
}
}
void wait()
{
for (int i = 0; i < worker_count; i) {
auto &my_status = statuses[i];
std::unique_lock lock(mutexes[i]);
conditions[i].wait(lock, [this, &my_status] { return my_status != Status::working; });
}
}
};
int main()
{
constexpr int worker_count = 1;
constexpr int vector_size = 1 << 10;
std::vector<int> test_vector;
test_vector.reserve(vector_size);
for (int i = 0; i < vector_size; i)
test_vector.push_back(i);
std::vector<int> worker_odd_counts(worker_count, 0);
const auto worker_task = [&](int thread_id) {
int chunk_size = vector_size / (worker_count) 1;
int my_start = thread_id * chunk_size;
int my_end = std::min(my_start chunk_size, vector_size);
int local_odd_count = 0;
for (int ii = my_start; ii < my_end; ii)
if (test_vector[ii] % 2 != 0)
local_odd_count;
worker_odd_counts[thread_id] = local_odd_count;
};
Pool pool = Pool(worker_count);
pool.start_threads();
pool.set_and_start_job(worker_task);
pool.wait();
int odd_count = 0;
for (auto elem : worker_odd_counts)
odd_count = elem;
std::cout << odd_count << '\n';
}
uj5u.com熱心網友回復:
TL;DR 版本:
最簡單的解決方法是改變
jobs[i] = [&job, i]() { job(i); };
至
jobs[i] = [job, i]() { job(i); };
這按價值捕獲job并制作副本。副本不會在 lambda 之前超出范圍,并且 lambda 將超過執行緒。
長版:
問題在
jobs[i] = [&job, i]() { job(i); };
在set_and_start_job. 物件支持job在執行緒啟動之前超出范圍,但是如果
pool.set_and_start_job(worker_task);
并且worker_task在執行緒加入之前不會超出范圍?
原來那是因為set_and_start_job需要 aconst std::function<void(int)> &而worker_task不是a std::function,只是隱式轉換為 a std::function。這種轉換會生成一個臨時變數,其壽命系結到set_and_start_job'sjob引數。set_and_start_job退出時,超出job范圍,臨時物件被銷毀。
上面是簡單的修復方法,但我們也可以在源代碼中直接轉換為 `std::function 一直通過系統傳遞,并且在執行緒加入后將超出范圍。
const std::function<void(int)> worker_task = [&](int thread_id) { ... };
端到端std::function和捕獲參考可能會節省一些資源,但我在參考和執行緒方面的經驗并不是最好的,所以我更喜歡副本以減少我錯過一些微妙之處的可能性或者將來有人會做出改變,增加一些。
uj5u.com熱心網友回復:
在函式Pool::set_and_start_job中,設定作業時,&從job捕獲中洗掉似乎已經解決了問題:
jobs[i] = [job, i]() { job(i); };
但是,我只是有懷疑,不知道根本原因。
轉載請註明出處,本文鏈接:https://www.uj5u.com/qiye/523277.html
標籤:C 多线程标准线程
下一篇:Julia:向量的并行函式
