我的使用者(從 運行main)支持背景關系取消和通過case陳述句從通道讀取。我可以使用背景關系關閉消費者,這很好用。但是,當我在一個 case 陳述句中生成多個作業人員時,每個作業人員都會從 獲得相同的作業(訊息)jobsChan,這不是我想要的:
func (app *App) consumer() {
for {
select {
case <-app.ctx.Done():
app.infoLog.Print("Caught SIGINT, stopping.")
app.wg.Wait()
app.doneChan <- struct{}{} # main uses this channel to block itself until all goroutines are stopped
app.infoLog.Print("Shutting down the consumer...")
return
case job := <-app.jobsChan:
// PROBLEM here: wrong, each worker is given the same job
for workerNumber := 0; workerNumber < app.config.workers; workerNumber {
app.wg.Add(1)
go app.workerFunc(workerNumber, job)
}
}
}
}
func (app *App) workerFunc(id int, job Job) {
defer app.wg.Done()
... actual worker code here ...
}
我怎么可以重寫這段代碼,這樣我可以保持select的app.ctx.Done通道,并在同一時間可以產卵的工人,使每個工人從通道的作業挑選下一個訊息?我需要繼續for/select監聽ctx取消,但同時我需要生成 X 個工人從jobsChan消費者中讀取訊息。這可能嗎?
想到的唯一選擇是將 channel 直接傳遞到 spawnedworkerFunc并for job := range app.jobsChan在workerFunc. 但是隨后case job := <-app.jobsChan:消費者中的整體變得毫無意義,我不知道如何重寫它。
澄清一下:當我運行應用程式時,我希望每個工人都有一個新的作業 id 從jobsChan- 但他們都處理相同的,例如 1,然后他們都處理下一個,例如 2
#wrong
Worker 0: start processing item 1
Worker 2: start processing item 1
Worker 1: start processing item 1
uj5u.com熱心網友回復:
您現有的代碼將相同的作業明確分配給所有作業人員。如果您有固定數量的作業人員,請為他們創建 goroutines(在初始化期間),并讓他們收聽頻道:
for workerNumber:0;workerNumber<app.config.workers;workerNumber {
go app.workerFunc(ctx,workerNumber,app.jobsChan)
}
在每個作業執行緒中,只需檢查 jobQueue 和背景關系取消。
換句話說,您不需要consumer, 將作業直接傳遞給工人。
轉載請註明出處,本文鏈接:https://www.uj5u.com/qianduan/343544.html
標籤:走
下一篇:在立即運行服務器之前釋放埠
