文章目錄
- 一、Go語言并發的基礎元素
- 1.goroutine
- 2.channel
- 3.sync 包的同步原語
- 4.多并發控制神器:Context
- 二、常見并發模式Go語言實作
- 1.for select 回圈模式
- 無限回圈(監控狗)
- 有限回圈(for range select )
- 2.select timeout 模式
- 3.流水線模式(Pipeline)
- 4.扇出和扇入模式
- 5.未來模式(Futures)
一、Go語言并發的基礎元素
goroutine、channel、sync 是并發編程中必不可少的元素,
context標準包更是為我們并發編程提供了更好的支持,
1.goroutine
2.channel
常用結構:channel+select
select {
case i1 = <-c1:
//todo
case c2 <- i2:
//todo
default:
// default todo
}
3.sync 包的同步原語
sync.Mutex 互斥鎖
sync.RWMutex 讀寫鎖
sync.WaitGroup
sync.Once
sync.Cond
4.多并發控制神器:Context
二、常見并發模式Go語言實作
并發模式和設計模式很相似,都是對現實場景的抽象封裝,以便提供一個統一的解決方案,
但和設計模式不同的是,并發模式更專注于異步和并發,
我們會在很多專案的源代碼中一遍遍的看到下面提到的并發模式,雖然解決的問題不一樣,但它們的思路是相似的,所以我們也可以把它們進一步抽象,這樣在專案開發中就可以直接復用,
并發模式不限于下面列舉的這些,在專案中和并發、異步有關并且可以被抽象復用的解決方案都可以總結為并發模式,
1.for select 回圈模式
for select 回圈模式非常常見,它一般和 channel 組合完成任務,代碼格式如下:
for { //for無限回圈,或者for range回圈
select {
//通過一個channel控制
}
}
For select有兩種回圈模式:有限回圈和無限回圈,
無限回圈(監控狗)
//default 陳述句中執行任務,
//done channel 接收關閉通知,
for {
select {
case <-done:
return
default:
//執行具體的任務
}
}
有限回圈(for range select )
一般用于把可以迭代的內容發送到 channel 上:
//done channel接收關閉通知,
//resultCh channel 用于接收 for range 回圈的值,這些值通過 resultCh 可以傳送給其他的呼叫者,
for _,s:=range []int{}{
select {
case <-done:
return
case resultCh <- s:
}
}
2.select timeout 模式
獲取資料遇到超時時,我們不可能一直等待,所以需要設定一個超時時間,如下所示
func main() {
result := make(chan string)
go func() {
//模擬網路訪問
time.Sleep(8 * time.Second)
result <- "服務端結果"
}()
select {
case v := <-result:
fmt.Println(v)
case <-time.After(5 * time.Second):
fmt.Println("網路訪問超時了")
}
}
select timeout 模式的核心在于通過 time.After 函式設定一個超時時間,防止因為例外造成 select 陳述句的無限等待,
小提示:如果可以使用 Context 的 WithTimeout 函式超時取消,要優先使用,
3.流水線模式(Pipeline)
以組裝手機為例,假設一條組裝手機的流水線有 3 道工序,分別是配件采購、配件組裝、打包成品,如圖所示:

從以上示意圖中可以看到,采購的配件通過 channel 傳遞給工序 2 進行組裝,然后再通過 channel 傳遞給工序 3 打包成品,相對工序 2 來說,工序 1 是生產者,工序 3 是消費者,相對工序 1 來說,工序 2 是消費者,相對工序 3 來說,工序 2 是生產者,
我用下面的幾組代碼進行演示:
//工序1采購
func buy(n int) <-chan string {
out := make(chan string)
go func() {
defer close(out)
for i := 1; i <= n; i++ {
out <- fmt.Sprint("配件", i)
}
}()
return out
}
首先我們定義一個采購函式 buy,它有一個引數 n,可以設定要采購多少套配件,采購代碼的實作邏輯是通過 for 回圈產生配件,然后放到 channel 型別的變數 out 里,最后回傳這個 out,呼叫者就可以從 out 中獲得配件,
//工序2組裝
func build(in <-chan string) <-chan string {
out := make(chan string)
go func() {
defer close(out)
for c := range in {
out <- "組裝(" + c + ")"
}
}()
return out
}
組裝函式 build 有一個 channel 型別的引數 in,用于接收配件進行組裝,組裝后的手機放到 channel 型別的變數 out 中回傳,
有了組裝好的手機,就可以放在精美的包裝盒中售賣了,而包裝的操作是工序 3 完成的,對應的函式是 pack,如下所示:
//工序3打包
func pack(in <-chan string) <-chan string {
out := make(chan string)
go func() {
defer close(out)
for c := range in {
out <- "打包(" + c + ")"
}
}()
return out
}
函式 pack 的代碼實作和組裝函式 build 基本相同
流水線上的三道工序都完成后,就可以通過一個組織者把三道工序組織在一起,形成一條完整的手機組裝流水線,這個組織者可以是我們常用的 main 函式,如下面的代碼所示:
func main() {
coms := buy(10) //采購10套配件
phones := build(coms) //組裝10部手機
packs := pack(phones) //打包它們以便售賣
//輸出測驗,看看效果
for p := range packs {
fmt.Println(p)
}
}
從上述例子中,我們可以總結出一個流水線模式的構成:
- 流水線由一道道工序構成,每道工序通過 channel 把資料傳遞到下一個工序;
- 每道工序一般會對應一個函式,函式里有協程和 channel,協程一般用于處理資料并把它放入 channel 中,整個函式會回傳這個 channel 以供下一道工序使用;
- 最終要有一個組織者(示例中的 main 函式)把這些工序串起來,這樣就形成了一個完整的流水線,對于資料來說就是資料流,
4.扇出和扇入模式
假如上面的流水線出現了問題,其中的工序2特別慢,
為了提升產能,需要對工序 2 增加兩班人手,如圖便形成了扇入扇出:

什么是扇入扇出:
1.以工序 1 為中點,三條傳遞資料的線發散出去,就像一把打開的扇子一樣,所以叫扇出;
2.以 merge 組件為中點,三條傳遞資料的線匯聚到 merge 組件,也像一把打開的扇子一樣,所以叫扇入
小提示:扇出和扇入都像一把打開的扇子,因為資料傳遞的方向不同,所以叫法也不一樣,扇出的資料流向是發散傳遞出去,是輸出流;扇入的資料流向是匯聚進來,是輸入流,
三道工序的實作函式 buy、build、pack 都保持不變,只需要增加一個 merge 函式即可:
//扇入函式(組件),把多個chanel中的資料發送到一個channel中
func merge(ins ...<-chan string) <-chan string {
var wg sync.WaitGroup
out := make(chan string)
//把一個channel中的資料發送到out中
p:=func(in <-chan string) {
defer wg.Done()
for c := range in {
out <- c
}
}
wg.Add(len(ins))
//扇入,需要啟動多個goroutine用于處于多個channel中的資料
for _,cs:=range ins{
go p(cs)
}
//等待所有輸入的資料ins處理完,再關閉輸出out
go func() {
wg.Wait()
close(out)
}()
return out
}
新增的 merge 函式的核心邏輯就是對輸入的每個 channel 使用單獨的協程處理,并將每個協程處理的結果都發送到變數 out 中,達到扇入的目的,總結起來就是通過多個協程并發,把多個 channel 合成一個,
在整條手機組裝流水線中,merge 函式非常小,而且和業務無關,不能當作一道工序,所以我把它叫作組件,該 merge 組件是可以復用的,流水線中的任何工序需要扇入的時候,都可以使用 merge 組件,
小提示:這次的改造新增了 merge 函式,其他函式保持不變,符合開閉原則,開閉原則規定“軟體中的物件(類,模塊,函式等等)應該對于擴展是開放的,但是對于修改是封閉的”,
有了可以復用的 merge 組件,現在來看流水線的組織者 main 函式是如何使用扇出和扇入并發模式的,如下所示:
func main() {
coms := buy(100) //采購100套配件
//三班人同時組裝100部手機
phones1 := build(coms)
phones2 := build(coms)
phones3 := build(coms)
//匯聚三個channel成一個
phones := merge(phones1,phones2,phones3)
packs := pack(phones) //打包它們以便售賣
//輸出測驗,看看效果
for p := range packs {
fmt.Println(p)
}
}
這個示例采購了 100 套配件,也就是開始增加產能了,于是同時呼叫三次 build 函式,也就是為工序 2 增加人手,這里是三班人手同時組裝配件,然后通過 merge 函式這個可復用的組件將三個 channel 匯聚為一個,然后傳給 pack 函式打包,
這樣通過扇出和扇入模式,整條流水線就被擴充好了,大大提升了生產效率,因為已經有了通用的扇入組件 merge,所以整條流水中任何需要扇出、扇入提高性能的工序,都可以復用 merge 組件做扇入,并且不用做任何修改,
5.未來模式(Futures)
流水線模式中的工序是相互依賴的,上一道工序做完,下一道工序才能開始,但是在我們的實際需求中,也有大量的任務之間相互獨立、沒有依賴,所以為了提高性能,這些獨立的任務就可以并發執行,
主協程不用等待子協程回傳的結果,可以先去做其他事情,等未來需要子協程結果的時候再來取,如果子協程還沒有回傳結果,就一直等待,
//洗菜
func washVegetables() <-chan string {
vegetables := make(chan string)
go func() {
time.Sleep(5 * time.Second)
vegetables <- "洗好的菜"
}()
return vegetables
}
//燒水
func boilWater() <-chan string {
water := make(chan string)
go func() {
time.Sleep(5 * time.Second)
water <- "燒開的水"
}()
return water
}
洗菜和燒水這兩個相互獨立的任務可以一起做,所以示例中通過開啟協程的方式,實作同時做的功能,當任務完成后,結果會通過 channel 回傳,
小提示:示例中的等待 5 秒用來描述洗菜和啥訓的耗時,
在啟動兩個子協程同時去洗菜和燒水的時候,主協程就可以去干點其他事情(示例中是瞇一會),等睡醒了,要做火鍋的時候,就需要洗好的菜和燒好的水這兩個結果了,我用下面的代碼進行演示:
func main() {
vegetablesCh := washVegetables() //洗菜
waterCh := boilWater() //燒水
fmt.Println("已經安排洗菜和燒水了,我先瞇一會")
time.Sleep(2 * time.Second)
fmt.Println("要做火鍋了,看看菜和水好了嗎")
vegetables := <-vegetablesCh
water := <-waterCh
fmt.Println("準備好了,可以做火鍋了:",vegetables,water)
}
Futures 模式下的協程和普通協程最大的區別是可以回傳結果,而這個結果會在未來的某個時間點使用,所以在未來獲取這個結果的操作必須是一個阻塞的操作,要一直等到獲取結果為止,
如果你的大任務可以拆解為一個個獨立并發執行的小任務,并且可以通過這些小任務的結果得出最終大任務的結果,就可以使用 Futures 模式,
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/249052.html
標籤:其他
