除了上一節我們介紹的 channel 通道,還有 sync.Mutex、sync.WaitGroup 這些原始的同步機制來,更加靈活的實作資料同步和控制并發,
資源競爭
所謂資源競爭,就是在程式中,同一塊記憶體同時被多個 goroutine 訪問,對于這個共享的資源(記憶體)每個 goroutine 都有不同的操作,就有可能造成資料紊亂,
示例:
package main
import (
"fmt"
"time"
)
var sum = 0
func main() {
//開啟100個協程來讓 sum + 1
for i := 1; i <= 100; i++ {
go add()
}
// 睡眠兩秒防止程式提前退出
time.Sleep(2 * time.Second)
fmt.Println("sum:",sum)
}
func add(){
sum += 1
}
//運行結果: sum:98 或 sum:99 或 ...
- 多次運行上面的程式,發現列印的結果可能存在不同,因為我們用多個協程來操作 sum,而 sum 不是并發安全的,存在競爭,
- 我們使用 go build、go run、go test 命令時,添加 -race 標識可以檢查代碼中是否存在資源競爭,
解決這個問題,我們可以給資源進行加鎖,讓其在同一時刻只能被一個協程來操作,
sync.Mutex
- 互斥鎖,使同一時刻只能有一個協程執行某段程式,其他協程等待該協程執行完再依次執行,
- 互斥鎖只有兩個方法 Lock (加鎖)和 Unlock(解鎖),當一個協程對資源上鎖后,只有等該協程解鎖,其他協程才能再次上鎖,
- Lock 和 Unlock 是成對出現,為了防止上鎖后忘記釋放鎖,我們可以使用 defer 陳述句來釋放鎖,
示例:
package main
import (
"fmt"
"sync"
"time"
)
var sum = 0
var mutex = sync.Mutex{}
func main() {
//開啟100個協程來讓 sum + 1
for i := 1; i <= 100; i++ {
go add()
}
// 睡眠兩秒防止程式提前退出
time.Sleep(2 * time.Second)
fmt.Println("sum:",sum)
}
func add(){
mutex.Lock()
defer mutex.Unlock() //使用defer陳述句,確保鎖一定會被釋放
sum += 1
}
symc.RWMutex
- 上面我們使用互斥鎖,來防止多個協程同時對 sum 做加法操作的時候產生資料錯亂,RWMutex為讀寫鎖,當讀取競爭資源的時候,因為資料不會改變,所以不管多少個 goroutine 讀都是并發安全的,
- 因為可以多個協程同時讀,不再相互等待,所以在性能上比互斥鎖會有很大的提升,
示例:
package main
import (
"fmt"
"sync"
"time"
)
var sum = 0
var mutex = sync.Mutex{}
var rwmutex = sync.RWMutex{}
func main() {
//開啟100個協程來讓 sum + 1
for i := 1; i <= 100; i++ {
go add()
}
for i := 1; i<= 10; i++ {
go fmt.Println("sum:",getSum())
}
// 睡眠兩秒防止程式提前退出
time.Sleep(2 * time.Second)
fmt.Println("sum:", sum)
}
func add(){
mutex.Lock()
defer mutex.Unlock() //使用defer陳述句,確保鎖一定會被釋放
sum += 1
}
func getSum() int {
rwmutex.RLock() //使用讀寫鎖
defer rwmutex.RUnlock()
return sum
}
sync.WaitGroup
- 上面的示例中,我們都是要了 time.Sleep(2 * time.Second),來防止:主函式 mian 回傳,提前退出程式,但是我們并不知道程式真正什么時候執行完,所以只能設定個長點的時間避免程式提前退出,這樣會產生性能問題,
- 這時候我們就用到了 sync.WaitGroup ,它可以監聽程式的執行,一旦全部執行完畢,程式就能馬上退出,
示例:
package main
import (
"fmt"
"sync"
)
var sum = 0
var mutex = sync.Mutex{}
var rwmutex = sync.RWMutex{}
func run() {
var wg sync.WaitGroup
//因為要監控110個協程,所以設定計數器為110
wg.Add(110)
for i := 1; i <= 100; i++ {
go func() {
//計數器值減1
defer wg.Done()
add()
}()
}
for i := 1; i <= 10; i++ {
go func() {
//計數器值減1
defer wg.Done()
fmt.Println("sum:", getSum())
}()
}
//一直等待,只要計數器值為0
wg.Wait()
}
func main() {
run()
}
func add() {
mutex.Lock()
defer mutex.Unlock() //使用defer陳述句,確保鎖一定會被釋放
sum += 1
}
func getSum() int {
rwmutex.RLock() //使用讀寫鎖
defer rwmutex.RUnlock()
return sum
}
- 示例中我們先宣告了 sync.WaitGroup ,然后通過 Add() 方法設定計數器的值,也就是說有多少個協程監聽,
- 在每個協程執行完畢后,呼叫 Done 方法來使計算器減 1,
- 最后呼叫 Wait 方法一直等待,直到計數器為 0,所以協程全部執行完畢,
sync.Once
有時候我們只希望代碼執行一次,即使是在高并發的場景下,比如創建一個單例,這種情況可以使用 sync.Once 來保證代碼只執行一次,
示例:
package main
import (
"fmt"
"sync"
)
func main() {
var once sync.Once
onceBody := func() {
fmt.Println("Only once")
}
//用于等待協程執行完畢
done := make(chan bool)
//啟動10個協程執行once.Do(onceBody)
for i := 0; i < 10; i++ {
go func() {
//把要執行的函式(方法)作為引數傳給once.Do方法即可
once.Do(onceBody)
done <- true
}()
}
for i := 0; i < 10; i++ {
<-done
}
}
//運行結果: Only once
- 上面這個是 Go 語言自帶的示例,雖然啟動了 10 個協程來執行 onceBody 函式,但是 once.DO 方法保證 onceBody 函式只會執行一次,
- sync.Once 適合用于創建單例、只加載一次資源等只需要執行一次的場景,
條件變數 sync.Cond
- 我們有一項任務,只有滿足了條件情況下才能執行,否則就等著,如何獲取這個條件呢?可以使用 channel 的方式,但是 channel 適用于一對一,一對多就需要用到 sync.Cond
- sync.Cond 是基于互斥鎖的基礎上,增加了一個通知佇列,協程剛開始是等待的,通知的協程會從通知佇列中喚醒一個或多個被通知的協程,
- sync.Cond 主要有以下幾個方法:
- sync.NewCond(&mutex) //sync.Cond 通過 sync.NewCond 初始化,需要傳入一個 mutex,因為阻塞等待通知的操作以及通知解除阻塞的操作就是基于 sync.Mutex 來實作的,
- sync.Wait() //等待通知
阻塞當前協程,直到被其他協程呼叫 Broadcast 或者 Signal 方法喚醒,使用的時候需要加鎖,使用 sync.Cond 中的鎖即可 - sync.Signal() //單發通知,隨機喚醒一個協程
- sync.Broadcat() //廣播通知,喚醒所有等待的協程,
示例:
package main
import (
"fmt"
"sync"
"time"
)
func main() {
//3個人賽跑,1個裁判員發號施令
cond := sync.NewCond(&sync.Mutex{})
var wg sync.WaitGroup
wg.Add(4) //3選手+1裁判
for i := 1; i <= 3; i++ {
go func(num int) {
defer wg.Done()
fmt.Println(num, "號選手已經就位")
cond.L.Lock()
cond.Wait() //等待發令槍響
fmt.Println(num, "號選手開始跑……")
cond.L.Unlock()
}(i)
}
//等待所有goroutine都進入wait狀態
time.Sleep(2 * time.Second)
go func() {
defer wg.Done()
fmt.Println("裁判:“各就各位~~預備~~”")
fmt.Println("啪!!!")
cond.Broadcast() //發令槍響
}()
//防止函式提前回傳退出
wg.Wait()
}
運行結果:
3 號選手已經就位
1 號選手已經就位
2 號選手已經就位
裁判:“各就各位~~預備~~”
啪!!!
2 號選手開始跑……
3 號選手開始跑……
1 號選手開始跑……
最后貼一下 sync.Cond 幾個方法的原始碼:
// Wait atomically unlocks c.L and suspends execution
// of the calling goroutine. After later resuming execution,
// Wait locks c.L before returning. Unlike in other systems,
// Wait cannot return unless awoken by Broadcast or Signal.
// Wait方法釋放鎖,并阻塞協程執行,滿足條件解除阻塞后,當前協程需要獲得鎖然后Wait方法回傳,
//
// Because c.L is not locked when Wait first resumes, the caller
// typically cannot assume that the condition is true when
// Wait returns. Instead, the caller should Wait in a loop:
// 由于解除阻塞后,當前協程不一定能馬上獲得鎖,因此回傳后需要再次檢查條件,所以通常
// 使用回圈,
// c.L.Lock()
// for !condition() {
// c.Wait()
// }
// ... make use of condition ...
// c.L.Unlock()
//
func (c *Cond) Wait() {
c.checker.check()
t := runtime_notifyListAdd(&c.notify)
c.L.Unlock() // 釋放鎖
runtime_notifyListWait(&c.notify, t) // 等待滿足條件,解除阻塞
c.L.Lock() // 獲取鎖
}
// Signal wakes one goroutine waiting on c, if there is any.
//
// It is allowed but not required for the caller to hold c.L
// during the call.
func (c *Cond) Signal() {
c.checker.check()
runtime_notifyListNotifyOne(&c.notify)
}
// Broadcast wakes all goroutines waiting on c.
//
// It is allowed but not required for the caller to hold c.L
// during the call.
func (c *Cond) Broadcast() {
c.checker.check()
runtime_notifyListNotifyAll(&c.notify)
}
條件變數的 Wait 方法主要做了四件事:
- 把呼叫它的 goroutine(也就是當前的 goroutine)加入到當前條件變數的通知佇列中,
- 解鎖當前的條件變數基于的那個互斥鎖,
- 讓當前的 goroutine 處于等待狀態,等到通知到來時再決定是否喚醒它,此時,這個 goroutine 就會阻塞在呼叫這個 Wait 方法的那行代碼上,
- 如果通知到來并且決定喚醒這個 goroutine,那么就在喚醒它之后重新鎖定當前條件變數基于的互斥鎖,自此之后,當前的 goroutine 就會繼續執行后面的代碼了,
注意事項
- 呼叫 wait 方法的時候一定要加鎖,否則會導致程式發生 panic.
- wait 呼叫時需要檢查等待條件是否滿足,也就說 goroutine 被喚醒了不等于等待條件被滿足,等待者被喚醒,只是得到了一次檢查的機會而已,推薦寫法如下:
// c.L.Lock() // for !condition() { // c.Wait() // } // ... make use of condition ... // c.L.Unlock()
- Signal 和 Boardcast 兩個喚醒操作不需要加鎖
sync.Map
map 同時讀寫是執行緒不安全的,會發生了競態問題,而 sync.Map 和 map 型別一樣,只不過它是并發安全的,
sync.Map 的方法:
- Store : 存盤 key-value 值
- Load: 根據 key 獲取對應的 value 值,還可以判斷 key 是否存在,
- LoadOrStore: 如果 key 對應的 value 存在,則回傳 value ;不存在則存盤 key-value 值,
- Delete: 洗掉一個 key-value 鍵值對
- Range:遍歷 sync.Map
示例:
package main
import (
"fmt"
"sync"
)
func main() {
var syMap sync.Map
// 將鍵值對保存到sync.Map
syMap.Store("aaa", 111)
syMap.Store("bbb", 222)
syMap.Store("ccc", 333)
fmt.Println(syMap.LoadOrStore("ddd", 444))
// 從sync.Map中根據鍵取值
fmt.Println(syMap.Load("aaa"))
// 根據鍵洗掉對應的鍵值對
syMap.Delete("aaa")
// 遍歷所有sync.Map中的鍵值對
syMap.Range(func(k, v interface{}) bool {
fmt.Println("k:", k, "=》 v:", v)
return true
})
}
運行結果:
444 false
111 true
k: bbb =》 v: 222
k: ccc =》 v: 333
k: ddd =》 v: 444
sync.Map 沒有獲取 map 數量的方法,可以在 遍歷的時候自行計算數量,sync.Map 為了保證并發安全,犧牲了一些性能,如果沒有并發場景,推薦使用內置的 map 類,
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/295961.html
標籤:Go
上一篇:web聊天室開發-Go
