singleflight 使用方法以及原始碼閱讀
1、簡介
安裝方式:
go get -u golang.org/x/sync/singleflight
singleflight 是Go官方擴展同步包的一個庫,通過給每次函式呼叫分配一個key,相同key的函式并發呼叫時,在函式執行期間,相同函式的呼叫,只會被執行一次,回傳相同的結果,其本質是對函式呼叫的結果進行復用,
2、使用方法
2.1 使用Do獲取函式執行結果
Do方法是同步回傳函式執行結果
package main
import (
"fmt"
"golang.org/x/sync/singleflight"
"runtime"
"sync"
"time"
)
func main() {
var sg singleflight.Group
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(j int) {
defer wg.Done()
v, err, shared := sg.Do("testDo", testDo)
fmt.Printf("i: %v, v:%v, err:%v, shared:%v\n", j, v, err, shared)
}(i)
}
wg.Wait()
}
func testDo() (interface{}, error) {
// 模擬函式執行需要的時間
time.Sleep(time.Millisecond)
return "testDo", nil
}
2.2 使用DoChan獲取函式執行結果
DoChan回傳一個 channel,函式執行的結果通過 channel 來進行傳遞,
package main
import (
"fmt"
"golang.org/x/sync/singleflight"
"runtime"
"sync"
"time"
)
func main() {
var sg singleflight.Group
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(j int) {
defer wg.Done()
ch := sg.DoChan("testDoChan", testDoChan)
select {
case ret := <- ch:
fmt.Printf("i: %v, v:%v, err:%v, shared:%v\n", j, ret.Val, ret.Err, ret.Shared)
}
}(i)
}
wg.Wait()
}
func testDoChan() (interface{}, error) {
// 模擬函式執行需要的時間
time.Sleep(time.Millisecond)
return "testDoChan", nil
}
3、原始碼解讀
3.1 Group
//Group 整個庫的核心結構體
type Group struct {
mu sync.Mutex // 并發時,保護 m
m map[string]*call // 使用 懶加載 方式進行初始化
}
3.2 call
//call m中的value
type call struct {
wg sync.WaitGroup
//相同key,fn執行的回傳結果
val interface{}
err error
//fn執行期間,相同 key 添加的次數,第一次添加不算
dups int
chans []chan<- Result // DoChan 回傳fn執行的結果
}
3.3 Group.Do
//Do 執行函式的地方,key: 給函式自定義的標識
//fn: 需要執行的函式,fn開始運行后,未運行結果前,這個期間對相同key的呼叫,都會回傳第一次執行fn回傳的結果
//v:fn執行回傳的結果,err:fn執行回傳的err
//shared:fn執行結果是否會共享,fn運行期間,是否有相同的key被呼叫,有則回傳true,反之回傳false
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
g.mu.Lock()
if g.m == nil {//懶加載
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {//fn執行期間,又有相同的key添加進來執行
c.dups++ //fn執行期間,有相同的key添加進來
g.mu.Unlock()
c.wg.Wait() //等待fn執行結果(fn函式里面,會呼叫c.wg.Done)
//-------
//判斷fn執行程序中,是否有 panic 或者 runtime.Goexit()
//感覺主要是為了 DoChan 函式,DoChan 回傳的是channel,防止fn函式執行期間出現問題,導致無法往 chan 里面寫入結果,
//從而導致 外面需要獲取 fn 執行結果的協程一直在等待
if e, ok := c.err.(*panicError); ok {
panic(e)
} else if c.err == errGoexit {
runtime.Goexit()
}
return c.val, c.err, true
}
//---- 以下是key 第一次添加到 m 中時,執行的代碼---
c := new(call)
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
// 執行 fn 的地方
g.doCall(c, key, fn) // 沒有新開一個協程,和DoChan不同,
return c.val, c.err, c.dups > 0
}
3.4 Group.DoChan
//DoChan 和Do 十分類似,只不過回傳的結果通過 chan 來傳遞
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
ch := make(chan Result, 1)
g.mu.Lock()
if g.m == nil {//懶加載
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {//fn執行期間,又有相同的key添加進來執行
c.dups++
c.chans = append(c.chans, ch)
g.mu.Unlock()
return ch
}
//---- 以下是key 第一次添加到 m 中時,執行的代碼---
c := &call{chans: []chan<- Result{ch}}
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
go g.doCall(c, key, fn) // 新開啟了一個協程,和Do不同
return ch
}
3.5 Group.doCall
- 雙defer+normalReturn+recovered 判斷fn執行是panic還是runtime.Goexit
//doCall 真正運行fn的地方,需要重點理解
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
normalReturn := false //是否正常回傳,默認false
recovered := false //是否recover,默認false
// use double-defer to distinguish panic from runtime.Goexit,
//使用雙 defer 來區分 panic和runtime.Goexit
//是需要結合 normalReturn 和 recovere 的值來進行判斷,從而區分是panic還是runtime.Goexit
defer func() {
// the given function invoked runtime.Goexit
if !normalReturn && !recovered {
//既沒有正常回傳,又沒有被 recover,所以是fn執行期間,呼叫了 runtime.Goexit()
c.err = errGoexit
}
g.mu.Lock()
defer g.mu.Unlock()
c.wg.Done()
// 走到這里,fn函式已經執行過了
if g.m[key] == c {
delete(g.m, key) //fn函式執行完畢,好讓后續的key可以繼續進來執行fn函式
}
if e, ok := c.err.(*panicError); ok { // recover住的錯誤
// In order to prevent the waiting channels from being blocked forever,
// needs to ensure that this panic cannot be recovered.
if len(c.chans) > 0 { //通過使用DoChan來執行 fn,發生的錯誤
go panic(e) // recover只能夠 recover住同一個協程里的panic,不是同一個協程的無法捕獲,
select {} // 保證協程不退出,錯誤會直接暴露出去
} else { //通過使用 Do來執行fn,發生的錯誤
panic(e)
}
} else if c.err == errGoexit {
// Already in the process of goexit, no need to call again
//第一個呼叫的fn函式的協程已經退出,相同key的函式因為 chan 接收不到資料,會發生死鎖()
//fatal error: all goroutines are asleep - deadlock!
} else {
// Normal return
for _, ch := range c.chans {
ch <- Result{c.val, c.err, c.dups > 0}
}
}
}()
func() {
defer func() {
if !normalReturn {//fn執行期間,發生了panic
if r := recover(); r != nil {
c.err = newPanicError(r) // 標識為panic錯誤,Do函式中判斷時,好做區分 e, ok := c.err.(*panicError)
}
}
}()
c.val, c.err = fn()
normalReturn = true //fn執行期間,沒有panic
}()
if !normalReturn {
recovered = true //fn執行期間,發生了panic,并且被 recover住了,注意:呼叫runtime.Goexit()時,是無法recover的
}
}
3.6 Group.Forget
//Forget 使用Do執行fn時,可以手動洗掉 g.m 中的key
func (g *Group) Forget(key string){
g.mu.Lock()
delete(g.m, key)
g.mu.Unlock()
}
4、執行流程

菜鳥一枚,文中難免有錯誤的地方,如有,懇請大佬指出,
5、參考資料
絕對詳盡的singleflight講解
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/535150.html
標籤:其他
上一篇:python中的for回圈
