主頁 > 後端開發 > golang 中 sync.Mutex 的實作

golang 中 sync.Mutex 的實作

2022-04-06 07:01:18 後端開發

mutex 的實作思想

mutex 主要有兩個 method: Lock()Unlock()

Lock() 可以通過一個 CAS 操作來實作

func (m *Mutex) Lock() {
	for !atomic.CompareAndSwapUint32(&m.locked, 0, 1) {
	}
}

func (m *Mutex) Unlock() {
	atomic.StoreUint32(&m.locked, 0)
}

Lock() 一直進行 CAS 操作,比較耗 CPU,因此帶來了一個優化:如果協程在一段時間內搶不到鎖,可以把該協程掛到一個等待佇列上,Unlock() 的一方除了更新鎖的狀態,還需要從等待佇列中喚醒一個協程,

但是這個優化會存在一個問題,如果一個協程從等待佇列中喚醒后再次搶鎖時,鎖已經被一個新來的協程搶走了,它就只能再次被掛到等待佇列中,接著再被喚醒,但又可能搶鎖失敗...... 這個悲催的協程可能會一直搶不到鎖,由此產生饑餓 (starvation) 現象,

饑餓現象會導致尾部延遲 (Tail Latency) 特別高,什么是尾部延遲?用一句話來說就是:最慢的特別慢!

如果共有 1000 個協程,假設 999 個協程可以在 1ms 內搶到鎖,雖然平均時間才 2ms,但是最慢的那個協程卻需要 1s 才搶到鎖,這就是尾部延遲,

golang 中 mutex 的實作思想

?  go version
go version go1.16.5 darwin/arm64

本次閱讀的 go 原始碼版本為 go1.16.5,

golang 標準庫里的 mutex 避免了饑餓現象的發生,先大致介紹一下 golang 的加鎖和解鎖流程,對后面的原始碼閱讀有幫助,

鎖有兩種 mode,分別是 normal mode 和 starvation mode,初始為 normal mode,當一個協程來搶鎖時,依舊是做 CAS 操作,如果成功了,就直接回傳,如果沒有搶到鎖,它會做一定次數的自旋操作,等待鎖被釋放,在自旋操作結束后,如果鎖依舊沒有被釋放,那么這個協程就會被放到等待佇列中,如果一個處于等待佇列中的協程一直都沒有搶到鎖,mutex 就會從 normal mode 變成 starvation mode,在 starvation mode 下,當有協程釋放鎖時,這個鎖會被直接交給等待佇列中的協程,從而避免產生饑餓執行緒,

除此之外,golang 還有一點小優化,當有協程正在自旋搶鎖時,Unlock() 的一方不會從等待佇列中喚醒協程,因為即使喚醒了,被喚醒的協程也搶不過正在自旋的協程,

下面正式開始閱讀原始碼,

mutex 的結構以及一些 const 常量值

type Mutex struct {
	state int32
	sema  uint32
}
const (
	mutexLocked = 1 << iota // mutex is locked
	mutexWoken				
	mutexStarving
	mutexWaiterShift = iota // 3

	// Mutex fairness.
	//
	// Mutex can be in 2 modes of operations: normal and starvation.
	// In normal mode waiters are queued in FIFO order, but a woken up waiter
	// does not own the mutex and competes with new arriving goroutines over
	// the ownership. New arriving goroutines have an advantage -- they are
	// already running on CPU and there can be lots of them, so a woken up
	// waiter has good chances of losing. In such case it is queued at front
	// of the wait queue. If a waiter fails to acquire the mutex for more than 1ms,
	// it switches mutex to the starvation mode.
	//
	// In starvation mode ownership of the mutex is directly handed off from
	// the unlocking goroutine to the waiter at the front of the queue.
	// New arriving goroutines don't try to acquire the mutex even if it appears
	// to be unlocked, and don't try to spin. Instead they queue themselves at
	// the tail of the wait queue.
	//
	// If a waiter receives ownership of the mutex and sees that either
	// (1) it is the last waiter in the queue, or (2) it waited for less than 1 ms,
	// it switches mutex back to normal operation mode.
	//
	// Normal mode has considerably better performance as a goroutine can acquire
	// a mutex several times in a row even if there are blocked waiters.
	// Starvation mode is important to prevent pathological cases of tail latency.
	starvationThresholdNs = 1e6
)

mutex 的狀態是通過 state 來維護的,state 有 32 個 bit,

前面 29 個 bit 用來記錄當前等待佇列中有多少個協程在等待,將等待佇列的協程數量記錄為 waiterCount,

state >> mutexWaiterShift // mutexWaiterShift 的值為 3

第 30 個 bit 表示當前 mutex 是否處于 starvation mode,將這個 bit 記為 starvationFlag,

state & mutexStarving

第 31 個 bit 表示當前是否有協程正在 (第一次) 自旋,將這個 bit 記為 wokenFlag,woken 的意思也就是醒著,代表它不在等待佇列上睡眠,

state & mutexWoken

第 32 個 bit 表示當前鎖是否被鎖了 (感覺有點繞口哈哈) ,將這個 bit 記為 lockFlag,

state & mutexLocked

用一個圖來表示這些 bit

0 0 0 0 0 0 0 0 ... 0 0				0			0
                      |				|			|
waiterCount     starvationFlag  wokenFlag   lockFlag

sema 是一個信號量,它會被用來關聯一個等待佇列,

分別討論幾種 case 下,代碼的執行情況,

Mutex 沒有被鎖住,第一個協程來拿鎖

func (m *Mutex) Lock() {
	// Fast path: grab unlocked mutex.
	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
		// ...
		return
	}
	// Slow path (outlined so that the fast path can be inlined)
	m.lockSlow()
}

在 Mutex 沒有被鎖住時,state 的值為 0,此時第一個協程來拿鎖時,由于 state 的值為 0,因此 CAS 操作會成功,CAS 操作之后的 state 的值變成 1 (lockFlag = 1) ,然后 return 掉,不會進入到 m.lockSlow() 里面,

Mutex 僅被協程 A 鎖住,沒有其他協程搶鎖,協程 A 釋放鎖

func (m *Mutex) Unlock() {
	// ...

	// Fast path: drop lock bit.
	new := atomic.AddInt32(&m.state, -mutexLocked)
	if new != 0 {
		// Outlined slow path to allow inlining the fast path.
		// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
		m.unlockSlow(new)
	}
}

緊接上面,state 的值為 1,AddInt32(m.state,-1) 之后,state 的值變成了 0 (lockFlag = 0) ,new 的值為 0,然后就回傳了,

Mutex 已經被協程 A 鎖住,協程 B 來拿鎖

func (m *Mutex) Lock() {
	// Fast path: grab unlocked mutex.
	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
		// ...
		return
	}
	// Slow path (outlined so that the fast path can be inlined)
	m.lockSlow()
}

因為 state 的值不為 0,CompareAndSwapInt32 會回傳 false,所以會進入到 lockSlow() 里面

lockSlow()

首先看一下 lockSlow() 這個方法的全貌

func (m *Mutex) lockSlow() {
	var waitStartTime int64
	starving := false
	awoke := false
	iter := 0
	old := m.state
	for {
		// Don't spin in starvation mode, ownership is handed off to waiters
		// so we won't be able to acquire the mutex anyway.
		if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
			// Active spinning makes sense.
			// Try to set mutexwokenFlag to inform Unlock
			// to not wake other blocked goroutines.
			if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
				atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
				awoke = true
			}
			runtime_doSpin()
			iter++
			old = m.state
			continue
		}
		new := old
		// Don't try to acquire starving mutex, new arriving goroutines must queue.
		if old&mutexStarving == 0 {
			new |= mutexLocked
		}
		if old&(mutexLocked|mutexStarving) != 0 {
			new += 1 << mutexWaiterShift
		}
		// The current goroutine switches mutex to starvation mode.
		// But if the mutex is currently unlocked, don't do the switch.
		// Unlock expects that starving mutex has waiters, which will not
		// be true in this case.
		if starving && old&mutexLocked != 0 {
			new |= mutexStarving
		}
		if awoke {
			// The goroutine has been woken from sleep,
			// so we need to reset the flag in either case.
			if new&mutexWoken == 0 {
				throw("sync: inconsistent mutex state")
			}
			new &^= mutexWoken
		}
		if atomic.CompareAndSwapInt32(&m.state, old, new) {
			if old&(mutexLocked|mutexStarving) == 0 {
				break // locked the mutex with CAS
			}
			// If we were already waiting before, queue at the front of the queue.
			queueLifo := waitStartTime != 0
			if waitStartTime == 0 {
				waitStartTime = runtime_nanotime()
			}
			runtime_SemacquireMutex(&m.sema, queueLifo, 1)
			starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
			old = m.state
			if old&mutexStarving != 0 {
				// If this goroutine was woken and mutex is in starvation mode,
				// ownership was handed off to us but mutex is in somewhat
				// inconsistent state: mutexLocked is not set and we are still
				// accounted as waiter. Fix that.
				if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
					throw("sync: inconsistent mutex state")
				}
				delta := int32(mutexLocked - 1<<mutexWaiterShift)
				if !starving || old>>mutexWaiterShift == 1 {
					// Exit starvation mode.
					// Critical to do it here and consider wait time.
					// Starvation mode is so inefficient, that two goroutines
					// can go lock-step infinitely once they switch mutex
					// to starvation mode.
					delta -= mutexStarving
				}
				atomic.AddInt32(&m.state, delta)
				break
			}
			awoke = true
			iter = 0
		} else {
			old = m.state
		}
	}

	if race.Enabled {
		race.Acquire(unsafe.Pointer(m))
	}
}
第一步: doSpin (空轉)

進入 for 回圈后,會執行一個判斷

for {
    // Don't spin in starvation mode, ownership is handed off to waiters
    // so we won't be able to acquire the mutex anyway.
    if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
        // Active spinning makes sense.
        // Try to set mutexwokenFlag to inform Unlock
        // to not wake other blocked goroutines.
        if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
        atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
            awoke = true
        }
        runtime_doSpin()
        iter++
        old = m.state
        continue
    }
    // ...
}

runtime_canSpin(iter) 的作用是根據 iter 的值判斷自否應該自旋下去, (這個方法的實作可以在后面看到)

最初的幾次判斷,由于 iter 的值為 0,runtime_canSpin(iter) 會回傳 true,因此

if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter)

這個判斷會一直通過,由于 old>>mutexWaiterShift = 0 (waiterCount = 0) ,不滿足第二個判斷的條件,因此不會執行 CAS 操作和 awoke = true

接著就是執行 runtime_doSpin() 了,runtime_doSpin() 會進行一些慷訓圈,消耗了一下 CPU 時間,然后就通過 continue 進入到下一次回圈了, (runtime_doSpin具體實作也可以在后面看到)

看到看到,這段代碼不是用來搶鎖的,而是用來等鎖變成 unlock 狀態的,它會空轉一定的次數,期待在空轉的程序中,鎖被其他的協程釋放,

runtime_doSpin()
// src/runtime/lock_sema.go
const active_spin_cnt = 30
//go:linkname sync_runtime_doSpin sync.runtime_doSpin
//go:nosplit
func sync_runtime_doSpin() {
	procyield(active_spin_cnt)
}
# /src/runtime/asm_amd64.s
TEXT runtime·procyield(SB),NOSPLIT,$0-0
	MOVL	cycles+0(FP), AX
again:
	PAUSE
	SUBL	$1, AX
	JNZ	again
	RET

procyield() 會回圈執行 PAUSE 指令,

runtime_canSpin()

runtime_canSpin() 的實作在 src/runtime/proc.go 里面,里面的判斷比較多,但是我們只需要關注 i >= active_spin 這一個判斷就行,

const active_spin     = 4
// Active spinning for sync.Mutex.
//go:linkname sync_runtime_canSpin sync.runtime_canSpin
//go:nosplit
func sync_runtime_canSpin(i int) bool {
	// sync.Mutex is cooperative, so we are conservative with spinning.
	// Spin only few times and only if running on a multicore machine and
	// GOMAXPROCS>1 and there is at least one other running P and local runq is empty.
	// As opposed to runtime mutex we don't do passive spinning here,
	// because there can be work on global runq or on other Ps.
	if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
		return false
	}
	if p := getg().m.p.ptr(); !runqempty(p) {
		return false
	}
	return true
}

一個小插曲

在利用斷點來 debug 時,發現沒辦法 watch sync_runtime_canSpin() 內參考的一些全域變數,例如 active_spin,ncpu,sched.npidle 這些,所以我就大力出奇跡,強行修改原始碼在里面宣告了幾個區域變數,這下可以通過 watch 區域變數來得知全域變數的值了 (機智如我哈哈) ,

func sync_runtime_canSpin(i int) bool {
	local_active_spin := active_spin
	local_ncpu := ncpu
	local_gomaxprocs := gomaxprocs
	npidle := sched.npidle
	nmspinning := sched.nmspinning
	if i >= local_active_spin || local_ncpu <= 1 ||local_gomaxprocs <= int32(npidle+nmspinning)+1 {
		return false
	}
	if p := getg().m.p.ptr(); !runqempty(p) {
		return false
	}
	return true
}
第二步: 根據舊狀態來計算新狀態
new := old
// Don't try to acquire starving mutex, new arriving goroutines must queue.
if old&mutexStarving == 0 {
    new |= mutexLocked
}
if old&(mutexLocked|mutexStarving) != 0 {
    new += 1 << mutexWaiterShift
}
// The current goroutine switches mutex to starvation mode.
// But if the mutex is currently unlocked, don't do the switch.
// Unlock expects that starving mutex has waiters, which will not
// be true in this case.
if starving && old&mutexLocked != 0 {
    new |= mutexStarving
}
if awoke {
    // The goroutine has been woken from sleep,
    // so we need to reset the flag in either case.
    // ...
    new &^= mutexWoken
}

這一段代碼,是根據 old state 來計算 new state,有 4 個操作

  • set lockFlag: new |= mutexLocked
  • 增加 waiterCount: new += 1 << mutexWaiterShift
  • set starvationFlag: new |= mutexStarving
  • clear wokenFlag: new &^= mutexWoken

由于在這里我們只討論 ”Mutex 已經被協程 A 鎖住,協程 B 來拿鎖“ 這種情況,可以分為兩種 case

  • case1: 在第一步自旋的程序中,鎖已經被釋放了,此時 old state = 000000...000 (所有 bit 都為 0) ,經過這四個操作的洗禮后,lockFlag 被設定成了 1,
  • case2: 在第一步自旋結束后,鎖還沒有被釋放,即 old state 此時為 00000000...001 (僅 lockFlag 為 1),經過這四個操作的洗禮后,waiterCounter = 1,lockFlag 也為 1,
第三步: 更新 state (搶鎖)
if atomic.CompareAndSwapInt32(&m.state, old, new) {
    if old&(mutexLocked|mutexStarving) == 0 {
        break // locked the mutex with CAS
    }
    // ...
} else {
    old = m.state
}

這一步會通過 CAS 操作將 mutex.state 更新為我們剛剛計算得到的 new state,如果 CAS 成功,且 old 處于未上鎖的狀態時,就直接利用 break 退出回圈回傳了 (也就是上面的 case1) ,如果 CAS 失敗,將會更新 old state 的值,進行下一次回圈,再重復一二三步;

如果是 case2 的話,情況會稍微復雜一點

if atomic.CompareAndSwapInt32(&m.state, old, new) {
	// ...
    // If we were already waiting before, queue at the front of the queue.
    queueLifo := waitStartTime != 0
    if waitStartTime == 0 {
        waitStartTime = runtime_nanotime()
    }
    
    runtime_SemacquireMutex(&m.sema, queueLifo, 1)
    // ...
}

主要就是通過 runtime_SemacquireMutex() 把自己放進了等待佇列里面,之后 runtime 不會再調度該協程,直到協程被喚醒,

關于 runtime_SemacquireMutex() 的實作,我暫時就不追究下去了,再追究下去就沒完沒了啦,

Mutex 被協程 A 鎖住,協程 B 來搶鎖但失敗被放入等待佇列,此時協程 A 釋放鎖

func (m *Mutex) Unlock() {
	// Fast path: drop lock bit.
	new := atomic.AddInt32(&m.state, -mutexLocked)
	if new != 0 {
		// Outlined slow path to allow inlining the fast path.
		// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
		m.unlockSlow(new)
	}
}

緊接上回,最初 state 的值為 00000000000...0001001 (waiterCount = 1, lockFlag = 1),執行完 AddInt32(&m.state, -mutexLocked) 后,變成了 0000...001000 (waiterCount = 1) ,new 的值也為 0000...001000,接著就進入到 unlockSlow 里面了,

unlockSlow()

看看 unlockSlow() 的全貌

func (m *Mutex) unlockSlow(new int32) {
	if (new+mutexLocked)&mutexLocked == 0 {
		throw("sync: unlock of unlocked mutex")
	}
	if new&mutexStarving == 0 {
		old := new
		for {
			// If there are no waiters or a goroutine has already
			// been woken or grabbed the lock, no need to wake anyone.
			// In starvation mode ownership is directly handed off from unlocking
			// goroutine to the next waiter. We are not part of this chain,
			// since we did not observe mutexStarving when we unlocked the mutex above.
			// So get off the way.
			if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
				return
			}
			// Grab the right to wake someone.
			new = (old - 1<<mutexWaiterShift) | mutexWoken
			if atomic.CompareAndSwapInt32(&m.state, old, new) {
				runtime_Semrelease(&m.sema, false, 1)
				return
			}
			old = m.state
		}
	} else {
		// Starving mode: handoff mutex ownership to the next waiter, and yield
		// our time slice so that the next waiter can start to run immediately.
		// Note: mutexLocked is not set, the waiter will set it after wakeup.
		// But mutex is still considered locked if mutexStarving is set,
		// so new coming goroutines won't acquire it.
		runtime_Semrelease(&m.sema, true, 1)
	}
}

此時 old >> mutexWaiterShift = 0000...0001 ≠ 0, 所以不會直接回傳

old := new
for {
    // If there are no waiters or a goroutine has already
    // been woken or grabbed the lock, no need to wake anyone.
    // In starvation mode ownership is directly handed off from unlocking
    // goroutine to the next waiter. We are not part of this chain,
    // since we did not observe mutexStarving when we unlocked the mutex above.
    // So get off the way.
    if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
        return
    }
    // Grab the right to wake someone.
    new = (old - 1<<mutexWaiterShift) | mutexWoken
    if atomic.CompareAndSwapInt32(&m.state, old, new) {
        runtime_Semrelease(&m.sema, false, 1)
        return
    }
    old = m.state
}

接著計算 new = 0000...1000 - 0000...1000 = 0000...0000,waiterCount 由 1 變成了 0,之后進行 CAS 操作,如果 CAS 成功,則從等待佇列中喚醒一個 goroutine,

Mutex 被協程 A 鎖住,協程 B 來搶鎖但失敗被放入等待佇列,此時協程 A 釋放鎖,協程 B 被喚醒

讓我們會視線切到 lockSlow 的后半截,

const starvationThresholdNs = 1e6
if atomic.CompareAndSwapInt32(&m.state, old, new) {
    // ...
    runtime_SemacquireMutex(&m.sema, queueLifo, 1)
    starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
    old = m.state
	// ...
    iter = 0
}

當協程 B 從 runtime_SemacquireMutex 處醒來后,會根據該協程的等待的時間來判斷是否饑餓了,這里我們先假設此時還沒有饑餓,后面會詳細討論饑餓時的情況,之后會將 iter 重置為 0,接著就進行下一次的回圈了,由于 iter 已經被重置為 0 了,所以在下一次回圈時,sync_runtime_doSpin(iter) 會回傳 true

由于此時 state 已經變成了 0 了,所以在下一次回圈里可以暢通無阻的拿到鎖,

饑餓情況下的解鎖行為: starvationFlag 的作用

設想這樣一種情況:goroutine A 拿到鎖,goroutine B 搶鎖失敗,被放入等待佇列,goroutine A 釋放鎖,goroutine B 被喚醒,但是正當它搶鎖時,鎖被新來的 goroutine C 搶走了... 連續好幾次,每當 goroutine B 要搶鎖時,鎖都被其他協程搶先一步拿走,直到某一次,goroutine B 再次被喚醒后執行

starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs

它就進入饑餓模式 (starvation mode) 啦!

// The current goroutine switches mutex to starvation mode.
// But if the mutex is currently unlocked, don't do the switch.
// Unlock expects that starving mutex has waiters, which will not
// be true in this case.
if starving && old&mutexLocked != 0 {
    new |= mutexStarving
}

之后通過 CAS 操作將饑餓標志設定到了 mutex.state 里面,然后它就又被放到等待佇列中了,

atomic.CompareAndSwapInt32(&m.state, old, new)
Unlock()

視角切換到 Unlock() 這一邊

func (m *Mutex) Unlock() {
	// ...
	// Fast path: drop lock bit.
	new := atomic.AddInt32(&m.state, -mutexLocked)
	if new != 0 {
		// Outlined slow path to allow inlining the fast path.
		// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
		m.unlockSlow(new)
	}
}

func (m *Mutex) unlockSlow(new int32) {
	// ...
	if new&mutexStarving == 0 {
		// ...
		for {
			// ...
			if atomic.CompareAndSwapInt32(&m.state, old, new) {
				runtime_Semrelease(&m.sema, false, 1)
				return
			}
            // ...
		}
	} else {
		// Starving mode: handoff mutex ownership to the next waiter, and yield
		// our time slice so that the next waiter can start to run immediately.
		// Note: mutexLocked is not set, the waiter will set it after wakeup.
		// But mutex is still considered locked if mutexStarving is set,
		// so new coming goroutines won't acquire it.
		runtime_Semrelease(&m.sema, true, 1)
	}
}

unlockSlow() 中,此時 new&mutexStarving != 0,所以會直接進入到 else 分支內,呼叫 runtime_Semrelease() 方法,但要注意 else 分支內 runtime_Semrelease() 的引數和 if 分支的引數不一樣,在這里 runtime_Semrelease(&m.sema, true, 1) 起到的作用是喚醒了等待佇列中的第一個協程并立馬調度該協程 (runtime_Semrelease() 方法的詳解在后面 ),

同時正如注釋所說,在 Unlock() 中由于進行了 atomic.AddInt32(&m.state, -mutexLocked) 操作,所以 mutex.state 的 lockFlag 是為 0 的,但是沒關系,starvationFlag 是為 1 的,所以會依舊被認為是鎖住的狀態,

Lock()
func (m *Mutex) Lock() {
	// ...
	m.lockSlow()
}

func (m *Mutex) lockSlow() {
	// ...
	for {
		// ...
		if atomic.CompareAndSwapInt32(&m.state, old, new) {
			// ...
			runtime_SemacquireMutex(&m.sema, queueLifo, 1)
			// ...
			old = m.state
			if old&mutexStarving != 0 {
				// If this goroutine was woken and mutex is in starvation mode,
				// ownership was handed off to us but mutex is in somewhat
				// inconsistent state: mutexLocked is not set and we are still
				// accounted as waiter. Fix that.
				// ...
				delta := int32(mutexLocked - 1<<mutexWaiterShift)
				if !starving || old>>mutexWaiterShift == 1 {
					// Exit starvation mode.
					// Critical to do it here and consider wait time.
					// Starvation mode is so inefficient, that two goroutines
					// can go lock-step infinitely once they switch mutex
					// to starvation mode.
					delta -= mutexStarving
				}
				atomic.AddInt32(&m.state, delta)
				break
			}
			awoke = true
			iter = 0
		} else {
			old = m.state
		}
	}
	// ...
}

視角再次切換到 Lock() 這邊,饑餓的 goroutine 被喚醒并調度后,首先執行 old = m.state, 此時 old 的 starvationFlag = 1,

之后就正如注釋所說,它會嘗試修復 mutex.state 的"不一致" (inconsistent) 狀態,

修復作業主要做了三件事情:

  1. 在 starvation mode 下的 Unlock() 沒有將 waitterCount - 1, 所以這里需要給 mutexWaiter 減 1

  2. 將 state 的 locked flag 置為 1

  3. 如果該 goroutine 沒有饑餓或者是等待佇列中的最后一個 goroutine 的話,清理 starvationFlag

這三件事情通過 atomic.AddInt32(&m.state, delta) 一步到位,

runtime_Semrelease()
// Semrelease atomically increments *s and notifies a waiting goroutine
// if one is blocked in Semacquire.
// It is intended as a simple wakeup primitive for use by the synchronization
// library and should not be used directly.
// If handoff is true, pass count directly to the first waiter.
// skipframes is the number of frames to omit during tracing, counting from
// runtime_Semrelease's caller.
func runtime_Semrelease(s *uint32, handoff bool, skipframes int)

handoff 就是傳球的意思,handoff 為 false 時,僅僅喚醒等待佇列中第一個協程,但是不會立馬調度該協程;當 handoff 為 true 時,會立馬調度被喚醒的協程,此外,當 handoff = true 時,被喚醒的協程會繼承當前協程的時間片,具體例子,假設每個 goroutine 的時間片為 2ms,gorounte A 已經執行了 1ms,假設它通過 runtime_Semrelease(handoff = true) 喚醒了 goroutine B,則 goroutine B 剩余的時間片為 2 - 1 = 1ms,

饑餓模式下新來的 goroutine 的加鎖行為: starvationFlag 的作用

如果在饑餓模式下,有新的 goroutine 來請求鎖,它會執行下面這些步驟

func (m *Mutex) lockSlow() {
    // ...
	old := m.state
	for {
		// Don't spin in starvation mode, ownership is handed off to waiters
		// so we won't be able to acquire the mutex anyway.
		if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
			// ...
			runtime_doSpin()
		}
		new := old
		// Don't try to acquire starving mutex, new arriving goroutines must queue.
		if old&mutexStarving == 0 {
			new |= mutexLocked
		}
		if old&(mutexLocked|mutexStarving) != 0 {
			new += 1 << mutexWaiterShift
		}
		// ...
		if atomic.CompareAndSwapInt32(&m.state, old, new) {
			// ..
			runtime_SemacquireMutex(&m.sema, queueLifo, 1)
			// ...
		} else {
			// ...
		}
	}
	// ...
}

由于 old&(mutexLocked|mutexStarving) != mutexLocked ,所以它不會自旋,

由于 old&mutexStarving != 0,所以它不會 set lockFlag,

由于 old&(mutexLocked|mutexStarving) != 0,所以它 增加 waiterCount,

可以看到,它實際上就做了增加 waiterCount 這一個操作,之后通過 CAS 更新 state 的狀態,更新完成之后就跑去等待佇列睡覺去了,

因此在饑餓狀態下,新的來爭搶鎖的 goroutine 是不會去搶鎖 (set lockFlag) 的,它們只會登記一下 (waiterCount + 1) ,然后乖乖加入到等待佇列里面,

當有協程正在自旋時的解鎖行為: wokenFlag 的作用

wokenFlag 是在 lockSlow() 里面被設定的,wokenFlag 為 1 時,表示此時有協程正在進行自旋,

func (m *Mutex) lockSlow() {
	starving := false
	awoke := false
	iter := 0
	old := m.state
	for {
		// Don't spin in starvation mode, ownership is handed off to waiters
		// so we won't be able to acquire the mutex anyway.
		if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
			// Active spinning makes sense.
			// Try to set mutexwokenFlag to inform Unlock
			// to not wake other blocked goroutines.
			if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
				atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
				awoke = true
			}
			runtime_doSpin()
			iter++
			old = m.state
			continue
		}
		// ...
		if atomic.CompareAndSwapInt32(&m.state, old, new) {
			// ...
			runtime_SemacquireMutex(&m.sema, queueLifo, 1)
			// ...
			awoke = true
			iter = 0
		} 
        // ...
	}
    // ...
}

當一個新來的協程 (從未被放到等待佇列中) 在第一次自旋時,wokenFlag 的設定邏輯為:

if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
	atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
    awoke = true
}

但是當協程從等待佇列中被喚醒后自旋時,卻 lockSlow()找不到設定 wokenFlag 的邏輯,為何?因為這段邏輯被放到了 unlockSlow 里面了,

視線切換到 unlockSlow() 那一邊

func (m *Mutex) unlockSlow(new int32) {
	// ...
	if new&mutexStarving == 0 {
		old := new
		for {
			// If there are no waiters or a goroutine has already
			// been woken or grabbed the lock, no need to wake anyone.
			// In starvation mode ownership is directly handed off from unlocking
			// goroutine to the next waiter. We are not part of this chain,
			// since we did not observe mutexStarving when we unlocked the mutex above.
			// So get off the way.
			if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                // 當 mutexwokenFlag 被設定時,會直接 return
                // 不會去等待佇列喚醒 goroutine
				return
			}
			// Grab the right to wake someone.
            // 這個地方會設定 wokenFlag 哦
			new = (old - 1<<mutexWaiterShift) | mutexWoken
			if atomic.CompareAndSwapInt32(&m.state, old, new) {
				runtime_Semrelease(&m.sema, false, 1)
				return
			}
			old = m.state
		}
	} else {
		// ...
	}
}

可以看到,當有協程正在自旋時 (wokenFlag = 1) ,不會從等待佇列喚醒協程,這樣就避免了等待佇列上的協程加入競爭,當然,正在自旋中的協程之間彼此之間還是會競爭的;如果 wokenFlag = 0,則會從等待佇列中喚醒一個協程,在喚醒之前會將 wokenFlag 設定為 1,這樣協程被喚醒后就不用再去設定 wokenFlag 了,妙呀!

為什么當有協程在自旋時,不要去等待佇列中喚醒協程呢?協程從被喚醒到被調度 (在 CPU 上面執行) 是要花時間的,等真正自旋時 mutex 早就被搶走了,

協程從等待佇列被喚醒后如果還是沒有搶到鎖,會被放到佇列首部還是尾部?

但是是頭部,代碼如下:

// If we were already waiting before, queue at the front of the queue.
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
    waitStartTime = runtime_nanotime()
}
runtime_SemacquireMutex(&m.sema, queueLifo, 1)

復雜情景分析

基于上面的邏輯來分析一下復雜的邏輯吧!

假設有協程 g1,g2,g3,g4,g5,g6, 共同爭搶一把鎖 m

一開始 g1 拿到鎖

owner: g1 waitqueue: null

g2 開始搶鎖,沒有搶到,被放到等待佇列

owner: g1 waitqueue: g2

g1 釋放鎖,g2 從等待佇列中被喚醒

owner: null waitqueue: null

此時 g3 也開始搶鎖,g2 沒有搶過,又被放回等待佇列

owner: g3 waitqueue: g2

g4 開始搶鎖,沒有搶到,被放到等待佇列

owner: g3 waitqueue: g2, g4

g3 釋放鎖,g2 被喚醒

owner: null waitqueue: g4

此時 g5 開始搶鎖,g2 沒有搶過,又被放回等待佇列首部

owner: g5 waitqueue: g2, g4

g6 開始搶鎖,正在自旋中

owner: g5 waitqueue: g2, g4 wokenFlag: 1 spinning: g6

g5 釋放鎖,由于此時有協程正在自旋,因此不會去等待佇列中喚醒協程,鎖被 g6 輕松搶到

owner: g6 waitqueue: g2, g4 wokenFlag: 0 spinning: null

g6 釋放鎖,g2 被喚醒,此時 g7 開始搶鎖,g2 沒有搶過,又被放回等待佇列首部,但是 g2 由于太久沒有搶到鎖,進入饑餓模式了

owner: g7 waitqueue: g2(饑餓), g4 starvationFlag: 1

g8 來搶鎖,由于處于饑餓狀態,g8 會被直接放在等待佇列尾部

owner: g7 waitqueue: g2(饑餓), g4, g8 starvationFlag: 1

g7 釋放鎖,由于處于饑餓狀態,會直接喚醒 g2 并調度它

owner: g2 waitqueue: g4, g8 starvationFlag: 1

g2 執行完畢,釋放鎖,注意此刻依舊是饑餓狀態,直接調度 g4,g4 蘇醒后,發現它自己沒有饑餓,于是 clear starvationFlag

owner: g4 waitqueue: g8 starvationFlag: 0

此時新來的 g8 可以正常加入到對鎖的爭搶中了,之后就是正常的加鎖解鎖邏輯了,

一點小瑕疵: 一種很邊緣的 starvation case

由于等待佇列中的協程只有當被喚醒之后才會根據等待時間來判斷是否進入 starvation mode,因此會存在一個協程在等待佇列中等待了很久,它實際上已經饑餓了,但是一直沒被喚醒過,就沒機會 set starvationFlag,這就會導致饑餓現象的發生,

那么會存在等待佇列里的協程一直不被喚醒的情況么?

有的!在 unlockSlow() 時如果 wokenFlag = 1,那就不會去喚醒等待佇列中的執行緒,就會存在這樣一種情況,假設每次 Unlock() 時恰好有一個新來的協程在自旋,那等待佇列中的協程就會永遠饑餓下去!

reference

Tail Latency Might Matter More Than You Think

Golang 互斥鎖內部實作

轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/456146.html

標籤:其他

上一篇:做仿牛客社區專案的搭建環境中,下載的合集包里面少個AOP

下一篇:c++異步回呼函式參考傳遞空指標例外

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 【C++】Microsoft C++、C 和匯編程式檔案

    ......

    uj5u.com 2020-09-10 00:57:23 more
  • 例外宣告

    相比于斷言適用于排除邏輯上不可能存在的狀態,例外通常是用于邏輯上可能發生的錯誤。 例外宣告 Item 1:當函式不可能拋出例外或不能接受拋出例外時,使用noexcept 理由 如果不打算拋出例外的話,程式就會認為無法處理這種錯誤,并且應當盡早終止,如此可以有效地阻止例外的傳播與擴散。 示例 //不可 ......

    uj5u.com 2020-09-10 00:57:27 more
  • Codeforces 1400E Clear the Multiset(貪心 + 分治)

    鏈接:https://codeforces.com/problemset/problem/1400/E 來源:Codeforces 思路:給你一個陣列,現在你可以進行兩種操作,操作1:將一段沒有 0 的區間進行減一的操作,操作2:將 i 位置上的元素歸零。最終問:將這個陣列的全部元素歸零后操作的最少 ......

    uj5u.com 2020-09-10 00:57:30 more
  • UVA11610 【Reverse Prime】

    本人看到此題沒有翻譯,就附帶了一個自己的翻譯版本 思考 這一題,它的第一個要求是找出所有 $7$ 位反向質數及其質因數的個數。 我們應該需要質數篩篩選1~$10^{7}$的所有數,這里就不慢慢介紹了。但是,重讀題,我們突然發現反向質數都是 $7$ 位,而將它反過來后的數字卻是 $6$ 位數,這就說明 ......

    uj5u.com 2020-09-10 00:57:36 more
  • 統計區間素數數量

    1 #pragma GCC optimize(2) 2 #include <bits/stdc++.h> 3 using namespace std; 4 bool isprime[1000000010]; 5 vector<int> prime; 6 inline int getlist(int ......

    uj5u.com 2020-09-10 00:57:47 more
  • C/C++編程筆記:C++中的 const 變數詳解,教你正確認識const用法

    1、C中的const 1、區域const變數存放在堆疊區中,會分配記憶體(也就是說可以通過地址間接修改變數的值)。測驗代碼如下: 運行結果: 2、全域const變數存放在只讀資料段(不能通過地址修改,會發生寫入錯誤), 默認為外部聯編,可以給其他源檔案使用(需要用extern關鍵字修飾) 運行結果: ......

    uj5u.com 2020-09-10 00:58:04 more
  • 【C++犯錯記錄】VS2019 MFC添加資源不懂如何修改資源宏ID

    1. 首先在資源視圖中,添加資源 2. 點擊新添加的資源,復制自動生成的ID 3. 在解決方案資源管理器中找到Resource.h檔案,編輯,使用整個專案搜索和替換的方式快速替換 宏宣告 4. Ctrl+Shift+F 全域搜索,點擊查找全部,然后逐個替換 5. 為什么使用搜索替換而不使用屬性視窗直 ......

    uj5u.com 2020-09-10 00:59:11 more
  • 【C++犯錯記錄】VS2019 MFC不懂的批量添加資源

    1. 打開資源頭檔案Resource.h,在其中預先定義好宏 ID(不清楚其實ID值應該設定多少,可以先新建一個相同的資源項,再在這個資源的ID值的基礎上遞增即可) 2. 在資源視圖中選中專案資源,按F7編輯資源檔案,按 ID 型別 相對路徑的形式添加 資源。(別忘了先把檔案拷貝到專案中的res檔案 ......

    uj5u.com 2020-09-10 01:00:19 more
  • C/C++編程筆記:關于C++的參考型別,專供新手入門使用

    今天要講的是C++中我最喜歡的一個用法——參考,也叫別名。 參考就是給一個變數名取一個變數名,方便我們間接地使用這個變數。我們可以給一個變數創建N個參考,這N + 1個變數共享了同一塊記憶體區域。(參考型別的變數會占用記憶體空間,占用的記憶體空間的大小和指標型別的大小是相同的。雖然參考是一個物件的別名,但 ......

    uj5u.com 2020-09-10 01:00:22 more
  • 【C/C++編程筆記】從頭開始學習C ++:初學者完整指南

    眾所周知,C ++的學習曲線陡峭,但是花時間學習這種語言將為您的職業帶來奇跡,并使您與其他開發人員區分開。您會更輕松地學習新語言,形成真正的解決問題的技能,并在編程的基礎上打下堅實的基礎。 C ++將幫助您養成良好的編程習慣(即清晰一致的編碼風格,在撰寫代碼時注釋代碼,并限制類內部的可見性),并且由 ......

    uj5u.com 2020-09-10 01:00:41 more
最新发布
  • Rust中的智能指標:Box<T> Rc<T> Arc<T> Cell<T> RefCell<T> Weak

    Rust中的智能指標是什么 智能指標(smart pointers)是一類資料結構,是擁有資料所有權和額外功能的指標。是指標的進一步發展 指標(pointer)是一個包含記憶體地址的變數的通用概念。這個地址參考,或 ” 指向”(points at)一些其 他資料 。參考以 & 符號為標志并借用了他們所 ......

    uj5u.com 2023-04-20 07:24:10 more
  • Java的值傳遞和參考傳遞

    值傳遞不會改變本身,參考傳遞(如果傳遞的值需要實體化到堆里)如果發生修改了會改變本身。 1.基本資料型別都是值傳遞 package com.example.basic; public class Test { public static void main(String[] args) { int ......

    uj5u.com 2023-04-20 07:24:04 more
  • [2]SpinalHDL教程——Scala簡單入門

    第一個 Scala 程式 shell里面輸入 $ scala scala> 1 + 1 res0: Int = 2 scala> println("Hello World!") Hello World! 檔案形式 object HelloWorld { /* 這是我的第一個 Scala 程式 * 以 ......

    uj5u.com 2023-04-20 07:23:58 more
  • 理解函式指標和回呼函式

    理解 函式指標 指向函式的指標。比如: 理解函式指標的偽代碼 void (*p)(int type, char *data); // 定義一個函式指標p void func(int type, char *data); // 宣告一個函式func p = func; // 將指標p指向函式func ......

    uj5u.com 2023-04-20 07:23:52 more
  • Django筆記二十五之資料庫函式之日期函式

    本文首發于公眾號:Hunter后端 原文鏈接:Django筆記二十五之資料庫函式之日期函式 日期函式主要介紹兩個大類,Extract() 和 Trunc() Extract() 函式作用是提取日期,比如我們可以提取一個日期欄位的年份,月份,日等資料 Trunc() 的作用則是截取,比如 2022-0 ......

    uj5u.com 2023-04-20 07:23:45 more
  • 一天吃透JVM面試八股文

    什么是JVM? JVM,全稱Java Virtual Machine(Java虛擬機),是通過在實際的計算機上仿真模擬各種計算機功能來實作的。由一套位元組碼指令集、一組暫存器、一個堆疊、一個垃圾回收堆和一個存盤方法域等組成。JVM屏蔽了與作業系統平臺相關的資訊,使得Java程式只需要生成在Java虛擬機 ......

    uj5u.com 2023-04-20 07:23:31 more
  • 使用Java接入小程式訂閱訊息!

    更新完微信服務號的模板訊息之后,我又趕緊把微信小程式的訂閱訊息給實作了!之前我一直以為微信小程式也是要企業才能申請,沒想到小程式個人就能申請。 訊息推送平臺🔥推送下發【郵件】【短信】【微信服務號】【微信小程式】【企業微信】【釘釘】等訊息型別。 https://gitee.com/zhongfuch ......

    uj5u.com 2023-04-20 07:22:59 more
  • java -- 緩沖流、轉換流、序列化流

    緩沖流 緩沖流, 也叫高效流, 按照資料型別分類: 位元組緩沖流:BufferedInputStream,BufferedOutputStream 字符緩沖流:BufferedReader,BufferedWriter 緩沖流的基本原理,是在創建流物件時,會創建一個內置的默認大小的緩沖區陣列,通過緩沖 ......

    uj5u.com 2023-04-20 07:22:49 more
  • Java-SpringBoot-Range請求頭設定實作視頻分段傳輸

    老實說,人太懶了,現在基本都不喜歡寫筆記了,但是網上有關Range請求頭的文章都太水了 下面是抄的一段StackOverflow的代碼...自己大修改過的,寫的注釋挺全的,應該直接看得懂,就不解釋了 寫的不好...只是希望能給視頻網站開發的新手一點點幫助吧. 業務場景:視頻分段傳輸、視頻多段傳輸(理 ......

    uj5u.com 2023-04-20 07:22:42 more
  • Windows 10開發教程_編程入門自學教程_菜鳥教程-免費教程分享

    教程簡介 Windows 10開發入門教程 - 從簡單的步驟了解Windows 10開發,從基本到高級概念,包括簡介,UWP,第一個應用程式,商店,XAML控制元件,資料系結,XAML性能,自適應設計,自適應UI,自適應代碼,檔案管理,SQLite資料庫,應用程式到應用程式通信,應用程式本地化,應用程式 ......

    uj5u.com 2023-04-20 07:22:35 more