我正在用 C 語言實作自己的通用執行緒池,並用斐波那契數列來測試;最近幾天我一直卡在一個讓我完全摸不著頭緒的問題上。
執行程式時,它會正常運作一陣子,然後在某個時刻突然停住,而我看不出任何明顯的原因。
我注意到的一點是:程式是在執行一小段時間之後才停住的,因為如果在其中加入列印指令或 sleep 指令,它會在更早的執行階段就停下來。
編輯:先前漏了這部分——我已經測試過是否為死鎖,但看起來不是;似乎只是從某個時刻起不再有任何新任務被推入堆疊,導致所有執行緒都只是嘗試從堆疊中取出任務、發現堆疊是空的、然後跳回去,無限重複這個過程。
這是代碼:
threadpool.h
#ifndef THREADPOOL_H_INCLUDED
#define THREADPOOL_H_INCLUDED
#include <stddef.h>
#include <stdbool.h>
/* Signature of a function the pool can execute; it receives an untyped pointer. */
typedef void (*ThreadTask_f)(void*);
/* One unit of work handed to the thread pool. */
typedef struct Future Future;
struct Future {
    ThreadTask_f fn;   /* function to be executed for this task */
    bool fulfilled;    /* set to true by the executing thread once fn has returned */
};
/* Allocates the global thread pool and starts `size` worker threads.
 * Returns 0 on success and -1 on failure. */
extern int tpInit(size_t size);
/* Cancels and joins the worker threads recorded in the pool, then frees the pool. */
extern void tpRelease(void);
/* Marks `future` as unfulfilled and pushes it onto the pool's task stack. */
extern void tpAsync(Future *future);
/* Returns once future->fulfilled is set; until then the calling thread pulls
 * and executes other tasks from the task stack. */
extern void tpAwait(Future *future);
/* Generates a typed future wrapper around a single-argument function so that
 * the function can be scheduled on the thread pool.
 * TYPE: type that the function returns
 * NAME: name of the function to be parallelised
 * ARG: type of the argument of the function given
 *
 * The expansion declares NAME and defines:
 *   NAME_fut        - struct holding the Future, the argument and the result
 *   NAMEThunk(args) - adapter stored in Future.fn; writes NAME(arg) into res
 *   NAMEFuture(arg) - returns an unfulfilled NAME_fut initialised with arg
 *   NAMEAsync(f)    - submits f through tpAsync() and returns f
 *   NAMEAwait(f)    - waits through tpAwait() and returns f->res
 * The thunk treats its argument as a NAME_fut pointer; the pool passes it the
 * Future pointer, which works because `fut` is the first member of NAME_fut.
 */
#define TASK(TYPE, NAME, ARG) \
TYPE NAME(ARG); \
\
typedef struct { \
Future fut; \
ARG arg; \
TYPE res; \
} NAME ## _fut; \
\
static void NAME ## Thunk(void *args) { \
NAME ## _fut *data = args; \
data->res = NAME(data->arg); \
} \
static inline NAME ## _fut NAME ## Future(ARG arg) { \
return (NAME ## _fut) { \
.fut = { .fn = &NAME ## Thunk, .fulfilled = false }, \
.arg = arg \
}; \
} \
static inline NAME ## _fut* NAME ## Async(NAME ## _fut *future) { \
tpAsync(&future->fut); \
return future; \
} \
static inline TYPE NAME ## Await(NAME ## _fut *future) { \
tpAwait(&future->fut); \
return future->res; \
}
#endif
threadpool.c
#include "threadpool.h"
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>
#include <time.h>
#define THREADSTACKSIZE 8388608 //stack size in bytes (8 MiB) requested for every worker thread
#define INITSTACKSIZE 1024 //initial value for how many Tasks can be in the taskstack
#define STACKMEMMULT 2 //if the TaskStack is full, multiply by this; NOTE(review): push() hard-codes 2 instead of using this macro
/* Growable LIFO of pending tasks. */
typedef struct TaskStack TaskStack;
struct TaskStack {
    Future **start;  /* array of pointers to the queued futures */
    size_t size;     /* capacity of the array, counted in entries */
    long current;    /* index of the top entry; -1 while the stack is empty */
};
/* The pool: its worker threads plus the stack of tasks they consume. */
typedef struct ThreadPool ThreadPool;
struct ThreadPool {
    size_t size;         /* thread count that tpRelease() iterates over */
    pthread_t *threads;  /* handles of the worker threads */
    TaskStack *stack;    /* pending tasks */
};
static pthread_mutex_t stackAccess; //locked by push() and pull() around every task stack access; initialised in tpInit()
static ThreadPool *tp; //the single pool instance; allocated by tpInit(), freed by tpRelease()
/* Suspends the calling thread for roughly `nano` nanoseconds.
 *
 * The value is split into whole seconds and a nanosecond remainder because
 * nanosleep() rejects a tv_nsec outside [0, 999999999] with EINVAL, which
 * would turn any request of one second or more into no sleep at all.
 * An early wake-up (e.g. by a signal) is not retried.
 */
void nsleep(unsigned long nano) {
    const unsigned long nanosPerSecond = 1000000000UL;
    struct timespec delay = {
        .tv_sec = (time_t)(nano / nanosPerSecond),
        .tv_nsec = (long)(nano % nanosPerSecond)
    };
    nanosleep(&delay, NULL);
}
/* Pushes `future` onto the task stack, growing the backing array when full.
 *
 * Runs under stackAccess. If the array cannot be grown the process is
 * aborted, because the function has no way to report failure to its caller.
 */
static void push(Future *future){
    pthread_mutex_lock(&stackAccess);
    TaskStack *stack = tp->stack;
    /* current is the index of the top entry (-1 when empty), so the new
     * entry goes one slot above it. */
    size_t next = (size_t)(stack->current + 1);
    if (next == stack->size) {
        size_t newSize = stack->size * 2;
        /* realloc takes a size in bytes, not a number of entries; keep the
         * old pointer until the call is known to have succeeded. */
        Future **grown = realloc(stack->start, newSize * sizeof *grown);
        if (grown == NULL) {
            perror("push: realloc");
            abort();
        }
        stack->start = grown;
        stack->size = newSize;
    }
    stack->start[next] = future;
    stack->current = (long)next;
    pthread_mutex_unlock(&stackAccess);
}
/* Pops and returns the top task of the stack.
 *
 * Never returns NULL: while the stack is empty the caller loops here,
 * releasing the lock, acting on a pending cancellation request and yielding
 * the CPU before trying again. A thread that calls this while waiting for a
 * specific future therefore stays in here for as long as the stack is empty.
 */
static Future *pull(){
    for(;;){
        pthread_mutex_lock(&stackAccess);
        if(tp->stack->current!=-1){
            Future *top=tp->stack->start[tp->stack->current];
            tp->stack->current--;
            pthread_mutex_unlock(&stackAccess);
            return top;
        }
        /* nothing queued: let go of the lock so other threads can push */
        pthread_mutex_unlock(&stackAccess);
        pthread_testcancel();
        sched_yield();
    }
}
/* Entry point of every worker thread: forever pull a task, run it, mark it
 * fulfilled, then act on a pending cancellation request. `args` is unused. */
static void *workerThread(void *args){
    pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
    for(;;){
        Future *task=pull();
        task->fn(task);          /* the task function receives its own Future */
        task->fulfilled=true;
        pthread_testcancel();
    }
    return NULL;
}
/* Creates the global thread pool and starts `size` worker threads.
 *
 * size: number of worker threads to create.
 * Returns 0 on success. On failure prints a diagnostic containing an error
 * code (0-3: allocation step, 4-6: thread attribute step, 20+i: creation of
 * thread i), stops any workers already started, releases everything that was
 * allocated and returns -1.
 */
int tpInit(size_t size) {
    int err;
    bool attrReady = false;
    pthread_attr_t attributes;

    tp = NULL;
    pthread_mutex_init(&stackAccess, NULL);

    tp = malloc(sizeof *tp);
    if (tp == NULL) {
        err = 0;
        goto ERRHANDLINIT;
    }
    /* Pre-set members so the error path can free them unconditionally. */
    tp->size = 0;
    tp->threads = NULL;
    tp->stack = malloc(sizeof *tp->stack);
    if (tp->stack == NULL) {
        err = 1;
        goto ERRHANDLINIT;
    }
    tp->stack->start = NULL;
    tp->threads = malloc(sizeof *tp->threads * size);
    if (tp->threads == NULL) {
        err = 2;
        goto ERRHANDLINIT;
    }
    tp->stack->start = malloc(sizeof *tp->stack->start * INITSTACKSIZE);
    if (tp->stack->start == NULL) {
        err = 3;
        goto ERRHANDLINIT;
    }
    tp->stack->current = -1;
    tp->stack->size = INITSTACKSIZE;

    if (pthread_attr_init(&attributes) != 0) {
        err = 4;
        goto ERRHANDLINIT;
    }
    attrReady = true;
    if (pthread_attr_setstacksize(&attributes, THREADSTACKSIZE) != 0) {
        err = 5;
        goto ERRHANDLINIT;
    }
    if (pthread_attr_setdetachstate(&attributes, PTHREAD_CREATE_JOINABLE) != 0) {
        err = 6;
        goto ERRHANDLINIT;
    }
    for (size_t i = 0; i < size; i++) {
        if (pthread_create(&tp->threads[i], &attributes, workerThread, NULL) != 0) {
            err = 20 + (int)i;
            goto ERRHANDLINIT;
        }
        /* Count only threads that really exist; tpRelease() and the error
         * path below cancel and join exactly tp->size threads. */
        tp->size = i + 1;
    }
    pthread_attr_destroy(&attributes);
    return 0;

ERRHANDLINIT:
    /* Report first so the cleanup calls cannot disturb errno. */
    perror("Problem while initiating the threadpool with the following errcode: ");
    fprintf(stderr,"%i\n", err);
    if (attrReady) {
        pthread_attr_destroy(&attributes);
    }
    if (tp != NULL) {
        /* Idle workers reach pthread_testcancel() inside pull(). */
        for (size_t i = 0; i < tp->size; i++) {
            pthread_cancel(tp->threads[i]);
        }
        for (size_t i = 0; i < tp->size; i++) {
            pthread_join(tp->threads[i], NULL);
        }
        if (tp->stack != NULL) {
            free(tp->stack->start);
        }
        free(tp->stack);
        free(tp->threads);
        free(tp);
        tp = NULL;
    }
    pthread_mutex_destroy(&stackAccess);
    return -1;
}
/* Stops the pool: cancels and joins the tp->size worker threads, then frees
 * the task stack, the thread array and the pool itself.
 * Does nothing when no pool exists, so a repeated call is harmless. */
void tpRelease(void) {
    if (tp == NULL) {
        return;
    }
    for (size_t i = 0; i < tp->size; i++) {
        pthread_cancel(tp->threads[i]);
        pthread_join(tp->threads[i], NULL);
    }
    free(tp->stack->start);
    free(tp->stack);
    free(tp->threads);
    free(tp);
    tp = NULL;  /* no dangling pointer left behind */
}
/* Schedules `future`: resets its fulfilled flag and pushes it onto the task
 * stack. The caller keeps ownership of the Future object. */
void tpAsync(Future *future) {
    future->fulfilled = false;
    push(future);
}
void tpAwait(Future *future) {
while(!future->fulfilled){
Future *workFut=pull();
workFut->fn(workFut);
workFut->fulfilled=true;
}
}
主檔案
#include "threadpool.h"
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
static TASK(long, fib, long);
/* Computes the n-th Fibonacci number, evaluating both recursive calls as
 * thread pool tasks. Values n <= 1 are returned unchanged. */
long fib(long n) {
    if (n <= 1){
        return n;
    }
    /* The futures are compound literals living in this stack frame, so both
     * must be awaited before the function returns. */
    fib_fut *a = fibAsync((fib_fut[]) { fibFuture(n - 1) });
    fib_fut *b = fibAsync((fib_fut[]) { fibFuture(n - 2) });
    return fibAwait(a) + fibAwait(b);
}
/* Starts a pool of 8 workers, registers its teardown for process exit and
 * prints fib(0) through fib(100). */
int main() {
    if (tpInit(8) != 0) {
        perror("Thread Pool initialization failed");
        exit(-1);
    }
    atexit(&tpRelease);
    /* NOTE(review): fib(93) and above do not fit a 64-bit signed long;
     * confirm whether the upper bound of 100 is intended. */
    for (long i = 0; i <= 100; i++)
        printf("fib(%2li) = %li\n", i, fib(i));
    return 0;
}
Makefile
#!/usr/bin/make
# Builds every *.c file in this directory (except quicksort.c) into the
# `threadpool` executable.
.SUFFIXES:
.PHONY: all run pack clean
SRC = $(wildcard *.c)
OBJ = $(SRC:%.c=%.o)
TAR = threadpool
CFLAGS = -std=gnu11 -c -g -Os -Wall -MMD -MP
LFLAGS = -pthread
DEP = $(OBJ:%.o=%.d)

# Default goal.
all: $(TAR)

# Recipe lines must start with a TAB character.
%.o: %.c
	$(CC) $(CFLAGS) $< -o $@

$(TAR): $(filter-out quicksort.o,$(OBJ))
	$(CC) $(LFLAGS) -o $@ $^

run: all
	./$(TAR)

clean:
	$(RM) $(RMFILES) $(OBJ) $(TAR) bench $(DEP) $(PCK)

# Generated dependency files are included last: they contain explicit
# targets, and including them before the first rule would make one of those
# targets the default goal once the .d files exist.
-include $(DEP)
I really hope you have some idea what is happening. Thank you in advance.
uj5u.com熱心網友回復:
所以我想通了,在 Craig Estey 和 Amit 的慷慨幫助下(你可以在原始帖子下的評論中看到)。
結果它終究還是一個死鎖;你仍然可以在原始貼文中看到問題所在——我不會修改原文,好讓任何有興趣的人都有機會見識我的愚蠢。
發生這種情況是因為:在某一時刻會有 6 個執行緒在 pull() 中等待,而堆疊是空的;剩下的兩個執行緒,一個正要進入 await,另一個剛執行完它被指派的函式——而且是一個不會再遞迴呼叫其他函式的函式(在我們的範例中即 fib(0) 或 fib(1))。正要進入 await 的那個執行緒(姑且稱為執行緒 7)進入 fibAwait() 之後,會先檢查它所等待的值是否已經完成;此時尚未完成,於是它去檢查堆疊中是否還有其他任務。由於堆疊是空的,它就被困在 pull() 的等待之中。
接著另一個執行緒(執行緒 8),也就是剛執行完所指派函式的那個,會把它的 future 標記為已完成,並嘗試取出(pull)下一個 future。由於堆疊是空的,它同樣會停留在 pull() 裡。
現在所有執行緒都卡在 pull() 中,沒有任何一個能夠繼續前進,因為正在等待另一個執行緒結果的執行緒必須先離開 pull(),才能發現它等待的結果已經完成。
我只修改了 pull()、push()、tpAwait()、tpInit() 和 workerThread();此外我還實作了一個非常簡單的票號鎖(ticket lock)。
threadpool.c
/* Resets the ticket lock: the next ticket handed out and the ticket being
 * served both start at 0, i.e. the lock is free. */
static void ticketLockInit(){
    atomic_init(&nextTicket, 0);
    atomic_init(&nowServing, 0);
}
static inline void ticketLockAcquire(){
atomic_long myTicket=atomic_fetch_add(&nextTicket,1);
while(myTicket!=nowServing){
nsleep(1);
}
}
/* Releases the ticket lock by advancing the number being served, which lets
 * the holder of the next ticket leave ticketLockAcquire(). */
static inline void ticketLockRelease(){
    /* A bare `nowServing;` is an expression without effect and would leave
     * the lock held forever. */
    atomic_fetch_add(&nowServing, 1);
}
/* Pushes `future` onto the task stack, growing the backing array when full.
 *
 * Runs under the ticket lock. If the array cannot be grown the process is
 * aborted, because the function has no way to report failure to its caller.
 */
static void push(Future *future){
    ticketLockAcquire();
    TaskStack *stack = tp->stack;
    /* current is the index of the top entry (-1 when empty), so the new
     * entry goes one slot above it. */
    size_t next = (size_t)(stack->current + 1);
    if (next == stack->size) {
        fprintf(stderr, "MemRealloc\n");
        size_t newSize = stack->size * 2;
        /* realloc takes a size in bytes, not a number of entries; keep the
         * old pointer until the call is known to have succeeded. */
        Future **grown = realloc(stack->start, newSize * sizeof *grown);
        if (grown == NULL) {
            perror("push: realloc");
            abort();
        }
        stack->start = grown;
        stack->size = newSize;
    }
    stack->start[next] = future;
    stack->current = (long)next;
    ticketLockRelease();
}
static Future *pull(){
Future *retVal=NULL;
ticketLockAcquire();
if(tp->stack->current>-1){ //if there is nothing on the stack test if there is a cancel attempt and yield the scheduler to a thread that might add tasks.
retVal=tp->stack->start[tp->stack->current];
tp->stack->current--;
}
ticketLockRelease();
return retVal;
}
/* Entry point of every worker thread: repeatedly pull a task and, if one was
 * available, run it and mark it fulfilled. `args` is unused. */
static void *workerThread(void *args){
    pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
    Future *fut;
    while(true){
        if((fut=pull())!=NULL){
            fut->fn(fut);
            fut->fulfilled=true;
        }
        /* Checked on every iteration, not only after a task ran: an idle
         * worker would otherwise have no guaranteed cancellation point and
         * pthread_join() on it could wait forever. */
        pthread_testcancel();
    }
    return NULL;
}
/* Waits until `future` has been fulfilled. While waiting, the calling thread
 * keeps polling the task stack and executes whatever task it obtains; an
 * empty stack simply leads to another check of the awaited future. */
void tpAwait(Future *future) {
    while(!future->fulfilled){
        Future *next=pull();
        if(next==NULL){
            continue;
        }
        next->fn(next);
        next->fulfilled=true;
        pthread_testcancel();
    }
}
/* Creates the global thread pool and starts `size` worker threads.
 *
 * size: number of worker threads to create.
 * Returns 0 on success. On failure prints a diagnostic containing an error
 * code (0-3: allocation step, 4-6: thread attribute step, 20+i: creation of
 * thread i) and returns -1. If the failure happened before any worker was
 * started, everything allocated so far is released; otherwise the partially
 * built pool is left in place with tp->size set to the number of running
 * workers.
 */
int tpInit(size_t size) {
    int err;
    bool attrReady = false;
    pthread_attr_t attributes;

    tp = NULL;
    pthread_mutex_init(&stackAccess, NULL);
    ticketLockInit();

    tp = malloc(sizeof *tp);
    if (tp == NULL) {
        err = 0;
        goto ERRHANDLINIT;
    }
    /* Pre-set members so the error path can free them unconditionally. */
    tp->size = 0;
    tp->threads = NULL;
    tp->stack = malloc(sizeof *tp->stack);
    if (tp->stack == NULL) {
        err = 1;
        goto ERRHANDLINIT;
    }
    tp->stack->start = NULL;
    tp->threads = malloc(sizeof *tp->threads * size);
    if (tp->threads == NULL) {
        err = 2;
        goto ERRHANDLINIT;
    }
    tp->stack->start = malloc(sizeof *tp->stack->start * INITSTACKSIZE);
    if (tp->stack->start == NULL) {
        err = 3;
        goto ERRHANDLINIT;
    }
    tp->stack->current = -1;
    tp->stack->size = INITSTACKSIZE;

    if (pthread_attr_init(&attributes) != 0) {
        err = 4;
        goto ERRHANDLINIT;
    }
    attrReady = true;
    if (pthread_attr_setstacksize(&attributes, THREADSTACKSIZE) != 0) {
        err = 5;
        goto ERRHANDLINIT;
    }
    if (pthread_attr_setdetachstate(&attributes, PTHREAD_CREATE_JOINABLE) != 0) {
        err = 6;
        goto ERRHANDLINIT;
    }
    for (size_t i = 0; i < size; i++) {
        if (pthread_create(&tp->threads[i], &attributes, workerThread, NULL) != 0) {
            err = 20 + (int)i;
            goto ERRHANDLINIT;
        }
        /* Count only threads that really exist, so that tpRelease() cancels
         * and joins exactly the workers that were started. */
        tp->size = i + 1;
    }
    pthread_attr_destroy(&attributes);
    return 0;

ERRHANDLINIT:
    /* Report first so the cleanup calls cannot disturb errno. */
    perror("Problem while initiating the threadpool with the following errcode: ");
    fprintf(stderr,"%i\n", err);
    if (attrReady) {
        pthread_attr_destroy(&attributes);
    }
    if (tp != NULL && tp->size == 0) {
        /* No worker exists yet, so nothing else can reference the pool. */
        if (tp->stack != NULL) {
            free(tp->stack->start);
        }
        free(tp->stack);
        free(tp->threads);
        free(tp);
        tp = NULL;
    }
    /* With workers already running the pool memory is still in use by them
     * and is therefore not freed here. */
    return -1;
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/caozuo/399898.html
標籤:c multithreading pthreads threadpool
