最近在作業中碰到了 GC 的問題:專案中大量重復地創建許多物件,造成 GC 的作業量巨大,CPU 頻繁掉底,準備使用 sync.Pool 來快取物件,減輕 GC 的消耗,為了用起來更順暢,我特地研究了一番,形成此文,本文從使用到原始碼決議,循序漸進,一一道來,
本文基于 Go 1.14
目錄
- 是什么
- 有什么用
- 怎么用
- 簡單的例子
- fmt 包如何用
- pool_test
- 其他
- 原始碼分析
- Pool 結構體
- Get
- pin
- popHead
- getSlow
- popTail
- Put
- pushHead
- pack/unpack
- GC
- 總結
- 參考資料
是什么
sync.Pool 是 sync 包下的一個組件,可以作為保存臨時取還物件的一個“池子”,個人覺得它的名字有一定的誤導性,因為 Pool 里裝的物件可以被無通知地被回收,可能 sync.Cache 是一個更合適的名字,
有什么用
對于很多需要重復分配、回收記憶體的地方,sync.Pool 是一個很好的選擇,頻繁地分配、回收記憶體會給 GC 帶來一定的負擔,嚴重的時候會引起 CPU 的毛刺,而 sync.Pool 可以將暫時不用的物件快取起來,待下次需要的時候直接使用,不用再次經過記憶體分配,復用物件的記憶體,減輕 GC 的壓力,提升系統的性能,
怎么用
首先,sync.Pool 是協程安全的,這對于使用者來說是極其方便的,使用前,設定好物件的 New 函式,用于在 Pool 里沒有快取的物件時,創建一個,之后,在程式的任何地方、任何時候僅通過 Get()、Put() 方法就可以取、還物件了,
下面是 2018 年的時候,《Go 夜讀》上關于 sync.Pool 的分享,關于適用場景:
當多個 goroutine 都需要創建同?個物件的時候,如果 goroutine 數過多,導致物件的創建數?劇增,進?導致 GC 壓?增大,形成 “并發?-占?記憶體?-GC 緩慢-處理并發能?降低-并發更?”這樣的惡性回圈,
在這個時候,需要有?個物件池,每個 goroutine 不再??單獨創建物件,?是從物件池中獲取出?個物件(如果池中已經有的話),
因此關鍵思想就是物件的復用,避免重復創建、銷毀,下面我們來看看如何使用,
簡單的例子
首先來看一個簡單的例子:
package main
import (
"fmt"
"sync"
)
var pool *sync.Pool
type Person struct {
Name string
}
func initPool() {
pool = &sync.Pool {
New: func()interface{} {
fmt.Println("Creating a new Person")
return new(Person)
},
}
}
func main() {
initPool()
p := pool.Get().(*Person)
fmt.Println("首次從 pool 里獲取:", p)
p.Name = "first"
fmt.Printf("設定 p.Name = %s\n", p.Name)
pool.Put(p)
fmt.Println("Pool 里已有一個物件:&{first},呼叫 Get: ", pool.Get().(*Person))
fmt.Println("Pool 沒有物件了,呼叫 Get: ", pool.Get().(*Person))
}
運行結果:
Creating a new Person
首次從 pool 里獲取: &{}
設定 p.Name = first
Pool 里已有一個物件:&{first},Get: &{first}
Creating a new Person
Pool 沒有物件了,Get: &{}
首先,需要初始化 Pool,唯一需要的就是設定好 New 函式,當呼叫 Get 方法時,如果池子里快取了物件,就直接回傳快取的物件,如果沒有存貨,則呼叫 New 函式創建一個新的物件,
另外,我們發現 Get 方法取出來的物件和上次 Put 進去的物件實際上是同一個,Pool 沒有做任何“清空”的處理,但我們不應當對此有任何假設,因為在實際的并發使用場景中,無法保證這種順序,最好的做法是在 Put 前,將物件清空,
fmt 包如何用
這部分主要看 fmt.Printf 如何使用:
func Printf(format string, a ...interface{}) (n int, err error) {
return Fprintf(os.Stdout, format, a...)
}
繼續看 Fprintf:
func Fprintf(w io.Writer, format string, a ...interface{}) (n int, err error) {
p := newPrinter()
p.doPrintf(format, a)
n, err = w.Write(p.buf)
p.free()
return
}
Fprintf 函式的引數是一個 io.Writer,Printf 傳的是 os.Stdout,相當于直接輸出到標準輸出,這里的 newPrinter 用的就是 Pool:
// newPrinter allocates a new pp struct or grabs a cached one.
func newPrinter() *pp {
p := ppFree.Get().(*pp)
p.panicking = false
p.erroring = false
p.wrapErrs = false
p.fmt.init(&p.buf)
return p
}
var ppFree = sync.Pool{
New: func() interface{} { return new(pp) },
}
回到 Fprintf 函式,拿到 pp 指標后,會做一些 format 的操作,并且將 p.buf 里面的內容寫入 w,最后,呼叫 free 函式,將 pp 指標歸還到 Pool 中:
// free saves used pp structs in ppFree; avoids an allocation per invocation.
func (p *pp) free() {
if cap(p.buf) > 64<<10 {
return
}
p.buf = p.buf[:0]
p.arg = nil
p.value = https://www.cnblogs.com/qcrao-2018/p/reflect.Value{}
p.wrappedErr = nil
ppFree.Put(p)
}
歸還到 Pool 前將物件的一些欄位清零,這樣,通過 Get 拿到快取的物件時,就可以安全地使用了,
pool_test
通過 test 檔案學習原始碼是一個很好的途徑,因為它代表了“官方”的用法,更重要的是,測驗用例會故意測驗一些“坑”,學習這些坑,也會讓自己在使用的時候就能學會避免,
pool_test 檔案里共有 7 個測驗,4 個 BechMark,
TestPool 和 TestPoolNew 比較簡單,主要是測驗 Get/Put 的功能,我們來看下 TestPoolNew:
func TestPoolNew(t *testing.T) {
// disable GC so we can control when it happens.
defer debug.SetGCPercent(debug.SetGCPercent(-1))
i := 0
p := Pool{
New: func() interface{} {
i++
return i
},
}
if v := p.Get(); v != 1 {
t.Fatalf("got %v; want 1", v)
}
if v := p.Get(); v != 2 {
t.Fatalf("got %v; want 2", v)
}
// Make sure that the goroutine doesn't migrate to another P
// between Put and Get calls.
Runtime_procPin()
p.Put(42)
if v := p.Get(); v != 42 {
t.Fatalf("got %v; want 42", v)
}
Runtime_procUnpin()
if v := p.Get(); v != 3 {
t.Fatalf("got %v; want 3", v)
}
}
首先設定了 GC=-1,作用就是停止 GC,那為啥要用 defer?函式都跑完了,還要 defer 干啥,注意到,debug.SetGCPercent 這個函式被呼叫了兩次,而且這個函式回傳的是上一次 GC 的值,因此,defer 在這里的用途是還原到呼叫此函式之前的 GC 設定,也就是恢復現場,
接著,調置了 Pool 的 New 函式:直接回傳一個 int,變且每次呼叫 New,都會自增 1,然后,連續呼叫了兩次 Get 函式,因為這個時候 Pool 里沒有快取的物件,因此每次都會呼叫 New 創建一個,所以第一次回傳 1,第二次回傳 2,
然后,呼叫 Runtime_procPin() 防止 goroutine 被強占,目的是保護接下來的一次 Put 和 Get 操作,使得它們操作的物件都是同一個 P 的“池子”,并且,這次呼叫 Get 的時候并沒有呼叫 New,因為之前有一次 Put 的操作,
最后,再次呼叫 Get 操作,因為沒有“存貨”,因此還是會再次呼叫 New 創建一個物件,
TestPoolGC 和 TestPoolRelease 則主要測驗 GC 對 Pool 里物件的影響,這里用了一個函式,用于計數有多少物件會被 GC 回收:
runtime.SetFinalizer(v, func(vv *string) {
atomic.AddUint32(&fin, 1)
})
當垃圾回收檢測到 v 是一個不可達的物件時,并且 v 又有一個關聯的 Finalizer,就會另起一個 goroutine 呼叫設定的 finalizer 函式,也就是上面代碼里的引數 func,這樣,就會讓物件 v 重新可達,從而在這次 GC 程序中不被回收,之后,解綁物件 v 和它所關聯的 Finalizer,當下次 GC 再次檢測到物件 v 不可達時,才會被回收,
TestPoolStress 從名字看,主要是想測一下“壓力”,具體操作就是起了 10 個 goroutine 不斷地向 Pool 里 Put 物件,然后又 Get 物件,看是否會出錯,
TestPoolDequeue 和 TestPoolChain,都呼叫了 testPoolDequeue,這是具體干活的,它需要傳入一個 PoolDequeue 介面:
// poolDequeue testing.
type PoolDequeue interface {
PushHead(val interface{}) bool
PopHead() (interface{}, bool)
PopTail() (interface{}, bool)
}
PoolDequeue 是一個雙端佇列,可以從頭部入隊元素,從頭部和尾部出隊元素,呼叫函式時,前者傳入 NewPoolDequeue(16),后者傳入 NewPoolChain(),底層其實都是 poolDequeue 這個結構體,具體來看 testPoolDequeue 做了什么:

總共起了 10 個 goroutine:1 個生產者,9 個消費者,生產者不斷地從佇列頭 pushHead 元素到雙端佇列里去,并且每 push 10 次,就 popHead 一次;消費者則一直從佇列尾取元素,不論是從佇列頭還是從佇列尾取元素,都會在 map 里做標記,最后檢驗每個元素是不是只被取出過一次,
剩下的就是 Benchmark 測驗了,第一個 BenchmarkPool 比較簡單,就是不停地 Put/Get,測驗性能,
BenchmarkPoolSTW 函式會先關掉 GC,再向 pool 里 put 10 個物件,然后強制觸發 GC,記錄 GC 的停頓時間,并且做一個排序,計算 P50 和 P95 的 STW 時間,這個函式可以加入個人的代碼庫了:
func BenchmarkPoolSTW(b *testing.B) {
// Take control of GC.
defer debug.SetGCPercent(debug.SetGCPercent(-1))
var mstats runtime.MemStats
var pauses []uint64
var p Pool
for i := 0; i < b.N; i++ {
// Put a large number of items into a pool.
const N = 100000
var item interface{} = 42
for i := 0; i < N; i++ {
p.Put(item)
}
// Do a GC.
runtime.GC()
// Record pause time.
runtime.ReadMemStats(&mstats)
pauses = append(pauses, mstats.PauseNs[(mstats.NumGC+255)%256])
}
// Get pause time stats.
sort.Slice(pauses, func(i, j int) bool { return pauses[i] < pauses[j] })
var total uint64
for _, ns := range pauses {
total += ns
}
// ns/op for this benchmark is average STW time.
b.ReportMetric(float64(total)/float64(b.N), "ns/op")
b.ReportMetric(float64(pauses[len(pauses)*95/100]), "p95-ns/STW")
b.ReportMetric(float64(pauses[len(pauses)*50/100]), "p50-ns/STW")
}
我在 mac 上跑了一下:
go test -v -run=none -bench=BenchmarkPoolSTW
得到輸出:
goos: darwin
goarch: amd64
pkg: sync
BenchmarkPoolSTW-12 361 3708 ns/op 3583 p50-ns/STW 5008 p95-ns/STW
PASS
ok sync 1.481s
最后一個 BenchmarkPoolExpensiveNew 測驗當 New 的代價很高時,Pool 的表現,也可以加入個人的代碼庫,
其他
標準庫中 encoding/json 也用到了 sync.Pool 來提升性能,著名的 gin 框架,對 context 取用也到了 sync.Pool,
來看下 gin 如何使用 sync.Pool,設定 New 函式:
engine.pool.New = func() interface{} {
return engine.allocateContext()
}
func (engine *Engine) allocateContext() *Context {
return &Context{engine: engine, KeysMutex: &sync.RWMutex{}}
}
使用:
// ServeHTTP conforms to the http.Handler interface.
func (engine *Engine) ServeHTTP(w http.ResponseWriter, req *http.Request) {
c := engine.pool.Get().(*Context)
c.writermem.reset(w)
c.Request = req
c.reset()
engine.handleHTTPRequest(c)
engine.pool.Put(c)
}
先呼叫 Get 取出來快取的物件,然后會做一些 reset 操作,再執行 handleHTTPRequest,最后再 Put 回 Pool,
另外,Echo 框架也使?了 sync.Pool 來管理 context,并且?乎達到了零堆記憶體分配:
It leverages sync pool to reuse memory and achieve zero dynamic memory allocation with no GC overhead.
原始碼分析
Pool 結構體
首先來看 Pool 的結構體:
type Pool struct {
noCopy noCopy
// 每個 P 的本地佇列,實際型別為 [P]poolLocal
local unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
// [P]poolLocal的大小
localSize uintptr // size of the local array
victim unsafe.Pointer // local from previous cycle
victimSize uintptr // size of victims array
// 自定義的物件創建回呼函式,當 pool 中無可用物件時會呼叫此函式
New func() interface{}
}
因為 Pool 不希望被復制,所以結構體里有一個 noCopy 的欄位,使用 go vet 工具可以檢測到用戶代碼是否復制了 Pool,
noCopy是 go1.7 開始引入的一個靜態檢查機制,它不僅僅作業在運行時或標準庫,同時也對用戶代碼有效,
用戶只需實作這樣的不消耗記憶體、僅用于靜態分析的結構,來保證一個物件在第一次使用后不會發生復制,
實作非常簡單:
// noCopy 用于嵌入一個結構體中來保證其第一次使用后不會被復制
//
// 見 https://golang.org/issues/8005#issuecomment-190753527
type noCopy struct{}
// Lock 是一個空操作用來給 `go ve` 的 -copylocks 靜態分析
func (*noCopy) Lock() {}
func (*noCopy) Unlock() {}
local 欄位存盤指向 [P]poolLocal 陣列(嚴格來說,它是一個切片)的指標,localSize 則表示 local 陣列的大小,訪問時,P 的 id 對應 [P]poolLocal 下標索引,通過這樣的設計,多個 goroutine 使用同一個 Pool 時,減少了競爭,提升了性能,
在一輪 GC 到來時,victim 和 victimSize 會分別“接管” local 和 localSize,victim 的機制用于減少 GC 后冷啟動導致的性能抖動,讓分配物件更平滑,
Victim Cache 本來是計算機架構里面的一個概念,是 CPU 硬體處理快取的一種技術,
sync.Pool引入的意圖在于降低 GC 壓力的同時提高命中率,
當 Pool 沒有快取的物件時,呼叫 New 方法生成一個新的物件,
type poolLocal struct {
poolLocalInternal
// 將 poolLocal 補齊至兩個快取行的倍數,防止 false sharing,
// 每個快取行具有 64 bytes,即 512 bit
// 目前我們的處理器一般擁有 32 * 1024 / 64 = 512 條快取行
// 偽共享,僅占位用,防止在 cache line 上分配多個 poolLocalInternal
pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}
// Local per-P Pool appendix.
type poolLocalInternal struct {
// P 的私有快取區,使用時無需要加鎖
private interface{}
// 公共快取區,本地 P 可以 pushHead/popHead;其他 P 則只能 popTail
shared poolChain
}
欄位 pad 主要是防止 false sharing,董大的《什么是 cpu cache》里講得比較好:
現代 cpu 中,cache 都劃分成以 cache line (cache block) 為單位,在 x86_64 體系下一般都是 64 位元組,cache line 是操作的最小單元,
程式即使只想讀記憶體中的 1 個位元組資料,也要同時把附近 63 節字加載到 cache 中,如果讀取超個 64 位元組,那么就要加載到多個 cache line 中,
簡單來說,如果沒有 pad 欄位,那么當需要訪問 0 號索引的 poolLocal 時,CPU 同時會把 0 號和 1 號索引同時加載到 cpu cache,在只修改 0 號索引的情況下,會讓 1 號索引的 poolLocal 失效,這樣,當其他執行緒想要讀取 1 號索引時,發生 cache miss,還得重新再加載,對性能有損,增加一個 pad,補齊快取行,讓相關的欄位能獨立地加載到快取行就不會出現 false sharding 了,
poolChain 是一個雙端佇列的實作:
type poolChain struct {
// 只有生產者會 push to,不用加鎖
head *poolChainElt
// 讀寫需要原子控制, pop from
tail *poolChainElt
}
type poolChainElt struct {
poolDequeue
// next 被 producer 寫,consumer 讀,所以只會從 nil 變成 non-nil
// prev 被 consumer 寫,producer 讀,所以只會從 non-nil 變成 nil
next, prev *poolChainElt
}
type poolDequeue struct {
// The head index is stored in the most-significant bits so
// that we can atomically add to it and the overflow is
// harmless.
// headTail 包含一個 32 位的 head 和一個 32 位的 tail 指標,這兩個值都和 len(vals)-1 取模過,
// tail 是佇列中最老的資料,head 指向下一個將要填充的 slot
// slots 的有效范圍是 [tail, head),由 consumers 持有,
headTail uint64
// vals 是一個存盤 interface{} 的環形佇列,它的 size 必須是 2 的冪
// 如果 slot 為空,則 vals[i].typ 為空;否則,非空,
// 一個 slot 在這時宣告無效:tail 不指向它了,vals[i].typ 為 nil
// 由 consumer 設定成 nil,由 producer 讀
vals []eface
}
poolDequeue被實作為單生產者、多消費者的固定大小的無鎖(atomic 實作) Ring 式佇列(底層存盤使用陣列,使用兩個指標標記 head、tail),生產者可以從 head 插入、head 洗掉,而消費者僅可從 tail 洗掉,
headTail指向佇列的頭和尾,通過位運算將 head 和 tail 存入 headTail 變數中,
我們用一幅圖來完整地描述 Pool 結構體:

結合木白的技術私廚的《請問sync.Pool有什么缺點?》里的一張圖,對于雙端佇列的理解會更容易一些:

我們看到 Pool 并沒有直接使用 poolDequeue,原因是它的大小是固定的,而 Pool 的大小是沒有限制的,因此,在 poolDequeue 之上包裝了一下,變成了一個 poolChainElt 的雙向鏈表,可以動態增長,
Get
直接上原始碼:
func (p *Pool) Get() interface{} {
// ......
l, pid := p.pin()
x := l.private
l.private = nil
if x == nil {
x, _ = l.shared.popHead()
if x == nil {
x = p.getSlow(pid)
}
}
runtime_procUnpin()
// ......
if x == nil && p.New != nil {
x = p.New()
}
return x
}
省略號的內容是 race 相關的,屬于閱讀原始碼程序中的一些噪音,暫時注釋掉,這樣,Get 的整個程序就非常清晰了:
-
首先,呼叫
p.pin()函式將當前的 goroutine 和 P 系結,禁止被搶占,回傳當前 P 對應的 poolLocal,以及 pid, -
然后直接取 l.private,賦值給 x,并置 l.private 為 nil,
-
判斷 x 是否為空,若為空,則嘗試從 l.shared 的頭部 pop 一個物件出來,同時賦值給 x,
-
如果 x 仍然為空,則呼叫 getSlow 嘗試從其他 P 的 shared 雙端佇列尾部“偷”一個物件出來,
-
Pool 的相關操作做完了,呼叫
runtime_procUnpin()解除非搶占, -
最后如果還是沒有取到快取的物件,那就直接呼叫預先設定好的 New 函式,創建一個出來,
我用一張流程圖來展示整個程序:

整體流程梳理完了,我們再來看一下其中的一些關鍵函式,
pin
先來看 Pool.pin():
// src/sync/pool.go
// 呼叫方必須在完成取值后呼叫 runtime_procUnpin() 來取消搶占,
func (p *Pool) pin() (*poolLocal, int) {
pid := runtime_procPin()
s := atomic.LoadUintptr(&p.localSize) // load-acquire
l := p.local // load-consume
// 因為可能存在動態的 P(運行時調整 P 的個數)
if uintptr(pid) < s {
return indexLocal(l, pid), pid
}
return p.pinSlow()
}
pin 的作用就是將當前 groutine 和 P 系結在一起,禁止搶占,并且回傳對應的 poolLocal 以及 P 的 id,
如果 G 被搶占,則 G 的狀態從 running 變成 runnable,會被放回 P 的 localq 或 globaq,等待下一次調度,下次再執行時,就不一定是和現在的 P 相結合了,因為之后會用到 pid,如果被搶占了,有可能接下來使用的 pid 與所系結的 P 并非同一個,
“系結”的任務最終交給了 procPin:
// src/runtime/proc.go
func procPin() int {
_g_ := getg()
mp := _g_.m
mp.locks++
return int(mp.p.ptr().id)
}
實作的代碼很簡潔:將當前 goroutine 系結的 m 上的一個鎖欄位 locks 值加 1,即完成了“系結”,關于 pin 的原理,可以參考《golang的物件池sync.pool原始碼解讀》,文章詳細分析了為什么執行 procPin 之后,不可搶占,且 GC 不會清掃 Pool 里的物件,
我們再回到 p.pin(),原子操作取出 p.localSize 和 p.local,如果當前 pid 小于 p.localSize,則直接取 poolLocal 陣列中的 pid 索引處的元素,否則,說明 Pool 還沒有創建 poolLocal,呼叫 p.pinSlow() 完成創建作業,
func (p *Pool) pinSlow() (*poolLocal, int) {
// Retry under the mutex.
// Can not lock the mutex while pinned.
runtime_procUnpin()
allPoolsMu.Lock()
defer allPoolsMu.Unlock()
pid := runtime_procPin()
// poolCleanup won't be called while we are pinned.
// 沒有使用原子操作,因為已經加了全域鎖了
s := p.localSize
l := p.local
// 因為 pinSlow 中途可能已經被其他的執行緒呼叫,因此這時候需要再次對 pid 進行檢查, 如果 pid 在 p.local 大小范圍內,則不用創建 poolLocal 切片,直接回傳,
if uintptr(pid) < s {
return indexLocal(l, pid), pid
}
if p.local == nil {
allPools = append(allPools, p)
}
// If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one.
// 當前 P 的數量
size := runtime.GOMAXPROCS(0)
local := make([]poolLocal, size)
// 舊的 local 會被回收
atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
atomic.StoreUintptr(&p.localSize, uintptr(size)) // store-release
return &local[pid], pid
}
因為要上一把大鎖 allPoolsMu,所以函式名帶有 slow,我們知道,鎖粒度越大,競爭越多,自然就越“slow”,不過要想上鎖的話,得先解除“系結”,鎖上之后,再執行“系結”,原因是鎖越大,被阻塞的概率就越大,如果還占著 P,那就浪費資源,
在解除系結后,pinSlow 可能被其他的執行緒呼叫過了,p.local 可能會發生變化,因此這時候需要再次對 pid 進行檢查,如果 pid 在 p.localSize 大小范圍內,則不用再創建 poolLocal 切片,直接回傳,
之后,根據 P 的個數,使用 make 創建切片,包含 runtime.GOMAXPROCS(0) 個 poolLocal,并且使用原子操作設定 p.local 和 p.localSize,
最后,回傳 p.local 對應 pid 索引處的元素,
關于這把大鎖 allPoolsMu,曹大在《幾個 Go 系統可能遇到的鎖問題》里講了一個例子,第三方庫用了 sync.Pool,內部有一個結構體 fasttemplate.Template,包含 sync.Pool 欄位,而 rd 在使用時,每個請求都會新建這樣一個結構體,于是,處理每個請求時,都會嘗試從一個空的 Pool 里取快取的物件,最后 goroutine 都阻塞在了這把大鎖上,因為都在嘗試執行:allPools = append(allPools, p),從而造成性能問題,
popHead
回到 Get 函式,再來看另一個關鍵的函式:poolChain.popHead():
func (c *poolChain) popHead() (interface{}, bool) {
d := c.head
for d != nil {
if val, ok := d.popHead(); ok {
return val, ok
}
// There may still be unconsumed elements in the
// previous dequeue, so try backing up.
d = loadPoolChainElt(&d.prev)
}
return nil, false
}
popHead 函式只會被 producer 呼叫,首先拿到頭節點:c.head,如果頭節點不為空的話,嘗試呼叫頭節點的 popHead 方法,注意這兩個 popHead 方法實際上并不相同,一個是 poolChain 的,一個是 poolDequeue 的,有疑惑的,不妨回頭再看一下 Pool 結構體的圖,我們來看 poolDequeue.popHead():
// /usr/local/go/src/sync/poolqueue.go
func (d *poolDequeue) popHead() (interface{}, bool) {
var slot *eface
for {
ptrs := atomic.LoadUint64(&d.headTail)
head, tail := d.unpack(ptrs)
// 判斷佇列是否為空
if tail == head {
// Queue is empty.
return nil, false
}
// head 位置是隊頭的前一個位置,所以此處要先退一位,
// 在讀出 slot 的 value 之前就把 head 值減 1,取消對這個 slot 的控制
head--
ptrs2 := d.pack(head, tail)
if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
// We successfully took back slot.
slot = &d.vals[head&uint32(len(d.vals)-1)]
break
}
}
// 取出 val
val := *(*interface{})(unsafe.Pointer(slot))
if val == dequeueNil(nil) {
val = nil
}
// 重置 slot,typ 和 val 均為 nil
// 這里清空的方式與 popTail 不同,與 pushHead 沒有競爭關系,所以不用太小心
*slot = eface{}
return val, true
}
此函式會刪掉并且回傳 queue 的頭節點,但如果 queue 為空的話,回傳 false,這里的 queue 存盤的實際上就是 Pool 里快取的物件,
整個函式的核心是一個無限回圈,這是 Go 中常用的無鎖化編程形式,
首先呼叫 unpack 函式分離出 head 和 tail 指標,如果 head 和 tail 相等,即首尾相等,那么這個佇列就是空的,直接就回傳 nil,false,
否則,將 head 指標后移一位,即 head 值減 1,然后呼叫 pack 打包 head 和 tail 指標,使用 atomic.CompareAndSwapUint64 比較 headTail 在這之間是否有變化,如果沒變化,相當于獲取到了這把鎖,那就更新 headTail 的值,并且把 vals 相應索引處的元素賦值給 slot,
因為 vals 長度實際是只能是 2 的 n 次冪,因此 len(d.vals)-1 實際上得到的值的低 n 位是全 1,它再與 head 相與,實際就是取 head 低 n 位的值,
得到相應 slot 的元素后,經過型別轉換并判斷是否是 dequeueNil,如果是,說明沒取到快取的物件,回傳 nil,
// /usr/local/go/src/sync/poolqueue.go
// 因為使用 nil 代表空的 slots,因此使用 dequeueNil 表示 interface{}(nil)
type dequeueNil *struct{}
最后,回傳 val 之前,將 slot “歸零”:*slot = eface{},
回到 poolChain.popHead(),呼叫 poolDequeue.popHead() 拿到快取的物件后,直接回傳,否則,將 d 重新指向 d.prev,繼續嘗試獲取快取的物件,
getSlow
如果在 shared 里沒有獲取到快取物件,則繼續呼叫 Pool.getSlow(),嘗試從其他 P 的 poolLocal 偷取:
func (p *Pool) getSlow(pid int) interface{} {
// See the comment in pin regarding ordering of the loads.
size := atomic.LoadUintptr(&p.localSize) // load-acquire
locals := p.local // load-consume
// Try to steal one element from other procs.
// 從其他 P 中竊取物件
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i+1)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
// 嘗試從victim cache中取物件,這發生在嘗試從其他 P 的 poolLocal 偷去失敗后,
// 因為這樣可以使 victim 中的物件更容易被回收,
size = atomic.LoadUintptr(&p.victimSize)
if uintptr(pid) >= size {
return nil
}
locals = p.victim
l := indexLocal(locals, pid)
if x := l.private; x != nil {
l.private = nil
return x
}
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
// 清空 victim cache,下次就不用再從這里找了
atomic.StoreUintptr(&p.victimSize, 0)
return nil
}
從索引為 pid+1 的 poolLocal 處開始,嘗試呼叫 shared.popTail() 獲取快取物件,如果沒有拿到,則從 victim 里找,和 poolLocal 的邏輯類似,
最后,實在沒找到,就把 victimSize 置 0,防止后來的“人”再到 victim 里找,
在 Get 函式的最后,經過這一番操作還是沒找到快取的物件,就呼叫 New 函式創建一個新的物件,
popTail
最后,還剩一個 popTail 函式:
func (c *poolChain) popTail() (interface{}, bool) {
d := loadPoolChainElt(&c.tail)
if d == nil {
return nil, false
}
for {
d2 := loadPoolChainElt(&d.next)
if val, ok := d.popTail(); ok {
return val, ok
}
if d2 == nil {
// 雙向鏈表只有一個尾節點,現在為空
return nil, false
}
// 雙向鏈表的尾節點里的雙端佇列被“掏空”,所以繼續看下一個節點,
// 并且由于尾節點已經被“掏空”,所以要甩掉它,這樣,下次 popHead 就不會再看它有沒有快取物件了,
if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
// 甩掉尾節點
storePoolChainElt(&d2.prev, nil)
}
d = d2
}
}
在 for 回圈的一開始,就把 d.next 加載到了 d2,因為 d 可能會短暫為空,但如果 d2 在 pop 或者 pop fails 之前就不為空的話,說明 d 就會永久為空了,在這種情況下,可以安全地將 d 這個結點“甩掉”,
最后,將 c.tail 更新為 d2,可以防止下次 popTail 的時候查看一個空的 dequeue;而將 d2.prev 設定為 nil,可以防止下次 popHead 時查看一個空的 dequeue,
我們再看一下核心的 poolDequeue.popTail:
// src/sync/poolqueue.go:147
func (d *poolDequeue) popTail() (interface{}, bool) {
var slot *eface
for {
ptrs := atomic.LoadUint64(&d.headTail)
head, tail := d.unpack(ptrs)
// 判斷佇列是否空
if tail == head {
// Queue is empty.
return nil, false
}
// 先搞定 head 和 tail 指標位置,如果搞定,那么這個 slot 就歸屬我們了
ptrs2 := d.pack(head, tail+1)
if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
// Success.
slot = &d.vals[tail&uint32(len(d.vals)-1)]
break
}
}
// We now own slot.
val := *(*interface{})(unsafe.Pointer(slot))
if val == dequeueNil(nil) {
val = nil
}
slot.val = nil
atomic.StorePointer(&slot.typ, nil)
// At this point pushHead owns the slot.
return val, true
}
popTail 從佇列尾部移除一個元素,如果佇列為空,回傳 false,此函式可能同時被多個消費者呼叫,
函式的核心是一個無限回圈,又是一個無鎖編程,先解出 head,tail 指標值,如果兩者相等,說明佇列為空,
因為要從尾部移除一個元素,所以 tail 指標前進 1,然后使用原子操作設定 headTail,
最后,將要移除的 slot 的 val 和 typ “歸零”:
slot.val = nil
atomic.StorePointer(&slot.typ, nil)
Put
// src/sync/pool.go
// Put 將物件添加到 Pool
func (p *Pool) Put(x interface{}) {
if x == nil {
return
}
// ……
l, _ := p.pin()
if l.private == nil {
l.private = x
x = nil
}
if x != nil {
l.shared.pushHead(x)
}
runtime_procUnpin()
//……
}
同樣刪掉了 race 相關的函式,看起來清爽多了,整個 Put 的邏輯也很清晰:
-
先系結 g 和 P,然后嘗試將 x 賦值給 private 欄位,
-
如果失敗,就呼叫
pushHead方法嘗試將其放入 shared 欄位所維護的雙端佇列中,
同樣用流程圖來展示整個程序:

pushHead
我們來看 pushHead 的原始碼,比較清晰:
// src/sync/poolqueue.go
func (c *poolChain) pushHead(val interface{}) {
d := c.head
if d == nil {
// poolDequeue 初始長度為8
const initSize = 8 // Must be a power of 2
d = new(poolChainElt)
d.vals = make([]eface, initSize)
c.head = d
storePoolChainElt(&c.tail, d)
}
if d.pushHead(val) {
return
}
// 前一個 poolDequeue 長度的 2 倍
newSize := len(d.vals) * 2
if newSize >= dequeueLimit {
// Can't make it any bigger.
newSize = dequeueLimit
}
// 首尾相連,構成鏈表
d2 := &poolChainElt{prev: d}
d2.vals = make([]eface, newSize)
c.head = d2
storePoolChainElt(&d.next, d2)
d2.pushHead(val)
}
如果 c.head 為空,就要創建一個 poolChainElt,作為首結點,當然也是尾節點,它管理的雙端佇列的長度,初始為 8,放滿之后,再創建一個 poolChainElt 節點時,雙端佇列的長度就要翻倍,當然,有一個最大長度限制(2^30):
const dequeueBits = 32
const dequeueLimit = (1 << dequeueBits) / 4
呼叫 poolDequeue.pushHead 嘗試將物件放到 poolDeque 里去:
// src/sync/poolqueue.go
// 將 val 添加到雙端佇列頭部,如果佇列已滿,則回傳 false,此函式只能被一個生產者呼叫
func (d *poolDequeue) pushHead(val interface{}) bool {
ptrs := atomic.LoadUint64(&d.headTail)
head, tail := d.unpack(ptrs)
if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
// 佇列滿了
return false
}
slot := &d.vals[head&uint32(len(d.vals)-1)]
// 檢測這個 slot 是否被 popTail 釋放
typ := atomic.LoadPointer(&slot.typ)
if typ != nil {
// 另一個 groutine 正在 popTail 這個 slot,說明佇列仍然是滿的
return false
}
// The head slot is free, so we own it.
if val == nil {
val = dequeueNil(nil)
}
// slot占位,將val存入vals中
*(*interface{})(unsafe.Pointer(slot)) = val
// head 增加 1
atomic.AddUint64(&d.headTail, 1<<dequeueBits)
return true
}
首先判斷佇列是否已滿:
if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
// Queue is full.
return false
}
也就是將尾部指標加上 d.vals 的長度,再取低 31 位,看它是否和 head 相等,我們知道,d.vals 的長度實際上是固定的,因此如果佇列已滿,那么 if 陳述句的兩邊就是相等的,如果佇列滿了,直接回傳 false,
否則,佇列沒滿,通過 head 指標找到即將填充的 slot 位置:取 head 指標的低 31 位,
// Check if the head slot has been released by popTail.
typ := atomic.LoadPointer(&slot.typ)
if typ != nil {
// Another goroutine is still cleaning up the tail, so
// the queue is actually still full.
// popTail 是先設定 val,再將 typ 設定為 nil,設定完 typ 之后,popHead 才可以操作這個 slot
return false
}
上面這一段用來判斷是否和 popTail 有沖突發生,如果有,則直接回傳 false,
最后,將 val 賦值到 slot,并將 head 指標值加 1,
// slot占位,將val存入vals中
*(*interface{})(unsafe.Pointer(slot)) = val
這里的實作比較巧妙,slot 是 eface 型別,將 slot 轉為 interface{} 型別,這樣 val 能以 interface{} 賦值給 slot 讓 slot.typ 和 slot.val 指向其記憶體塊,于是 slot.typ 和 slot.val 均不為空,
pack/unpack
最后我們再來看一下 pack 和 unpack 函式,它們實際上是一組系結、解綁 head 和 tail 指標的兩個函式,
// src/sync/poolqueue.go
const dequeueBits = 32
func (d *poolDequeue) pack(head, tail uint32) uint64 {
const mask = 1<<dequeueBits - 1
return (uint64(head) << dequeueBits) |
uint64(tail&mask)
}
mask 的低 31 位為全 1,其他位為 0,它和 tail 相與,就是只看 tail 的低 31 位,而 head 向左移 32 位之后,低 32 位為全 0,最后把兩部分“或”起來,head 和 tail 就“系結”在一起了,
相應的解綁函式:
func (d *poolDequeue) unpack(ptrs uint64) (head, tail uint32) {
const mask = 1<<dequeueBits - 1
head = uint32((ptrs >> dequeueBits) & mask)
tail = uint32(ptrs & mask)
return
}
取出 head 指標的方法就是將 ptrs 右移 32 位,再與 mask 相與,同樣只看 head 的低 31 位,而 tail 實際上更簡單,直接將 ptrs 與 mask 相與就可以了,
GC
對于 Pool 而言,并不能無限擴展,否則物件占用記憶體太多了,會引起記憶體溢位,
幾乎所有的池技術中,都會在某個時刻清慷訓清除部分快取物件,那么在 Go 中何時清理未使用的物件呢?
答案是 GC 發生時,
在 pool.go 檔案的 init 函式里,注冊了 GC 發生時,如何清理 Pool 的函式:
// src/sync/pool.go
func init() {
runtime_registerPoolCleanup(poolCleanup)
}
編譯器在背后做了一些動作:
// src/runtime/mgc.go
// Hooks for other packages
var poolcleanup func()
// 利用編譯器標志將 sync 包中的清理注冊到運行時
//go:linkname sync_runtime_registerPoolCleanup sync.runtime_registerPoolCleanup
func sync_runtime_registerPoolCleanup(f func()) {
poolcleanup = f
}
具體來看下:
func poolCleanup() {
for _, p := range oldPools {
p.victim = nil
p.victimSize = 0
}
// Move primary cache to victim cache.
for _, p := range allPools {
p.victim = p.local
p.victimSize = p.localSize
p.local = nil
p.localSize = 0
}
oldPools, allPools = allPools, nil
}
poolCleanup 會在 STW 階段被呼叫,整體看起來,比較簡潔,主要是將 local 和 victim 作交換,這樣也就不致于讓 GC 把所有的 Pool 都清空了,有 victim 在“兜底”,
如果
sync.Pool的獲取、釋放速度穩定,那么就不會有新的池物件進行分配,如果獲取的速度下降了,那么物件可能會在兩個GC周期內被釋放,而不是以前的一個GC周期,
鳥窩的【Go 1.13中 sync.Pool 是如何優化的?】講了 1.13 中的優化,
參考資料【理解 Go 1.13 中 sync.Pool 的設計與實作】 手動模擬了一下呼叫 poolCleanup 函式前后 oldPools,allPools,p.vitcim 的變化程序,很精彩:
- 初始狀態下,oldPools 和 allPools 均為 nil,
- 第 1 次呼叫 Get,由于 p.local 為 nil,將會在 pinSlow 中創建 p.local,然后將 p 放入 allPools,此時 allPools 長度為 1,oldPools 為 nil,
- 物件使用完畢,第 1 次呼叫 Put 放回物件,
- 第 1 次GC STW 階段,allPools 中所有 p.local 將值賦值給 victim 并置為 nil,allPools 賦值給 oldPools,最后 allPools 為 nil,oldPools 長度為 1,
- 第 2 次呼叫 Get,由于 p.local 為 nil,此時會從 p.victim 里面嘗試取物件,
- 物件使用完畢,第 2 次呼叫 Put 放回物件,但由于 p.local 為 nil,重新創建 p.local,并將物件放回,此時 allPools 長度為 1,oldPools 長度為 1,
- 第 2 次 GC STW 階段,oldPools 中所有 p.victim 置 nil,前一次的 cache 在本次 GC 時被回收,allPools 所有 p.local 將值賦值給 victim 并置為nil,最后 allPools 為 nil,oldPools 長度為 1,
我根據這個流程畫了一張圖,可以理解地更清晰一些:

需要指出的是,allPools 和 oldPools 都是切片,切片的元素是指向 Pool 的指標,Get/Put 操作不需要通過它們,在第 6 步,如果還有其他 Pool 執行了 Put 操作,allPools 這時就會有多個元素,
在 Go 1.13 之前的實作中,poolCleanup 比較“簡單粗暴”:
func poolCleanup() {
for i, p := range allPools {
allPools[i] = nil
for i := 0; i < int(p.localSize); i++ {
l := indexLocal(p.local, i)
l.private = nil
for j := range l.shared {
l.shared[j] = nil
}
l.shared = nil
}
p.local = nil
p.localSize = 0
}
allPools = []*Pool{}
}
直接清空了所有 Pool 的 p.local 和 poolLocal.shared,
通過兩者的對比發現,新版的實作相比 Go 1.13 之前,GC 的粒度拉大了,由于實際回收的時間線拉長,單位時間內 GC 的開銷減小,
由此基本明白 p.victim 的作用,它的定位是次級快取,GC 時將物件放入其中,下一次 GC 來臨之前如果有 Get 呼叫則會從 p.victim 中取,直到再一次 GC 來臨時回收,
同時由于從 p.victim 中取出物件使用完畢之后并未放回 p.victim 中,在一定程度也減小了下一次 GC 的開銷,原來 1 次 GC 的開銷被拉長到 2 次且會有一定程度的開銷減小,這就是 p.victim 引入的意圖,
【理解 Go 1.13 中 sync.Pool 的設計與實作】 這篇文章最后還總結了 sync.Pool 的設計理念,包括:無鎖、操作物件隔離、原子操作代替鎖、行為隔離——鏈表、Victim Cache 降低 GC 開銷,寫得非常不錯,推薦閱讀,
另外,關于 sync.Pool 中鎖競爭優化的文章,推薦閱讀芮大神的【優化鎖競爭】,
總結
本文先是介紹了 Pool 是什么,有什么作用,接著給出了 Pool 的用法以及在標準庫、一些第三方庫中的用法,還介紹了 pool_test 中的一些測驗用例,最后,詳細解讀了 sync.Pool 的原始碼,
本文的結尾部分,再來詳細地總結一下關于 sync.Pool 的要點:
-
關鍵思想是物件的復用,避免重復創建、銷毀,將暫時不用的物件快取起來,待下次需要的時候直接使用,不用再次經過記憶體分配,復用物件的記憶體,減輕 GC 的壓力,
-
sync.Pool是協程安全的,使用起來非常方便,設定好 New 函式后,呼叫 Get 獲取,呼叫 Put 歸還物件, -
Go 語言內置的 fmt 包,encoding/json 包都可以看到 sync.Pool 的身影;
gin,Echo等框架也都使用了 sync.Pool, -
不要對 Get 得到的物件有任何假設,更好的做法是歸還物件時,將物件“清空”,
-
Pool 里物件的生命周期受 GC 影響,不適合于做連接池,因為連接池需要自己管理物件的生命周期,
-
Pool 不可以指定??,??只受制于 GC 臨界值,
-
procPin將 G 和 P 系結,防止 G 被搶占,在系結期間,GC 無法清理快取的物件, -
在加入
victim機制前,sync.Pool 里物件的最?快取時間是一個 GC 周期,當 GC 開始時,沒有被引?的物件都會被清理掉;加入victim機制后,最大快取時間為兩個 GC 周期, -
Victim Cache 本來是計算機架構里面的一個概念,是 CPU 硬體處理快取的一種技術,
sync.Pool引入的意圖在于降低 GC 壓力的同時提高命中率, -
sync.Pool的最底層使用切片加鏈表來實作雙端佇列,并將快取的物件存盤在切片中,
參考資料
【歐神 原始碼分析】https://changkun.us/archives/2018/09/256/
【Go 夜讀】https://reading.hidevops.io/reading/20180817/2018-08-17-sync-pool-reading.pdf
【夜讀第 14 期視頻】https://www.youtube.com/watch?v=jaepwn2PWPk&list=PLe5svQwVF1L5bNxB0smO8gNfAZQYWdIpI
【原始碼分析,偽共享】https://juejin.im/post/5d4087276fb9a06adb7fbe4a
【golang的物件池sync.pool原始碼解讀】https://zhuanlan.zhihu.com/p/99710992
【理解 Go 1.13 中 sync.Pool 的設計與實作】https://zhuanlan.zhihu.com/p/110140126
【優缺點,圖】http://cbsheng.github.io/posts/golang標準庫sync.pool原理及原始碼簡析/
【xiaorui 優化鎖競爭】http://xiaorui.cc/archives/5878
【性能優化之路,自定義多種規格的快取】https://blog.cyeam.com/golang/2017/02/08/go-optimize-slice-pool
【sync.Pool 有什么缺點】https://mp.weixin.qq.com/s?__biz=MzA4ODg0NDkzOA==&mid=2247487149&idx=1&sn=f38f2d72fd7112e19e97d5a2cd304430&source=41#wechat_redirect
【1.12 和 1.13 的演變】https://github.com/watermelo/dailyTrans/blob/master/golang/sync_pool_understand.md
【董澤潤 演進】https://www.jianshu.com/p/2e08332481c5
【noCopy】https://github.com/golang/go/issues/8005#issuecomment-190753527
【董澤潤 cpu cache】https://www.jianshu.com/p/dc4b5562aad2
【gomemcache 例子】https://docs.kilvn.com/The-Golang-Standard-Library-by-Example/chapter16/16.01.html
【鳥窩 1.13 優化】https://colobu.com/2019/10/08/how-is-sync-Pool-improved-in-Go-1-13/
【A journey with go】https://medium.com/a-journey-with-go/go-understand-the-design-of-sync-pool-2dde3024e277
【封裝了一個計陣列件】https://www.akshaydeo.com/blog/2017/12/23/How-did-I-improve-latency-by-700-percent-using-syncPool/
【偽共享】http://ifeve.com/falsesharing/
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/24412.html
標籤:Go
上一篇:吾八哥學k8s(三):kubernetes里創建資源的方法
下一篇:golang實作常用集合原理介紹
