我剛剛進入 Go 中的并發性,并嘗試創建一個調度 go 例程,該例程會將作業發送到偵聽 jobchan 通道的作業池。如果訊息通過 dispatchchan 通道進入我的調度函式并且我的其他 go 例程正忙,則訊息將附加到調度器中的堆疊切片上,調度器將在稍后作業人員可用時再次嘗試發送,和/或沒有在 dispatchchan 上收到更多訊息。這是因為 dispatchchan 和 jobchan 沒有緩沖,并且作業人員正在運行的 go 例程會將其他訊息附加到調度程式直到某個點,我不希望作業人員阻塞等待調度程式并造成死鎖。這是到目前為止我想出的調度程式代碼:
func dispatch() {
var stack []string
acount := 0
for {
select {
case d := <-dispatchchan:
stack = append(stack, d)
case c := <-mw:
acount = acount c
case jobchan <-stack[0]:
if len(stack) > 1 {
stack[0] = stack[len(stack)-1]
stack = stack[:len(stack)-1]
} else {
stack = nil
}
default:
if acount == 0 && len(stack) == 0 {
close(jobchan)
close(dispatchchan)
close(mw)
wg.Done()
return
}
}
}
完整示例在https://play.golang.wiki/p/X6kXVNUn5N7
mw 通道是一個緩沖通道,其長度與作業程式 go 例程的數量相同。它充當作業池的信號量。如果作業程式正在執行 [m]eaningful [w]ork,它會在 mw 通道上拋出 int 1,當它完成作業并回傳 for 回圈監聽 jobchan 時,它會在 mw 上拋出 int -1。通過這種方式,調度程式知道作業池是否正在完成任何作業,或者池是否空閑。如果池空閑并且堆疊上沒有更多訊息,則調度程式關閉通道并將控制權回傳給主函式。
這一切都很好,但我遇到的問題是堆疊本身的長度可能為零,因此在我嘗試將堆疊 [0] 發送到 jobchan 的情況下,如果堆疊為空,則會出現越界錯誤。我想弄清楚的是如何確保當我遇到這種情況時, stack[0] 中是否有值。我不希望這種情況向 jobchan 發送一個空字串。
任何幫助是極大的贊賞。如果有我應該考慮的更典型的并發模式,我很想聽聽。我不是 100% 出售此解決方案,但這是迄今為止我得到的最遠的解決方案。
uj5u.com熱心網友回復:
這一切都很好,但我遇到的問題是堆疊本身的長度可能為零,因此在我嘗試將堆疊 [0] 發送到 jobchan 的情況下,如果堆疊為空,則會出現越界錯誤。
我無法用你的操場鏈接復制它,但它是可信的,因為至少有一個gofunc作業人員可能已經準備好在那個頻道上接收。
我一直輸出Msgcnt: 0,這也很容易解釋,因為gofunc威力不是已經準備好接收上jobschan時dispatch()運行它select。這些操作的順序沒有定義。
嘗試創建一個 dispatch go 例程,將作業發送到監聽 jobchan 通道的作業池
通道不需要調度器。通道是調度器。
如果訊息通過 dispatchchan 通道進入我的調度函式并且我的其他 go 例程正忙,則訊息 [...] 將 [...] 稍后在作業人員可用時再次發送,[...] 或不發送在 dispatchchan 上收到更多訊息。
通過一些創造性的編輯,很容易把它變成接近緩沖通道定義的東西。它可以立即讀取,或者它可能需要一些“limit不能被立即派遣”的訊息。您確實定義了limit,盡管它沒有在您的代碼中的其他地方使用。
在任何函式中,定義一個你沒有讀過的變數都會導致編譯時錯誤,比如limit declared but not used. 這種限制提高了代碼質量并有助于識別型別。但是在包范圍內,您已經將未使用的定義limit為“全域”,從而避免了一個有用的錯誤——您沒有限制任何東西。
不要使用全域變數。使用傳遞的引數來定義作用域,因為作用域的定義等同于用go關鍵字表示的函式式并發。 將本地范圍內定義的相關通道傳遞給包范圍內定義的函式,以便您可以輕松跟蹤它們的關系。并使用定向渠道來強制您的功能之間的生產者/消費者關系。稍后會詳細介紹。
回到“限制”,限制您排隊的作業數量是有意義的,因為所有資源都是有限的,并且接受比您預期的處理更多的訊息需要比行程記憶體提供的更持久的存盤。如果你不覺得有義務滿足這些要求,不管是什么,不接受擺在首位“太多”他們。
那么,什么函式有dispatchchan和dispatch()?在處理之前存盤有限數量的待處理請求(如果有),然后將它們發送給下一個可用的作業人員?這正是緩沖通道的用途。
回圈邏輯
誰“知道”您的程式何時完成? main()提供初始輸入,但您在 `dispatch() 中關閉所有 3 個通道:
close(jobchan)
close(dispatchchan)
close(mw)
您的作業人員寫入他們自己的作業佇列,因此只有在作業人員完成寫入后才能關閉傳入的作業佇列。但是,個別作業人員也不知道何時關閉作業佇列,因為其他作業人員正在向其中寫入。 沒有人知道你的演算法什么時候完成。這是你的回圈邏輯。
mw 通道是一個緩沖通道,其長度與作業程式 go 例程的數量相同。它充當作業池的信號量。
這里有一個競爭條件。考慮所有n工人剛剛收到最后n一份作業的情況。他們各自讀取jobschan并檢查 的值ok。 disptatcher繼續運行其select. 現在沒有人在寫信dispatchchan或讀信,jobschan因此default案件立即匹配。 len(stack)是0并且沒有電流,job因此dispatcher關閉所有通道,包括mw. 此后的某個時候,一個工人嘗試寫入一個關閉的通道并發生恐慌。
所以最后我準備提供一些代碼,但我還有一個問題:我沒有一個明確的問題陳述來撰寫代碼。
我剛剛進入 Go 中的并發性,并嘗試創建一個調度 go 例程,該例程會將作業發送到偵聽 jobchan 通道的作業池。
goroutine 之間的通道就像同步齒輪的牙齒。但是齒輪到底要轉動到什么地方呢?您不是要計時,也不是要制作發條玩具。你的齒輪可以轉動,但成功會是什么樣子?他們的轉向?
讓我們嘗試為通道定義一個更具體的用例:給定一組任意長的持續時間作為標準輸入 * 上的字串,讓其中一個作業n人員休眠那么多秒。所以我們實際上有一個結果要回傳,我們會說每個作業人員將回傳持續時間運行的開始和結束時間。
- So that it can run in the playground, I'll simulate standard input with a hard-coded byte buffer.
package main
import (
"bufio"
"bytes"
"fmt"
"os"
"strings"
"sync"
"time"
)
type SleepResult struct {
worker_id int
duration time.Duration
start time.Time
end time.Time
}
func main() {
var num_workers = 2
workchan := make(chan time.Duration)
resultschan := make(chan SleepResult)
var wg sync.WaitGroup
var resultswg sync.WaitGroup
resultswg.Add(1)
go results(&resultswg, resultschan)
for i := 0; i < num_workers; i {
wg.Add(1)
go worker(i, &wg, workchan, resultschan)
}
// playground doesn't have stdin
var input = bytes.NewBufferString(
strings.Join([]string{
"3ms",
"1 seconds",
"3600ms",
"300 ms",
"5s",
"0.05min"}, "\n") "\n")
var scanner = bufio.NewScanner(input)
for scanner.Scan() {
text := scanner.Text()
if dur, err := time.ParseDuration(text); err != nil {
fmt.Fprintln(os.Stderr, "Invalid duration", text)
} else {
workchan <- dur
}
}
close(workchan) // we know when our inputs are done
wg.Wait() // and when our jobs are done
close(resultschan)
resultswg.Wait()
}
func results(wg *sync.WaitGroup, resultschan <-chan SleepResult) {
for res := range resultschan {
fmt.Printf("Worker %d: %s : %s => %s\n",
res.worker_id, res.duration,
res.start.Format(time.RFC3339Nano), res.end.Format(time.RFC3339Nano))
}
wg.Done()
}
func worker(id int, wg *sync.WaitGroup, jobchan <-chan time.Duration, resultschan chan<- SleepResult) {
var res = SleepResult{worker_id: id}
for dur := range jobchan {
res.duration = dur
res.start = time.Now()
time.Sleep(res.duration)
res.end = time.Now()
resultschan <- res
}
wg.Done()
}
Here I use 2 wait groups, one for the workers, one for the results. This makes sure Im done writing all the results before main() ends. I keep my functions simple by having each function do exactly one thing at a time: main reads inputs, parses durations from them, and sends them off to the next worker. The results function collects results and prints them to standard output. The worker does the sleeping, reading from jobchan and writing to resultschan.
workchan can be buffered (or not, as in this case); it doesn't matter because the input will be read at the rate it can be processed. We can buffer as much input as we want, but we can't buffer an infinite amount. I've set channel sizes as big as 1e6 - but a million is a lot less than infinite. For my use case, I don't need to do any buffering at all.
main knows when the input is done and can close the jobschan. main also knows when jobs are done (wg.Wait()) and can close the results channel. Closing these channels is an important signal to the worker and results goroutines - they can distinguish between a channel that is empty and a channel that is guaranteed not to have any new additions.
for job := range jobchan {...} is shorthand for your more verbose:
for {
job, ok := <- jobchan
if !ok {
wg.Done()
return
}
...
}
請注意,此代碼創建了 2 個作業器,但它可以創建 20 或 2000 個,甚至 1 個。無論池中有多少個作業器,程式都會運行。它可以處理任何數量的輸入(盡管無休止的輸入當然會導致無休止的程式)。它并沒有創造的產出與投入的回圈回路。如果您的用例需要作業來創造更多作業,那么這是一個更具挑戰性的場景,通常可以通過仔細規劃來避免。
我希望這能給你一些關于如何在 Go 應用程式中更好地使用并發的好主意。
https://play.golang.wiki/p/cZuI9YXypxI
轉載請註明出處,本文鏈接:https://www.uj5u.com/yidong/364107.html
