在微服務中開發中,api網關扮演對外提供restful api的角色,而api的資料往往會依賴其他服務,復雜的api更是會依賴多個甚至數十個服務,雖然單個被依賴服務的耗時一般都比較低,但如果多個服務串行依賴的話那么整個api的耗時將會大大增加,
那么通過什么手段來優化呢?我們首先想到的是通過并發來的方式來處理依賴,這樣就能降低整個依賴的耗時,Go基礎庫中為我們提供了 WaitGroup 工具用來進行并發控制,但實際業務場景中多個依賴如果有一個出錯我們期望能立即回傳而不是等所有依賴都執行完再回傳結果,而且WaitGroup中對變數的賦值往往需要加鎖,每個依賴函式都需要添加Add和Done對于新手來說比較容易出錯
基于以上的背景,go-zero框架中為我們提供了并發處理工具MapReduce,該工具開箱即用,不需要做什么初始化,我們通過下圖看下使用MapReduce和沒使用的耗時對比:

相同的依賴,串行處理的話需要200ms,使用MapReduce后的耗時等于所有依賴中最大的耗時為100ms,可見MapReduce可以大大降低服務耗時,而且隨著依賴的增加效果就會越明顯,減少處理耗時的同時并不會增加服務器壓力
并發處理工具MapReduce
MapReduce是Google提出的一個軟體架構,用于大規模資料集的并行運算,go-zero中的MapReduce工具正是借鑒了這種架構思想
go-zero框架中的MapReduce工具主要用來對批量資料進行并發的處理,以此來提升服務的性能

我們通過幾個示例來演示MapReduce的用法
MapReduce主要有三個引數,第一個引數為generate用以生產資料,第二個引數為mapper用以對資料進行處理,第三個引數為reducer用以對mapper后的資料做聚合回傳,還可以通過opts選項設定并發處理的執行緒數量
場景一: 某些功能的結果往往需要依賴多個服務,比如商品詳情的結果往往會依賴用戶服務、庫存服務、訂單服務等等,一般被依賴的服務都是以rpc的形式對外提供,為了降低依賴的耗時我們往往需要對依賴做并行處理
func productDetail(uid, pid int64) (*ProductDetail, error) {
var pd ProductDetail
err := mr.Finish(func() (err error) {
pd.User, err = userRpc.User(uid)
return
}, func() (err error) {
pd.Store, err = storeRpc.Store(pid)
return
}, func() (err error) {
pd.Order, err = orderRpc.Order(pid)
return
})
if err != nil {
log.Printf("product detail error: %v", err)
return nil, err
}
return &pd, nil
}
該示例中回傳商品詳情依賴了多個服務獲取資料,因此做并發的依賴處理,對介面的性能有很大的提升
場景二: 很多時候我們需要對一批資料進行處理,比如對一批用戶id,效驗每個用戶的合法性并且效驗程序中有一個出錯就認為效驗失敗,回傳的結果為效驗合法的用戶id
func checkLegal(uids []int64) ([]int64, error) {
r, err := mr.MapReduce(func(source chan<- interface{}) {
for _, uid := range uids {
source <- uid
}
}, func(item interface{}, writer mr.Writer, cancel func(error)) {
uid := item.(int64)
ok, err := check(uid)
if err != nil {
cancel(err)
}
if ok {
writer.Write(uid)
}
}, func(pipe <-chan interface{}, writer mr.Writer, cancel func(error)) {
var uids []int64
for p := range pipe {
uids = append(uids, p.(int64))
}
writer.Write(uids)
})
if err != nil {
log.Printf("check error: %v", err)
return nil, err
}
return r.([]int64), nil
}
func check(uid int64) (bool, error) {
// do something check user legal
return true, nil
}
該示例中,如果check程序出現錯誤則通過cancel方法結束效驗程序,并回傳error整個效驗程序結束,如果某個uid效驗結果為false則最終結果不回傳該uid
MapReduce使用注意事項
- mapper和reducer中都可以呼叫cancel,引數為error,呼叫后立即回傳,回傳結果為nil, error
- mapper中如果不呼叫writer.Write則item最終不會被reducer聚合
- reducer中如果不呼叫writer.Wirte則回傳結果為nil, ErrReduceNoOutput
- reducer為單執行緒,所有mapper出來的結果在這里串行聚合
實作原理分析:
MapReduce中首先通過buildSource方法通過執行generate(引數為無緩沖channel)產生資料,并回傳無緩沖的channel,mapper會從該channel中讀取資料
func buildSource(generate GenerateFunc) chan interface{} {
source := make(chan interface{})
go func() {
defer close(source)
generate(source)
}()
return source
}
在MapReduceWithSource方法中定義了cancel方法,mapper和reducer中都可以呼叫該方法,呼叫后主執行緒收到close信號會立馬回傳
cancel := once(func(err error) {
if err != nil {
retErr.Set(err)
} else {
// 默認的error
retErr.Set(ErrCancelWithNil)
}
drain(source)
// 呼叫close(ouput)主執行緒收到Done信號,立馬回傳
finish()
})
在mapperDispatcher方法中呼叫了executeMappers,executeMappers消費buildSource產生的資料,每一個item都會起一個goroutine單獨處理,默認最大并發數為16,可以通過WithWorkers進行設定
var wg sync.WaitGroup
defer func() {
wg.Wait() // 保證所有的item都處理完成
close(collector)
}()
pool := make(chan lang.PlaceholderType, workers)
writer := newGuardedWriter(collector, done) // 將mapper處理完的資料寫入collector
for {
select {
case <-done: // 當呼叫了cancel會觸發立即回傳
return
case pool <- lang.Placeholder: // 控制最大并發數
item, ok := <-input
if !ok {
<-pool
return
}
wg.Add(1)
go func() {
defer func() {
wg.Done()
<-pool
}()
mapper(item, writer) // 對item進行處理,處理完呼叫writer.Write把結果寫入collector對應的channel中
}()
}
}
reducer單goroutine對數mapper寫入collector的資料進行處理,如果reducer中沒有手動呼叫writer.Write則最侄訓執行finish方法對output進行close避免死鎖
go func() {
defer func() {
if r := recover(); r != nil {
cancel(fmt.Errorf("%v", r))
} else {
finish()
}
}()
reducer(collector, writer, cancel)
}()
在該工具包中還提供了許多針對不同業務場景的方法,實作原理與MapReduce大同小異,感興趣的同學可以查看原始碼學習
- MapReduceVoid 功能和MapReduce類似但沒有結果回傳只回傳error
- Finish 處理固定數量的依賴,回傳error,有一個error立即回傳
- FinishVoid 和Finish方法功能類似,沒有回傳值
- Map 只做generate和mapper處理,回傳channel
- MapVoid 和Map功能類似,無回傳
本文主要介紹了go-zero框架中的MapReduce工具,在實際的專案中非常實用,用好工具對于提升服務性能和開發效率都有很大的幫助,希望本篇文章能給大家帶來一些識訓,
專案地址
https://github.com/tal-tech/go-zero
好未來技術
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/173098.html
標籤:Go
