主頁 > 作業系統 > 如何使用Goroutine批量處理檔案?

如何使用Goroutine批量處理檔案?

2022-03-14 11:05:14 作業系統

假設我有一堆檔案要處理(比如 1000 個或更多),首先它們應該由 function 處理A(),functionA()將生成一個檔案,然后這個檔案將由 B() 處理。

如果我們一個一個地做,那太慢了,所以我想用 goroutine 一次處理 5 個檔案(我們一次不能處理太多,因為 CPU 無法承受)。

我是golang的新手,不確定我的想法是否正確,我認為函式A()是生產者,函式B()是消費者,函式B()會處理函式產生的檔案,A()我在下面寫了一些代碼,見諒,我真的不知道怎么寫代碼,誰能幫幫我?先感謝您!

package main

import "fmt"

var Box = make(chan string, 1024)

func A(file string) {
    fmt.Println(file, "is processing in func A()...")
    fileGenByA := "/path/to/fileGenByA1"
    Box <- fileGenByA
}

func B(file string) {
    fmt.Println(file, "is processing in func B()...")
}

func main() {
    // assuming that this is the file list read from a directory
    fileList := []string{
        "/path/to/file1",
        "/path/to/file2",
        "/path/to/file3",
    }

    // it seems I can't do this, because fileList may have 1000 or more file
    for _, v := range fileList {
        go A(v)
    }

    // can I do this?
    for file := range Box {
        go B(file)
    }
}

更新:

對不起,也許我沒有說清楚,實際上由函式生成的檔案A()存盤在硬碟中,而不是在變數(記憶體)中,所以它不必B()立即傳遞給函式。

uj5u.com熱心網友回復:

你已經成功了一半。您需要解決一些問題:

  1. 你的程式死鎖是因為沒有關閉任何東西Box,所以 main 函式永遠無法完成range它。
  2. 你不是在等待你的 goroutines 完成,而且有超過 5 個 goroutines。(這些的解決方案太交織在一起,無法單獨描述)

1. 死鎖

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan receive]:
main.main()

當您range通過通道時,您會從通道中讀取每個值,直到它同時為 closed 和 empty既然你從來沒有close這個頻道,那range這個頻道就永遠無法完成,程式也永遠無法完成。

在您的情況下,這是一個相當容易解決的問題:我們只需要在知道不會有更多寫入通道時關閉通道。

    for _, v := range fileList {
        go A(v)
    }
    close(Box)

請記住,close讀取通道并不會阻止它被讀取,只會阻止它被寫入現在消費者可以區分未來可能接收更多資料的空通道和永遠不會接收更多資料的空通道。

一旦你添加了close(Box),程式就不會再死鎖了,但它仍然不起作用。

2. 太多的 Goroutines 而沒有等待它們完成

要運行某個最大數量的并發執行,而不是為每個輸入創建一個 goroutine,而是在“作業池”中創建 goroutine:

  • 創建一個通道來傳遞工人的作業
  • 為 goroutines 創建一個通道以回傳其結果(如果有)
  • 啟動你想要的 goroutines 數量
  • 至少啟動一個額外的 goroutine 來分派作業或收集結果,因此您不必嘗試從主 goroutine 執行這兩項操作
  • 使用 async.WaitGroup等待所有資料被處理
  • close通道向作業人員和結果收集器發出信號,表明他們的通道已完成填充。

在我們進入實作之前,讓我們談談如何AB互動。

首先它們應該由函式A()處理,函式A()會生成一個檔案,然后這個檔案將由B()處理。

A() and B() must, then, execute serially. They can still pass their data through a channel, but since their execution must be serial, it does nothing for you. Simpler is to run them sequentially in the workers. For that, we'll need to change A() to either call B, or to return the path for B and the worker can call. I choose the latter.

func A(file string) string {
    fmt.Println(file, "is processing in func A()...")
    fileGenByA := "/path/to/fileGenByA1"
    return fileGenByA
}

Before we write our worker function, we also must consider the result of B. Currently, B returns nothing. In the real world, unless B() cannot fail, you would at least want to either return the error, or at least panic. I'll skip over collecting results for now.

Now we can write our worker function.

func worker(wg *sync.WaitGroup, incoming <-chan string) {
    defer wg.Done()
    for file := range incoming {
        B(A(file))
    }
}

Now all we have to do is start 5 such workers, write the incoming files to the channel, close it, and wg.Wait() for the workers to complete.

    incoming_work := make(chan string)
    var wg sync.WaitGroup
    for i := 0; i < 5; i   {
        wg.Add(1)
        go worker(&wg, incoming_work)
    }
    for _, v := range fileList {
        incoming_work <- v
    }
    close(incoming_work)
    wg.Wait()

Full example at https://go.dev/play/p/A1H4ArD2LD8

Returning Results.

It's all well and good to be able to kick off goroutines and wait for them to complete. But what if you need results back from your goroutines? In all but the simplest of cases, you would at least want to know if files failed to process so you could investigate the errors.

We have only 5 workers, but we have many files, so we have many results. Each worker will have to return several results. So, another channel. It's usually worth defining a struct for your return:

type result struct {
  file string
  err error
}

This tells us not just whether there was an error but also clearly defines which file from which the error resulted.

How will we test an error case in our current code? In your example, B always gets the same value from A. If we add A's incoming file name to the path it passes to B, we can mock an error based on a substring. My mocked error will be that file3 fails.

func A(file string) string {
    fmt.Println(file, "is processing in func A()...")
    fileGenByA := "/path/to/fileGenByA1/"   file
    return fileGenByA
}

func B(file string) (r result) {
    r.file = file
    fmt.Println(file, "is processing in func B()...")
    if strings.Contains(file, "file3") {
        r.err = fmt.Errorf("Test error")
    }
    return
}

Our workers will be sending results, but we need to collect them somewhere. main() is busy dispatching work to the workers, blocking on its write to incoming_work when the workers are all busy. So the simplest place to collect the results is another goroutine. Our results collector goroutine has to read from a results channel, print out errors for debugging, and the return the total number of failures so our program can return a final exit status indicating overall success or failure.

    failures_chan := make(chan int)
    go func() {
        var failures int
        for result := range results {
            if result.err != nil {
                failures  
                fmt.Printf("File %s failed: %s", result.file, result.err.Error())
            }
        }
        failures_chan <- failures

    }()

Now we have another channel to close, and it's important we close it after all workers are done. So we close(results) after we wg.Wait() for the workers.

    close(incoming_work)
    wg.Wait()
    close(results)
    if failures := <-failures_chan; failures > 0 {
        os.Exit(1)
    }

Putting all that together, we end up with this code:

package main

import (
    "fmt"
    "os"
    "strings"
    "sync"
)

func A(file string) string {
    fmt.Println(file, "is processing in func A()...")
    fileGenByA := "/path/to/fileGenByA1/"   file
    return fileGenByA
}

func B(file string) (r result) {
    r.file = file
    fmt.Println(file, "is processing in func B()...")
    if strings.Contains(file, "file3") {
        r.err = fmt.Errorf("Test error")
    }
    return
}

func worker(wg *sync.WaitGroup, incoming <-chan string, results chan<- result) {
    defer wg.Done()
    for file := range incoming {
        results <- B(A(file))
    }
}

type result struct {
    file string
    err  error
}

func main() {
    // assuming that this is the file list read from a directory
    fileList := []string{
        "/path/to/file1",
        "/path/to/file2",
        "/path/to/file3",
    }
    incoming_work := make(chan string)
    results := make(chan result)
    var wg sync.WaitGroup
    for i := 0; i < 5; i   {
        wg.Add(1)
        go worker(&wg, incoming_work, results)
    }
    failures_chan := make(chan int)
    go func() {
        var failures int
        for result := range results {
            if result.err != nil {
                failures  
                fmt.Printf("File %s failed: %s", result.file, result.err.Error())
            }
        }
        failures_chan <- failures

    }()
    for _, v := range fileList {
        incoming_work <- v
    }
    close(incoming_work)
    wg.Wait()
    close(results)
    if failures := <-failures_chan; failures > 0 {
        os.Exit(1)
    }
}

And when we run it, we get:

/path/to/file1 is processing in func A()...
/path/to/fileGenByA1//path/to/file1 is processing in func B()...
/path/to/file2 is processing in func A()...
/path/to/fileGenByA1//path/to/file2 is processing in func B()...
/path/to/file3 is processing in func A()...
/path/to/fileGenByA1//path/to/file3 is processing in func B()...
File /path/to/fileGenByA1//path/to/file3 failed: Test error
Program exited.

A final thought: buffered channels.

There is nothing wrong with buffered channels. Especially if you know the overall size of incoming work and results, buffered channels can obviate the results collector goroutine because you can allocate a buffered channel big enough to hold all results. However, I think it's more straightforward to understand this pattern if the channels are unbuffered. The key takeaway is that you don't need to know the number of incoming or outgoing results, which could indeed be different numbers or based on something that can't be predetermined.

uj5u.com熱心網友回復:

您可以生成 5 個從作業通道讀取的 goroutine。這樣你就可以一直運行 5 個 goroutine 并且不需要對它們進行批處理,這樣你就必須等到 5 個完成才能開始下一個 5 個。

func main() {
    stack := []string{
        "foo",
        "bar",
        "baz",
        "qux",
        "quux",
        "corge",
    }

    work := make(chan string)
    results := make(chan string)

    // create 5 go routines
    wg := sync.WaitGroup{}
    for i := 0; i < 5; i   {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for s := range work {
                results <- B(A(s))
            }
        }()
    }

    // collect the results
    go func() {
        for result := range results {
            fmt.Println(result)
        }
    }()

    // send the work to the workers
    for _, s := range stack {
        work <- s
    }
    close(work)

    // wait for the workers to finish
    // then close the results channel
    wg.Wait()
    close(results)
}

https://play.golang.com/p/IgoMfAR-Tya

uj5u.com熱心網友回復:

請檢查這個。

package main

import (
    "fmt"
    "sync"
    "time"
)

var batchSize = 5

func A(file string, releaseReq chan struct{}, box chan string, done *sync.WaitGroup) {
    defer func() {
        <-releaseReq
        done.Done()
    }()
    time.Sleep(2 * time.Second)
    fmt.Println(file, "is processing in func A()...")
    fileGenByA := "/path/to/fileGenByA1"
    box <- fileGenByA
}

func B(file string, done *sync.WaitGroup) {
    defer func() {
        done.Done()
    }()
    time.Sleep(1 * time.Second)
    fmt.Println(file, "is processing in func B()...")
}

func main() {
    fileList := []string{
        "/path/to/file1",
        "/path/to/file2",
        "/path/to/file3",
        "/path/to/file4",
        "/path/to/file5",
        "/path/to/file6",
        "/path/to/file7",
        "/path/to/file8",
        "/path/to/file9",
        "/path/to/file10",
    }
    box := make(chan string, 5)

    var doneProcessA sync.WaitGroup
    doneProcessA.Add(1)
    go func() {
        rateLimitter := make(chan struct{}, 5)
        var processA sync.WaitGroup
        for _, v := range fileList {
            rateLimitter <- struct{}{}
            processA.Add(1)
            go A(v, rateLimitter, box, &processA)
        }
        processA.Wait()
        doneProcessA.Done()
        close(box)
    }()

    var doneProcessB sync.WaitGroup
    doneProcessB.Add(1)
    go func() {
        var processB sync.WaitGroup
        for file := range box {
            processB.Add(1)
            go B(file, &processB)
        }
        processB.Wait()
        doneProcessB.Done()
    }()
    doneProcessA.Wait()
    doneProcessB.Wait()
}

轉載請註明出處,本文鏈接:https://www.uj5u.com/caozuo/442995.html

標籤: 协程

上一篇:restTemplate.postForEntity的單元測驗導致ResourceAccessException

下一篇:如何將中間件用于特定路由等?

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • CA和證書

    1、在 CentOS7 中使用 gpg 創建 RSA 非對稱密鑰對 gpg --gen-key #Centos上生成公鑰/密鑰對(存放在家目錄.gnupg/) 2、將 CentOS7 匯出的公鑰,拷貝到 CentOS8 中,在 CentOS8 中使用 CentOS7 的公鑰加密一個檔案 gpg -a ......

    uj5u.com 2020-09-10 00:09:53 more
  • Kubernetes K8S之資源控制器Job和CronJob詳解

    Kubernetes的資源控制器Job和CronJob詳解與示例 ......

    uj5u.com 2020-09-10 00:10:45 more
  • VMware下安裝CentOS

    VMware下安裝CentOS 一、軟硬體準備 1 Centos鏡像準備 1.1 CentOS鏡像下載地址 下載地址 1.2 CentOS鏡像下載程序 點擊下載地址進入如下圖的網站,選擇需要下載的版本,這里選擇的是Centos8,點擊如圖所示。 決定選擇Centos8后,選擇想要的鏡像源進行下載,此 ......

    uj5u.com 2020-09-10 00:12:10 more
  • 如何使用Grep命令查找多個字串

    如何使用Grep 命令查找多個字串 大家好,我是良許! 今天向大家介紹一個非常有用的技巧,那就是使用 grep 命令查找多個字串。 簡單介紹一下,grep 命令可以理解為是一個功能強大的命令列工具,可以用它在一個或多個輸入檔案中搜索與正則運算式相匹配的文本,然后再將每個匹配的文本用標準輸出的格式 ......

    uj5u.com 2020-09-10 00:12:28 more
  • git配置http代理

    git配置http代理 經常遇到克隆 github 慢的問題,這里記錄一下幾種配置 git 代理的方法,解決 clone github 過慢。 目錄 git配置代理 git單獨配置github代理 git配置全域代理 配置終端環境變數 git配置代理 主要使用 git config 命令 git單獨 ......

    uj5u.com 2020-09-10 00:12:33 more
  • Linux npm install 裝包時提示Error EACCES permission denied解

    npm install 裝包時提示Error EACCES permission denied解決辦法 ......

    uj5u.com 2020-09-10 00:12:53 more
  • Centos 7下安裝nginx,使用yum install nginx,提示沒有可用的軟體包

    Centos 7下安裝nginx,使用yum install nginx,提示沒有可用的軟體包。 18 (flaskApi) [root@67 flaskDemo]# yum -y install nginx 19 已加載插件:fastestmirror, langpacks 20 Loading ......

    uj5u.com 2020-09-10 00:13:13 more
  • Linux查看服務器暴力破解ssh IP

    在公網的服務器上經常遇到別人爆破你服務器的22埠,用來挖礦或者干其他嘿嘿嘿的事情~ 這種情況下正確的做法是: 修改默認ssh的22埠 使用設定密鑰登錄或者白名單ip登錄 建議服務器密碼為復雜密碼 創建普通用戶登錄服務器(root權限過大) 建立堡壘機,實作統一管理服務器 統計爆破IP [root ......

    uj5u.com 2020-09-10 00:13:17 more
  • CentOS 7系統常見快捷鍵操作方式

    Linux系統中一些常見的快捷方式,可有效提高操作效率,在某些時刻也能避免操作失誤帶來的問題。 ......

    uj5u.com 2020-09-10 00:13:31 more
  • CentOS 7作業系統目錄結構介紹

    作業系統存在著大量的資料檔案資訊,相應檔案資訊會存在于系統相應目錄中,為了更好的管理資料資訊,會將系統進行一些目錄規劃,不同目錄存放不同的資源。 ......

    uj5u.com 2020-09-10 00:13:35 more
最新发布
  • vim的常用命令

    Vim的6種基本模式 1. 普通模式在普通模式中,用的編輯器命令,比如移動游標,洗掉文本等等。這也是Vim啟動后的默認模式。這正好和許多新用戶期待的操作方式相反(大多數編輯器默認模式為插入模式)。 2. 插入模式在這個模式中,大多數按鍵都會向文本緩沖中插入文本。大多數新用戶希望文本編輯器編輯程序中一 ......

    uj5u.com 2023-04-20 08:43:21 more
  • vim的常用命令

    Vim的6種基本模式 1. 普通模式在普通模式中,用的編輯器命令,比如移動游標,洗掉文本等等。這也是Vim啟動后的默認模式。這正好和許多新用戶期待的操作方式相反(大多數編輯器默認模式為插入模式)。 2. 插入模式在這個模式中,大多數按鍵都會向文本緩沖中插入文本。大多數新用戶希望文本編輯器編輯程序中一 ......

    uj5u.com 2023-04-20 08:42:36 more
  • docker學習

    ###Docker概述 真實專案部署環境可能非常復雜,傳統發布專案一個只需要一個jar包,運行環境需要單獨部署。而通過Docker可將jar包和相關環境(如jdk,redis,Hadoop...)等打包到docker鏡像里,將鏡像發布到Docker倉庫,部署時下載發布的鏡像,直接運行發布的鏡像即可。 ......

    uj5u.com 2023-04-19 09:26:53 more
  • 設定Windows主機的瀏覽器為wls2的默認瀏覽器

    這里以Chrome為例。 1. 準備作業 wsl是可以使用Windows主機上安裝的exe程式,出于安全考慮,默認情況下改功能是無法使用。要使用的話,終端需要以管理員權限啟動。 我這里以Windows Terminal為例,介紹如何默認使用管理員權限打開終端,具體操作如下圖所示: 2. 操作 wsl ......

    uj5u.com 2023-04-19 09:25:49 more
  • docker學習

    ###Docker概述 真實專案部署環境可能非常復雜,傳統發布專案一個只需要一個jar包,運行環境需要單獨部署。而通過Docker可將jar包和相關環境(如jdk,redis,Hadoop...)等打包到docker鏡像里,將鏡像發布到Docker倉庫,部署時下載發布的鏡像,直接運行發布的鏡像即可。 ......

    uj5u.com 2023-04-19 09:19:04 more
  • Linux學習筆記

    IP地址和主機名 IP地址 ifconfig可以用來查詢本機的IP地址,如果不能使用,可以通過install net-tools安裝。 Centos系統下ens33表示主網卡;inet后表示IP地址;lo表示本地回環網卡; 127.0.0.1表示代指本機;0.0.0.0可以用于代指本機,同時在放行設 ......

    uj5u.com 2023-04-18 06:52:01 more
  • 解決linux系統的kdump服務無法啟動的問題

    問題:專案麒麟系統服務器的kdump服務無法啟動,沒有相關日志無法定位問題。 1、查看服務狀態是關閉的,重啟系統也無法啟動 systemctl status kdump 2、修改grub引數,修改“crashkernel”為“512M(有的機器數值太大太小都會導致報錯,建議從128M開始試,或者加個 ......

    uj5u.com 2023-04-12 09:59:50 more
  • 解決linux系統的kdump服務無法啟動的問題

    問題:專案麒麟系統服務器的kdump服務無法啟動,沒有相關日志無法定位問題。 1、查看服務狀態是關閉的,重啟系統也無法啟動 systemctl status kdump 2、修改grub引數,修改“crashkernel”為“512M(有的機器數值太大太小都會導致報錯,建議從128M開始試,或者加個 ......

    uj5u.com 2023-04-12 09:59:01 more
  • 你是不是暴露了?

    作者:袁首京 原創文章,轉載時請保留此宣告,并給出原文連接。 如果您是計算機相關從業人員,那么應該經歷不止一次網路安全專項檢查了,你肯定是收到過資訊系統技術檢測報告,要求你加強風險監測,確保你提供的系統服務堅實可靠了。 沒檢測到問題還好,檢測到問題的話,有些處理起來還是挺麻煩的,尤其是線上正在運行的 ......

    uj5u.com 2023-04-05 16:52:56 more
  • 細節拉滿,80 張圖帶你一步一步推演 slab 記憶體池的設計與實作

    1. 前文回顧 在之前的幾篇記憶體管理系列文章中,筆者帶大家從宏觀角度完整地梳理了一遍 Linux 記憶體分配的整個鏈路,本文的主題依然是記憶體分配,這一次我們會從微觀的角度來探秘一下 Linux 內核中用于零散小記憶體塊分配的記憶體池 —— slab 分配器。 在本小節中,筆者還是按照以往的風格先帶大家簡單 ......

    uj5u.com 2023-04-05 16:44:11 more