這是我的 ThreadPool 實作。我在一個簡單的 main 函式中試用過它,但無法讓它正確停止:解構式在執行緒真正啟動之前就被呼叫,整個程式最後卡死(死鎖)在 t.join() 上,因為條件變數在執行緒到達 wait 函式之前就已經被通知了。
有什麼辦法可以解決這個問題嗎?還是有更好的實作方式?
ThreadPool.cpp
#include <atomic>
#include <condition_variable>
#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <thread>
#include <type_traits>
#include <vector>
namespace Concurrency {
/// Fixed-size thread pool whose workers execute queued tasks returning RetType.
///
/// Synchronisation contract: the task queue and the stop flag are only ever
/// changed while `queueMutex` is held, and workers re-check both under that
/// same mutex before they go to sleep. This is what prevents a notification
/// from being lost when it is sent before a worker has reached `wait()`.
///
/// Destroying the pool lets the workers finish every task that is already
/// queued, then joins them.
template <typename RetType>
class StandardThreadPool : public ThreadPool<RetType> {
private:
  typedef std::function<RetType()> taskType;

  ThreadSafeQueue<std::packaged_task<RetType()>> queue;
  std::mutex queueMutex;                  // guards queue pushes and the stop flag
  std::condition_variable queueCondition; // signalled on new work and on shutdown
  std::vector<std::thread> poolThreads;
  std::atomic<bool> stopThreadsFlag{false};

  /// Worker loop: run queued tasks until the queue is empty and a stop was
  /// requested.
  void threadWork() {
    std::cout << "thread:" << std::this_thread::get_id() << " started\n";
    std::unique_lock<std::mutex> lock(queueMutex);
    while (true) {
      // Look for work BEFORE sleeping: a task (or the stop request) may have
      // arrived before this thread ever reached wait().
      if (auto task = queue.Pop()) {
        // Run the task without the lock so other workers and Push() are not
        // blocked for the duration of the task.
        lock.unlock();
        (*task)();
        lock.lock();
        continue;
      }
      if (stopThreadsFlag.load())
        break;
      // Queue is empty and no stop was requested. wait() releases the mutex
      // atomically, so a Push()/terminateThreads() that happens from now on
      // is guaranteed to wake this thread; spurious wakeups just loop again.
      queueCondition.wait(lock);
    }
    std::cout << "thread:" << std::this_thread::get_id() << " finished\n";
  }

  /// Reserves storage for the worker threads (nothing for a non-positive count).
  void initThreadPool() {
    const int count = this->maxThreads > 0 ? this->maxThreads : 0;
    poolThreads.reserve(static_cast<std::size_t>(count));
  }

  /// Starts `maxThreads` workers. If a thread cannot be created, the workers
  /// already running are stopped and joined before the exception propagates,
  /// because destroying a joinable std::thread calls std::terminate.
  void startThreads() {
    try {
      for (int i = 0; i < this->maxThreads; ++i) {
        poolThreads.emplace_back(&StandardThreadPool<RetType>::threadWork, this);
      }
    } catch (...) {
      terminateThreads();
      throw;
    }
  }

  /// Requests shutdown and joins every worker.
  void terminateThreads() {
    {
      // Set the flag under the mutex so a worker cannot check it, see
      // "false", and then go to sleep after the notification below.
      std::lock_guard<std::mutex> lock(queueMutex);
      stopThreadsFlag.store(true);
    }
    queueCondition.notify_all();
    for (auto &t : poolThreads) {
      if (t.joinable())
        t.join();
    }
  }

public:
  /// Creates the pool and immediately starts `maxThreads` worker threads.
  StandardThreadPool(int maxThreads) : ThreadPool<RetType>(maxThreads) {
    initThreadPool();
    startThreads();
  }

  /// Queues `task` for execution on a worker thread.
  /// @return a future that receives the task's result (or its exception).
  std::future<RetType> Push(taskType &&task) override {
    std::packaged_task<RetType()> pt(std::move(task));
    auto future = pt.get_future();
    {
      // Publish the task under the mutex the workers use for their
      // "is there work?" check, so the wake-up cannot be missed.
      std::lock_guard<std::mutex> lock(queueMutex);
      queue.Push(std::move(pt));
    }
    queueCondition.notify_one();
    return future;
  }

  /// Lets queued tasks finish, then stops and joins all workers.
  ~StandardThreadPool() {
    std::cout << "destructor called\n";
    terminateThreads();
  }
};
} // namespace Concurrency
namespace Concurrency {
/// Unbounded FIFO queue with separate locks for the head and the tail, so a
/// producer and a consumer can usually proceed without blocking each other.
///
/// The list always contains one trailing dummy node: `tail` points at it and
/// the queue is empty when `head` is that same node.
template <typename T> class ThreadSafeQueue {
private:
  struct node {
    std::shared_ptr<T> data;
    std::unique_ptr<node> next;
  };

  std::mutex headMutex;
  std::mutex tailMutex;
  std::unique_ptr<node> head;
  node *tail;

  /// Reads the current tail pointer under the tail lock.
  node *getTail() {
    std::lock_guard<std::mutex> lock(tailMutex);
    return tail;
  }

  /// Detaches and returns the first real node, or nullptr if the queue is empty.
  std::unique_ptr<node> popHead() {
    std::lock_guard<std::mutex> lock(headMutex);
    if (head.get() == getTail())
      return nullptr;
    std::unique_ptr<node> oldHead(std::move(head));
    head = std::move(oldHead->next);
    return oldHead;
  }

  /// Stores `newData` in the current dummy node and appends a fresh dummy.
  /// Allocation happens before the lock is taken to keep the critical
  /// section short.
  void appendNode(std::shared_ptr<T> newData) {
    std::unique_ptr<node> pNew(new node);
    node *const newTail = pNew.get();
    std::lock_guard<std::mutex> lock(tailMutex);
    tail->data = std::move(newData);
    tail->next = std::move(pNew);
    tail = newTail;
  }

public:
  ThreadSafeQueue() : head(new node), tail(head.get()) {}

  /// Releases the nodes one at a time. The implicitly generated destruction
  /// of a unique_ptr chain is recursive and can exhaust the stack when the
  /// queue still holds many elements.
  ~ThreadSafeQueue() {
    while (head) {
      std::unique_ptr<node> next(std::move(head->next));
      head = std::move(next);
    }
  }

  /// Removes the oldest element.
  /// @return the element, or an empty pointer if the queue is empty.
  std::shared_ptr<T> Pop() {
    auto oldHead = popHead();
    return oldHead ? oldHead->data : nullptr;
  }

  /// Appends a copy of `newValue`; the caller's object is left untouched.
  void Push(const T &newValue) {
    appendNode(std::make_shared<T>(newValue));
  }

  /// Appends `newValue` by moving from it.
  void Push(T &&newValue) {
    appendNode(std::make_shared<T>(std::move(newValue)));
  }

  ThreadSafeQueue(const ThreadSafeQueue &) = delete;
  ThreadSafeQueue &operator=(const ThreadSafeQueue &) = delete;
};
} // namespace Concurrency
#include <functional>
#include <future>
namespace Concurrency {
/// Abstract interface of a thread pool whose tasks return RetType.
template <typename RetType> class ThreadPool {
public:
  /// Number of worker threads the pool was asked to use.
  int maxThreads;

public:
  using taskType = std::function<RetType()>;

  ThreadPool(int maxThreads) : maxThreads(maxThreads) {}

  /// Virtual so that a concrete pool deleted through a ThreadPool pointer
  /// runs its own destructor; without it such a delete is undefined behaviour.
  virtual ~ThreadPool() = default;

  /// Submits `newTask`.
  /// @return a future for the task's result.
  virtual std::future<RetType> Push(taskType &&newTask) = 0;

  // Pools are neither copyable nor movable.
  ThreadPool(const ThreadPool &) = delete;
  ThreadPool(ThreadPool &&) = delete;
  ThreadPool &operator=(const ThreadPool &) = delete;
  ThreadPool &operator=(ThreadPool &&) = delete;
};
} // namespace Concurrency
main.cpp
int main() {
Concurrency::StandardThreadPool<int> th(1);
auto fun = []() {
std::cout << "function running\n";
return 2;
};
th.Push(fun);
return EXIT_SUCCESS;
}
uj5u.com熱心網友回復:
首先,一個正確的執行緒安全佇列。
/// Blocking multi-producer / multi-consumer FIFO queue with abort support.
///
/// Two condition variables are used on purpose:
///   * `cv`       - consumers blocked in pop() wait here for data or abort;
///   * `empty_cv` - wait_until_empty() waits here for the queue to drain.
/// If both kinds of waiter shared one condition variable, the notify_one()
/// in push() could be consumed by a wait_until_empty() caller and a sleeping
/// consumer would never learn about the new element.
template<class T>
struct threadsafe_queue {
  /// Blocks until an element is available or the queue is aborted.
  /// @return the oldest element, or std::nullopt once the queue is aborted.
  [[nodiscard]] std::optional<T> pop() {
    auto l = lock();
    cv.wait(l, [&]{ return is_aborted() || !data.empty(); });
    if (is_aborted())
      return std::nullopt;
    std::optional<T> r(std::move(data.front()));
    data.pop_front();
    if (data.empty())
      empty_cv.notify_all(); // only wait_until_empty() cares about this
    return r;
  }

  /// Appends `t` and wakes one consumer.
  /// @return false (and drops `t`) if the queue has been aborted.
  bool push(T t) {
    auto l = lock();
    if (is_aborted()) return false;
    data.push_back(std::move(t));
    cv.notify_one();
    return true;
  }

  /// Marks the queue as aborted, discards pending elements and wakes every
  /// waiter. The lock is still required even though the flag is atomic:
  /// without it a consumer could evaluate its predicate, miss the flag and
  /// then sleep through the notification.
  void set_abort_flag() {
    auto l = lock();
    aborted = true;
    data.clear();
    cv.notify_all();
    empty_cv.notify_all();
  }

  [[nodiscard]] bool is_aborted() const { return aborted; }

  /// Blocks until no elements are queued. Elements already popped may still
  /// be being processed by their consumer.
  void wait_until_empty() {
    auto l = lock();
    empty_cv.wait(l, [&]{ return data.empty(); });
  }

private:
  std::unique_lock<std::mutex> lock() {
    return std::unique_lock<std::mutex>(m);
  }

  std::condition_variable cv;       // data available / aborted
  std::condition_variable empty_cv; // queue drained
  std::mutex m;
  std::atomic<bool> aborted{false};
  std::deque<T> data;
};
這在內部處理中止等。
我們的執行緒池就變成了:
/// Thread pool built on threadsafe_queue. Tasks with any return type can be
/// submitted; each result is delivered through the future returned by
/// push_task().
struct threadpool {
  /// Starts `count` worker threads.
  explicit threadpool(std::size_t count)
  {
    try {
      for (std::size_t i = 0; i < count; ++i) {
        threads.emplace_back([this]{
          // abort handled by empty pop:
          while( auto f = queue.pop() ) {
            (*f)();
          }
        });
      }
    } catch (...) {
      // The destructor will not run for a partially constructed pool, and
      // destroying a joinable std::thread calls std::terminate - so stop and
      // join the workers that did start before propagating the error.
      queue.set_abort_flag();
      for (std::thread& t:threads)
        t.join();
      throw;
    }
  }

  /// Discards queued tasks and makes the workers exit once their current
  /// task (if any) returns.
  void set_abort_flag() {
    queue.set_abort_flag();
  }

  [[nodiscard]] bool is_aborted() const {
    return queue.is_aborted();
  }

  /// Waits for the queue to drain, then stops and joins the workers.
  ~threadpool() {
    queue.wait_until_empty();
    queue.set_abort_flag(); // get threads to leave the queue
    for (std::thread& t:threads)
      t.join();
  }

  /// Queues `f` for execution.
  /// @return a future for f's result, or an invalid (default-constructed)
  ///         future if the pool has already been aborted.
  template<class F,
    class R=std::invoke_result_t<F>
  >
  std::future<R> push_task( F f ) {
    std::packaged_task<R()> task( std::move(f) );
    auto ret = task.get_future();
    // A packaged_task<R()> is itself callable with no arguments, so it can
    // be wrapped in a packaged_task<void()>: the queue then needs only one
    // element type, and the result still reaches `ret`.
    if (queue.push( std::packaged_task<void()>(std::move(task)) ))
      return ret;
    else
      return {}; // cannot push, already aborted
  }

private:
  // Element type is deliberately void(): see push_task().
  threadsafe_queue<std::packaged_task<void()>> queue;
  std::vector<std::thread> threads;
};
在 C++11 中,你可以把 std::optional 換成 std::unique_ptr,代價是更多的執行期開銷。
這裡的訣竅是:一個 std::packaged_task<void()> 可以存放一個 std::packaged_task<R()>。而且我們不需要佇列中的回傳值。因此,一個執行緒池可以處理回傳型別各不相同的任意任務——它並不在乎。
我只在 thread_pool 解構時才 join 執行緒。我也可以在中止之後就這樣做。
銷毀 thread_pool 時會等待所有任務完成。請注意,中止 thread_pool 可能不會中止正在進行的任務。您可能想要添加的一項功能,是把中止 API/旗標傳遞給任務的選項,這樣任務在被要求時就可以提前中止。
要把這個做到工業級強度很難,因為理想情況下,任務中所有會阻塞的操作也都要留意中止的可能性。
Live example.
You could add a 2nd cv to notify after pops, which only wait_until_empty waits on. That might save you some spurious wakeups.
轉載請註明出處,本文鏈接:https://www.uj5u.com/gongcheng/447419.html
