我想知道并了解如何在生產者/消費者方案中使用 go 執行過濾并發管道。
我設法撰寫了一個檢查值的版本,如果正常,則將其發送到一個通道,如果沒有,則將值發送到另一個通道。
在讀取并處理值后,兩個 goroutine 負責讀取處理后的值并將它們寫入檔案。這個版本運行正常。但...
假設我不想要無效值。有沒有辦法改變 select 陳述句(或消費者 goroutine),以便只輸出正確的值(即只使用一個輸出通道)。我嘗試洗掉那個 invalidValues 頻道,但我沒有成功。
我嘗試將 select 陳述句放在
if valid?; 在這個版本中,有一個帶有完整陳述句的分支,在 false 分支中,只需要等待完成的通道。通過這種方式,我可以丟棄無效值并使用一個通道,但我用這種方法也沒有成功。
有想法該怎么解決這個嗎?
- 此外,在這個方案中,我想知道為什么如果我省略從 invalidValues 通道中洗掉值的 goroutine 程式不會完成?是不是通道需要清空否則保持阻塞?有沒有更優雅的方法來做到這一點來對值進行范圍?
謝謝!!
//Consumers
var wg sync.WaitGroup
wg.Add(Workers)
for i := 0; i < Workers; i {
// Deploy #Workers to read from the inputStream perform validation and output the valid results to one channel and the invalid to another
go func() {
for value := range inputStream {
var c *chan string
dataToWrite := value
if valid := checkValue(value); valid {
dataToWrite = value
c = &outputStream
} else {
c = &invalidValues
}
select {
case *c <- dataToWrite:
case <-done:
return
}
time.Sleep(time.Duration(5) * time.Second)
}
wg.Done()
}()
}
這是代碼的完整版本
done := make(chan struct{})
defer close(done)
inputStream := make(chan string)
outputStream := make(chan string)
invalidValues := make(chan string)
//Producer reads a file with values and stores them in a channel
go func() {
count := 0
scanner := bufio.NewScanner(file)
for scanner.Scan() {
inputStream <- strings.TrimSpace(scanner.Text())
count = count 1
}
close(inputStream)
}()
//Consumers
var wg sync.WaitGroup
wg.Add(Workers)
for i := 0; i < Workers; i {
// Deploy #Workers to read from the inputStream perform validation and output the valid results to one channel and the invalid to another
go func() {
for value := range inputStream {
var c *chan string
dataToWrite := value
if valid := checkValue(value); valid {
dataToWrite = value
c = &outputStream
} else {
c = &invalidValues
}
select {
case *c <- dataToWrite:
case <-done:
return
}
time.Sleep(time.Duration(5) * time.Second)
}
wg.Done()
}()
}
go func() {
wg.Wait()
close(outputStream)
close(invalidValues)
}()
//Write outputStream file
resultFile, err := os.Create("outputStream.txt")
if err != nil {
log.Fatal(err)
}
//Error file
errorFile, err := os.Create("errors.txt")
if err != nil {
log.Fatal(err)
}
//Create two goruotines for writing the outputStream file
var wg2 sync.WaitGroup
wg2.Add(2)
go func() {
//Write outputStream and error to files
for r := range outputStream {
_, err := resultFile.WriteString(r "\n")
if err != nil {
log.Fatal(err)
}
}
resultFile.Close()
wg2.Done()
}()
go func() {
for r := range invalidValues {
_, err := errorFile.WriteString(r "\n")
if err != nil {
log.Fatal(err)
}
}
errorFile.Close()
wg2.Done()
}()
wg2.Wait()
uj5u.com熱心網友回復:
洗掉無效頻道:
for value := range inputStream {
var c *chan string
if valid := checkValue(value); valid {
select {
case outputStream <- value
case <-done:
return
}
}
}
如果洗掉無效值讀取器 goroutine,則必須將等待組更改為:
wg2.Add(1)
所以你不會無限期地等待。
轉載請註明出處,本文鏈接:https://www.uj5u.com/qukuanlian/358307.html
上一篇:如何處理Gin中間件中的錯誤
