原文鏈接
Digesting a tree
Linux 上的 md5sum 命令,可以計算一些檔案的 md5 值:
root@ubuntu:~/gogo/tour# md5sum *.go
d2b2c3719370ea0aa7261325926111fd bounded.go
157126313040135745466593b6f65508 interface1.go
62f7464386fceaade2c83184f05e09b7 parallel.go
eea9d6c2ed85f03e099686b7b9ae5d68 serial.go
今天我們要看的實際應用管道的例子和 md5sum 命令類似,但是是以一個目錄路徑作為一個命令列引數,按照檔案路徑名的字母順序,列印出這個目錄下所有普通檔案的 md5 值,以下是例子 serial.go 程式的輸出:
root@ubuntu:~/gogo/tour# go run serial.go .
d2b2c3719370ea0aa7261325926111fd bounded.go
157126313040135745466593b6f65508 interface1.go
62f7464386fceaade2c83184f05e09b7 parallel.go
eea9d6c2ed85f03e099686b7b9ae5d68 serial.go
我們程式的 main 函式呼叫了一個輔助函式 MD5All,這個函式回傳一個從檔案路徑名到 md5 值的映射,然后將路徑名排序并列印出結果:
func main() {
// 計算特定目錄下所有檔案的 md5 值,按照路徑名排序列印出結果
m, err := MD5All(os.Args[1])
if err != nil {
fmt.Println(err)
return
}
// 獲取所有的檔案路徑名
var paths []string
for path := range m {
paths = append(paths, path)
}
// 排序
sort.Strings(paths)
// 列印出結果
for _, path := range paths {
fmt.Printf("%x %s\n", m[path], path)
}
}
MD5All 函式是我們討論的焦點,在 serial.go 中的實作沒有使用并發,僅僅是遍歷程序中,讀取每個檔案,計算它的 md5 值,
func MD5All(root string) (map[string][md5.Size]byte, error) {
m := make(map[string][md5.Size]byte)
// 呼叫 filepath.Walk 遍歷目錄下的所有檔案,第二個引數是檔案處理函式
err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
// 遍歷程序中出現錯誤,回傳錯誤
if err != nil {
return err
}
// 忽略不是普通檔案的檔案
if !info.Mode().IsRegular() {
return nil
}
// 讀取檔案內容
data, err := ioutil.ReadFile(path)
if err != nil {
return err
}
// 計算 md5 值
m[path] = md5.Sum(data)
return nil
})
if err != nil {
return nil, err
}
return m, nil
}
Parallel digestion
在 parallel.go 程式中,我們將 MD5All 分割成兩階段的管道:第一個階段,sumFiles,遍歷目錄下的檔案時,在一個新的 goroutine 中計算這個檔案的 md5 值,將結果發送到一個值型別為 result 的 channel :
type result struct {
path string // 檔案路徑名
sum [md5.Size]byte // md5 值
err error // 錯誤值
}
sumFiles 回傳兩個 channel:一個是為了收集結果,另一個是為了處理 filepath.Walk 回傳的錯誤,檔案處理函式啟動一個新的 goroutine 來處理每個普通檔案,然后檢查 done channel,如果 done channel 關閉了,那么立即終止遍歷:
func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) {
// 創建兩個 channel
c := make(chan result)
errc := make(chan error, 1)
// 啟動一個 goroutine
go func() {
var wg sync.WaitGroup
// 遍歷目錄下所有的檔案
err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.Mode().IsRegular() {
return nil
}
wg.Add(1)
// 啟動一個新的 goroutine 來計算這個檔案的 md5 值
go func() {
// 讀取檔案內容
data, err := ioutil.ReadFile(path)
// 使用 select 陳述句,在 done channel 關閉時,立即回傳
select {
// 計算檔案的 md5 值將結果發送到 channel
case c <- result{path, md5.Sum(data), err}:
case <-done:
}
wg.Done()
}()
// 如果 done channel 關閉了,則停止遍歷,回傳一個錯誤
select {
case <-done:
return errors.New("walk canceled")
default:
return nil
}
})
// 到這里 filepath.Walk 函式回傳了,所有的 wg.Add 的呼叫完成,
// 啟動一個 goroutine 等待所有發送結果的操作完成,并關閉 channel,
go func() {
wg.Wait()
close(c)
}()
// 將遍歷程序中回傳的錯誤值發往 channel
errc <- err
}()
// 回傳創建的 channel
return c, errc
}
在 MD5All 中的第二個階段從回傳結果的 channel 中接收 md5 值,在發現錯誤時,立即回傳,使用 defer 陳述句關閉 done channel:
func MD5All(root string) (map[string][md5.Size]byte, error) {
// 創建 done channel
done := make(chan struct{})
// 函式回傳時關閉 done channel,將會使遍歷程序中的所有 goroutine 退出
defer close(done)
// 呼叫 sumFiles 計算所有檔案的 md5 值
c, errc := sumFiles(done, root)
m := make(map[string][md5.Size]byte)
// 從回傳結果的 channel 中接收 md5 值
for r := range c {
// 出現錯誤立即回傳
if r.err != nil {
return nil, r.err
}
m[r.path] = r.sum
}
// 接收錯誤值,如果有錯誤發生回傳錯誤
if err := <-errc; err != nil {
return nil, err
}
return m, nil
}
Bounded parallelism
在 parallel.go 程式中的 MD5All 實作為每個檔案啟動一個新的 goroutine,在一個包含大量大檔案的檔案夾中,分配的記憶體大小可能會超過機器可使用的,
我們可以限制并行地讀取檔案的數量來限制記憶體分配,在 bounded.go 中,我們為讀取檔案創建了固定數量的 goroutine,我們的管道現在有三個階段:遍歷目錄檔案,讀取檔案計算 md5 值,匯總結果,
第一個階段,walkFiles,將一個目錄下所有普通檔案的路徑名發往 paths channel:
func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) {
// 創建 paths channel
paths := make(chan string)
errc := make(chan error, 1)
go func() {
// 在 Walk 回傳時關閉 paths channel,
defer close(paths)
// 遍歷目錄
errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.Mode().IsRegular() {
return nil
}
// 將檔案名發往 paths channel
select {
case paths <- path:
case <-done:
return errors.New("walk canceled")
}
return nil
})
}()
// 回傳創建的 channel
return paths, errc
}
第二個階段啟動了固定數量的 digester goroutine,從 paths channel 接收路徑名,讀取檔案計算 md5 值,將結果發送到一個收集結果的 channel:
func digester(done <-chan struct{}, paths <-chan string, c chan<- result) {
// 從 paths channel 接收每個路徑名
for path := range paths {
// 讀取檔案
data, err := ioutil.ReadFile(path)
select {
// 計算 md5 值,將結果發往 c channel,
case c <- result{path, md5.Sum(data), err}:
case <-done:
return
}
}
}
在 MD5All 中,等所有的 digester goroutine 退出后,關閉輸出結果的 channel :
done := make(chan struct{})
defer close(done)
// 第一個階段
paths, errc := walkFiles(done, root)
// 創建收集結果的 channel
c := make(chan result)
var wg sync.WaitGroup
const numDigesters = 20
wg.Add(numDigesters)
// 第二個階段,啟動固定數量的 goroutine 計算 md5 值
for i := 0; i < numDigesters; i++ {
go func() {
digester(done, paths, c)
wg.Done()
}()
}
// 啟動一個新的 goroutine 等待所有操作完成,然后關閉 channel
go func() {
wg.Wait()
close(c)
}()
我們可以在 digester 創建并回傳一個它自己的輸出 channel,但是我們需要更多的 goroutine 來扇入這些結果,
最后一個階段從 c channel 接收所有的結果,并從 errc channel 接收錯誤值來檢查錯誤:
m := make(map[string][md5.Size]byte)
// 最后一個階段,從 c channel 接收所有的結果
for r := range c {
if r.err != nil {
return nil, r.err
}
m[r.path] = r.sum
}
// 檢查是否 Walk 函式回傳失敗
if err := <-errc; err != nil {
return nil, err
}
return m, nil
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/qukuanlian/234301.html
標籤:區塊鏈
上一篇:golang外觀模式
