一、并行管道搭建:
總結下實作思路:
- 歸并排序:進行集合元素排序(節點),并兩兩節點歸并排序;每個節點元素要求有序的(排序),當然終點最小節點元數個數為1必是有序的;
- 節點:任務處理單元,歸并排序節點是處理輸出有序集合任務的單元;檔案過大單臺機排不了需要多臺機集群;
- 根據粒度,單機版:非并發節點可以是排序方法,并發節點可以是一個執行緒/協程去處理(異步排序),集群版節點是一個主機;
- 單機版,不管并發還是非并發,節點采用的是記憶體共享資料;集群版節點則需要網路連接請求應答來共享資料;
- go語言異步資料傳輸通道通過channel實作的;
- 每個節點將處理的資料異步發送到各自channel中,等待一個主節點獲取歸并,集群版多了網路的資料傳輸,
二、代碼實作:
- 本地節點 nodes.go:
package pipeline import ( "encoding/binary" "fmt" "io" "math/rand" "sort" "time" ) var startTime time.Time func Init() { startTime = time.Now() } //內部處理方法 //這里是排序:異步處理容器元素排序 func InMemSort(in <-chan int) <-chan int { out := make(chan int, 1024) go func() { a := []int{} for v := range in { a = append(a, v) } fmt.Println("Read done:", time.Since(startTime)) sort.Ints(a) fmt.Println("InMemSort done:", time.Since(startTime)) for _, v := range a { out <- v } close(out) }() return out } //兩路和并,每路通過內部方法異步處理 //這里是排序:in1,in2元素需要排好序(經過內部方法InMemSort異步處理)的容器單元(channel 異步容器/佇列) func Merge(in1, in2 <-chan int) <-chan int { out := make(chan int, 1024) // go func() { // v1, ok1 := <-in1 // v2, ok2 := <-in2 // for { // if ok1 || ok2 { // if !ok2 || (ok1 && v1 <= v2) { //v2無值或v1值比v2大 // out <- v1 // v1, ok1 = <-in1 // } else { // out <- v2 // v2, ok2 = <-in2 // } // } else { // close(out) // break // } // } // }() go func() { v1, ok1 := <-in1 v2, ok2 := <-in2 for ok1 || ok2 { if !ok2 || (ok1 && v1 <= v2) { //v2無值或v1值比v2大 out <- v1 v1, ok1 = <-in1 } else { out <- v2 v2, ok2 = <-in2 } } close(out) fmt.Println("Merge done:", time.Since(startTime)) }() return out } //讀取原資料 //chunkSize=-1全讀 func ReadSource(r io.Reader, chunkSize int) <-chan int { out := make(chan int, 1024) go func() { buffer := make([]byte, 8) //int長度根據作業系統來的,64位為int64,64位8個位元組 bytesRead := 0 for { //持續讀取 n, err := r.Read(buffer) //讀取一個int 8byte bytesRead += n if n > 0 { out <- int(binary.BigEndian.Uint64(buffer)) //位元組陣列轉int } if err != nil || (chunkSize != -1 && bytesRead >= chunkSize) { //-1全讀 break } } close(out) }() return out } //寫處理后(排序)資料 func WriteSink(w io.Writer, in <-chan int) { for v := range in { buffer := make([]byte, 8) binary.BigEndian.PutUint64(buffer, uint64(v)) w.Write(buffer) } } //隨機生成資料源 func RandomSource(count int) <-chan int { out := make(chan int) go func() { for i := 0; i < count; i++ { out <- rand.Int() } close(out) }() return out } //多路兩兩歸并,每路通過內部方法異步處理 //這里是排序:ins元素需要排好序(經過內部方法InMemSort異步處理)的容器單元(channel 異步容器/佇列) func MergeN(ins ...<-chan int) <-chan int { if len(ins) == 1 { return ins[0] } m := len(ins) / 2 return Merge( MergeN(ins[:m]...), MergeN(ins[m:]...)) //chennel異步并發歸并 } - 網路節點:
package pipeline import ( "bufio" "net" ) //節點服務端資料寫入到Network中 //開啟服務后,用goroutine等連接,避免創建pipeline阻塞 func NetworkSink(addr string, in <-chan int) { //net必須是面向流的網路:"tcp"、"tcp4"、"tcp6"、"unix"或"unixpacket" listener, err := net.Listen("tcp", addr) //addr ip:port if err != nil { panic(err) } go func() { //不能等待阻塞 for { conn, err := listener.Accept() if err != nil { continue } w := bufio.NewWriter(conn) WriteSink(w, in) w.Flush() //使用bufio Writer最后一定要Flush把快取資料發出去 defer conn.Close() //關閉 } // defer listener.Close() // conn, err := listener.Accept() // if err != nil { // panic(err) // } // defer conn.Close() // w := bufio.NewWriter(conn) // WriteSink(w, in) // defer w.Flush() }() } //Network向節點服務端讀取資料源 func NetworkSource(addr string) <-chan int { out := make(chan int) go func() { conn, err := net.Dial("tcp", addr) if err != nil { panic(err) } defer conn.Close() r := ReadSource(bufio.NewReader(conn), -1) for v := range r { out <- v } close(out) }() return out } - 創建管道:
package main import ( "bufio" "fmt" "goBase/pipelinedemo/pipeline" "os" "strconv" ) const sourceFilename = "../large.in" const resultFilename = "../large.out" //單機版而言,并發使用channel效率肯定是下降的 //好處,當檔案過大,一臺機器排不了,多機排序 func main() { p, files := createNetworkPipeline(sourceFilename, 800000000, 4) //平均每個檔案讀取int64數:800000000/8/4 defer func() { for _, file := range files { file.Close() } }() writeToFile(p, resultFilename) //該方法運行,通道才真正打開 printFile(resultFilename) } //創建并行處理管道 //fileSize 檔案位元組數 //chunkCount 節點數 讀取檔案分塊數 func createNetworkPipeline(filename string, fileSize, chunkCount int) (<-chan int, []*os.File) { chunkSize := fileSize / chunkCount //每個節點讀取檔案位元組數 //outs := make([]<-chan int, chunkCount) outs := []<-chan int{} sortAddr := []string{} files := []*os.File{} pipeline.Init() //開始計時 //#region 節點服務端作業 for count := 0; count < chunkCount; count++ { file, err := os.Open(filename) //這里file沒有close,需要回傳*[]File,在外面close if err != nil { panic(err) } files = append(files, file) //Seek設定下一次讀/寫的位置,offset為相對偏移量, //whence決定相對位置:0為相對檔案開頭,1為相對當前位置,2為相對檔案結尾 file.Seek(int64(count*chunkSize), 0) //讀檔案位元組范圍 source := pipeline.ReadSource(bufio.NewReader(file), chunkSize) // outs = append(outs, pipeline.InMemSort(source)) //本機地址 addr := ":" + strconv.Itoa(7000+count) //將數字轉換成對應的字串型別的數字 pipeline.NetworkSink(addr, pipeline.InMemSort(source)) //開啟節點服務監聽,收到請求發送資料將寫入到Network,異步不能等待阻塞 sortAddr = append(sortAddr, addr) } //#endregion //#region Network作業 for _, addr := range sortAddr { outs = append(outs, pipeline.NetworkSource(addr)) } //構建管道,goroutine還沒有運行,不能確定InMemSort是否全部排序完成,不能在該方法close file return pipeline.MergeN(outs...), files //#endregion } func writeToFile(in <-chan int, filename string) { file, err := os.Create(filename) if err != nil { panic(err) } defer file.Close() w := bufio.NewWriter(file) defer w.Flush() pipeline.WriteSink(w, in) } func printFile(filename string) { file, err := os.Open(filename) if err != nil { panic(err) } defer file.Close() count := 0 all := pipeline.ReadSource(bufio.NewReader(file), -1) for s := range all { fmt.Println(s) count++ if count > 100 { break } } }
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/471735.html
標籤:Go
上一篇:列舉與聯合體的介紹與實體

