1. Goroutine同步【資料同步】
為什么需要goroutine同步
gorotine同步概念、以及同步的幾種方式
1.1 為什么需要goroutine同步
package mainimport ( "fmt" "sync")var A = 10var wg = sync.WaitGroup{}func Add(){ defer wg.Done() for i:=0;i<1000000;i++{ A += 1 }}func main() { wg.Add(2) go Add() go Add() wg.Wait() fmt.Println(A)}# output:1061865 # 每運行一次結果都不一樣,但是都不是我們預期的結果2000000
多goroutine【多任務】,有共享資源,且多goroutine修改共享資源,出現資料不安全問題【資料錯誤】,保證資料安全一致,需要goroutine同步
1.2 goroutine同步
goroutine按照約定的順序執行,解決資料不安全問題,
1.3 goroutine同步方式
channel 【csp模型】
互斥鎖 【傳統同步機制】
讀寫鎖 【傳統同步機制】
條件變數 【傳統同步機制】
2. 傳統同步機制
2.1 互斥鎖
2.1.1 特點
加鎖成功則操作資源,加鎖失敗則等待直至鎖加鎖成功----所有的goroutine互斥,一個得到鎖其他全部等待
解決了資料安全問題,降低了程式的性能,適用讀寫不太頻繁的場景

2.1.2 鎖顆粒度問題
顆粒度是指,加鎖的范圍,哪里使用資源哪里加鎖,盡可能減少加鎖范圍
單元測驗基本使用流程
新建單元測驗檔案
撰寫測驗案例
gotest運行生成對應的prof檔案
go tool 查看生成的prof檔案
package main_testimport ( "fmt" "sync" "testing")var A = 10var wg = sync.WaitGroup{}var mux sync.Mutexfunc Add(){ defer wg.Done() for i:=0;i<1000000;i++{ mux.Lock() A += 1 mux.Unlock() }}/*// 加大鎖顆粒度func Add(){ defer wg.Done() mux.Lock() for i:=0;i<1000000;i++{ A += 1 } mux.Unlock()}*/// 單元測驗格式, func TestMux(t *testing.T) { wg.Add(2) go Add() go Add() wg.Wait() fmt.Println(A)}
# 生成prof檔案,-cpuprofile 引數指定生成什么型別的prof cpu.prof指定生成profile檔案名字go test mutex_test.go -cpuprofile cpu.prof# 查看生成的prof檔案,pprof 指定查看的檔案型別go tool pprof cpu.prof# 下面是輸出資訊Type: cpuTime: Jul 10, 2019 at 2:38pm (CST)Duration: 201.43ms, Total samples = 80ms (39.72%)Entering interactive mode (type "help" for commands, "o" for options)(pprof) top # 這里使用top命令查看測驗中cpu使用的資訊Showing nodes accounting for 80ms, 100% of 80ms total flat flat% sum% cum cum% 60ms 75.00% 75.00% 60ms 75.00% sync.(*Mutex).Unlock 20ms 25.00% 100% 20ms 25.00% sync.(*Mutex).Lock 0 0% 100% 80ms 100% command-line-arguments_test.Add(pprof) svg #svg 保存可視化檔案,可以使用瀏覽器可視化查看(pprof) list Add # 查看對應函式的詳細時間消耗資訊
注意:
當前得測驗案例,是程式設計的錯誤【這種快速計算性的,一個goroutine已經可以勝任,更多時候讀寫分離,互斥鎖不適合這種頻繁讀寫場景】,不是鎖使用的錯誤
2.1.3 sync.once 原始碼閱讀
// Once is an object that will perform exactly one action.type Once struct { m Mutex done uint32 // 標識是否已執行過任務,如果設定為1 則說明任務已執行過了}// DO 呼叫用戶執行的方法,僅呼叫一次func (o *Once) Do(f func()) { // 原子操作判斷done,已被置成1,如果done是1 說明方法已被執行,直接回傳 if atomic.LoadUint32(&o.done) == 1 { return } // 加鎖 o.m.Lock() defer o.m.Unlock() // done為0則開始,呼叫用戶函式方法 if o.done == 0 { defer atomic.StoreUint32(&o.done, 1) f() }}
2.2 讀寫鎖
讀寫互斥,讀者可以重復加鎖,寫加鎖需要等待所有讀者解鎖,寫加鎖期間所有讀者wait【寫優先級高于讀,讀寫同時加鎖寫著加鎖先成功】
適用寫少讀多的場景,相比互斥鎖可以一定程度提高程式性能
僅有讀者

- 寫著加鎖更新資料




2.3 條件變數
條件變數的作用并不保證在同一時刻僅有一個協程(執行緒)訪問某個共享的資料資源,而是在對應的共享資料的狀態發生變化時,通知阻塞在某個條件上的協程(執行緒),條件變數不是鎖,在并發中不能達到同步的目的,因此條件變數總是與鎖一塊使用,可以認為條件變數是對鎖的一種補充,某種程度上提高鎖機制帶來的效率低下的問題
2.3.1 條件變數API介紹
創建條件變數 【創建后不能被被拷貝】
// 引數傳遞一把鎖,回傳指標型別cond:=sync.NewCond(&sync.Mutex{})Cond.Wait() ,阻塞再條件變數上讓出cup資源
// 阻塞在條件變數上面,會把當前gorotine掛載到Cond佇列上面cond.Wait()// 1. 釋放鎖,并把自己掛載到通知佇列,阻塞等待【原子操作】// 2. 接收到喚醒信號,嘗試獲取鎖// 3. 獲取鎖成功則 回傳
Cond.Signal() 隨機喚醒一個阻塞在條件變數上的goroutine
// 喚醒阻塞在條件變數上的goroutine,處于wait【呼叫了cond.wait】狀態的goroutine// 隨機喚醒通知佇列上的一個執行緒,并從通知佇列移除cond.Signal() // 發送喚醒信號
Cond.Broadcast() 廣播通知所有處于wait狀態的goroutine
// 廣播通知所有處于wait狀態的goroutine// 通知通知佇列上的所有的gorotine,并且把所有的goroutine從通知佇列 取下來cond.Broadcast()
2.3.2 條件變數在生產者消費模型中使用
潛在bug-->deadlock【生產者消費者都死鎖在cond.wait,沒有其他的goroutine喚醒】
package mainimport "fmt"import "sync"import "math/rand"import "time"var cond sync.Cond // 創建全域條件變數// 生產者func producer(out chan<- int, idx int) { for { cond.L.Lock() // 條件變數對應互斥鎖加鎖 for len(out) == 3 { // 產品區滿 等待消費者消費 cond.Wait() // 掛起當前協程, 等待條件變數滿足,被消費者喚醒 } num := rand.Intn(1000) // 產生一個亂數 out <- num // 寫入到 channel 中 (生產) fmt.Printf("%dth 生產者,產生資料 %3d, 公共區剩余%d個資料\n", idx, num, len(out)) cond.L.Unlock() // 生產結束,解鎖互斥鎖 cond.Signal() // 喚醒 阻塞的 消費者 time.Sleep(time.Second) // 生產完休息一會,給其他協程執行機會, 解決了死鎖機會的降低 }}//消費者func consumer(in <-chan int, idx int) { for { cond.L.Lock() // 條件變數對應互斥鎖加鎖(與生產者是同一個) for len(in) == 0 { // 產品區為空 等待生產者生產 cond.Wait() // 掛起當前協程, 等待條件變數滿足,被生產者喚醒 } num := <-in // 將 channel 中的資料讀走 (消費) fmt.Printf("---- %dth 消費者, 消費資料 %3d,公共區剩余%d個資料\n", idx, num, len(in)) cond.L.Unlock() // 消費結束,解鎖互斥鎖 cond.Signal() // 喚醒 阻塞的 生產者 time.Sleep(time.Millisecond * 500) //消費完 休息一會,給其他協程執行機會, 解決了死鎖機會的降低 }}func main() { rand.Seed(time.Now().UnixNano()) // 設定亂數種子 quit := make(chan bool) // 創建用于結束通信的 channel product := make(chan int, 3) // 產品區(公共區)使用channel 模擬 cond.L = new(sync.Mutex) // 創建互斥鎖和條件變數 for i := 0; i < 5; i++ { // 5個消費者 go producer(product, i+1) } for i := 0; i < 3; i++ { // 3個生產者 go consumer(product, i+1) } <-quit // 主協程阻塞 不結束}deadlock原因剖析【極值法】
極端處理: 1個生產者 2 消費 channle 快取1
由于極端一些情況,會導致所有的生產者與消費者都會進入到一個wait 狀態,沒有人喚醒
解決bug----單向喚醒,由生產者喚醒消費者
喚醒方向問題: 由速率低的一方喚醒速率高的一方
package mainimport ( "fmt" "runtime")import "sync"import "math/rand"import "time"var cond sync.Cond // 創建全域條件變數// 生產者func producer(out chan<- int, idx int) { for { num := rand.Intn(1000) // 產生一個亂數 cond.L.Lock() // 條件變數對應互斥鎖加鎖 select { // 嘗試向channel寫入資料 case out <- num: fmt.Printf("%dth 生產者,產生資料 %3d, 公共區剩余%d個資料\n", idx, num, len(out)) default: } cond.L.Unlock() // 生產結束,解鎖互斥鎖 cond.Signal() // 喚醒 阻塞的 消費者 runtime.Gosched() // 給別更多的機會創建鎖 }}//消費者func consumer(in <-chan int, idx int) { var num int for { cond.L.Lock() // 條件變數對應互斥鎖加鎖(與生產者是同一個) for len(in)==0{ cond.Wait() } num=<-in fmt.Printf("%dth 消費者,消費了 %d, 公共區剩余%d個資料\n", idx, num, len(in)) cond.L.Unlock() // 消費結束,解鎖互斥鎖 }}func main() { rand.Seed(time.Now().UnixNano()) // 設定亂數種子 quit := make(chan bool) // 創建用于結束通信的 channel product := make(chan int, 3) // 產品區(公共區)使用channel 模擬 cond.L = new(sync.Mutex) // 創建互斥鎖和條件變數 for i := 0; i < 3; i++ { // 3個生產者 go producer(product, i+1) } for i := 0; i < 5; i++ { // 5個消費者 go consumer(product, i+1) } <-quit // 主協程阻塞 不結束}
問題:
當我們把條件變數取消,使用帶快取的channel,同樣很好的完成生產者與消費者模型【channel空與非空主動阻塞等待,直至解除阻塞】,why use cond?
2.3.3 channel vs sync.Cond
使用channel通知多個關注條件的goroutine問題?
關閉的channle 與廣播的作用,僅僅單次使用
當狀態多重情況的時候,channel 不行了,使用cond廣播的方式進行狀態更新
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/59806.html
標籤:Go
上一篇:多任務的使用模式
