參考原文
原文鏈接 http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/
問題描述
直入本文要描述的問題:網站流量上來了,高并發負載是不可避免滴問題了,當服務端需要處理大量耗時的任務時,我們一般都會考慮將耗時任務異步處理,那么如果使用Go如何實作?
傳統上,我們會考慮使用以下方法創建作業者層架構:
- Resque(佇列,比如redis resque)
- DelayedJob(延遲任務,比如go defer)
- Elasticbeanstalk Worker Tier
- RabbitMQ(訊息佇列)
簡單慣用法
golang的異步處理之攜程:go func()可以帶來了很大的方便,雖然協程相對于執行緒占用的系統資源更少,但這并不代表我們可以無休止的創建協程,
不停創建協程也有壓垮系統的風險,然而絕大多數的時候,我們不能簡單粗暴的創建協程來處理異步任務,原因是不可控,下面我們參考原作者的demo,一個執行耗時任務的handler,
代碼我們只用看大致的實作流程原理,實作細節暫且不論,
package main import ( "bytes" "encoding/json" "fmt" "io" "net/http" "time" ) type PayloadCollection struct { WindowsVersion string `json:"version"` Token string `json:"token"` Payloads []Payload `json:"data"` } type Payload struct { // [redacted] } func (p *Payload) UploadToS3() error { // the storageFolder method ensures that there are no name collision in // case we get same timestamp in the key name storage_path := fmt.Sprintf("%v/%v", p.storageFolder, time.Now().UnixNano()) bucket := S3Bucket b := new(bytes.Buffer) encodeErr := json.NewEncoder(b).Encode(payload) if encodeErr != nil { return encodeErr } // Everything we post to the S3 bucket should be marked 'private' var acl = s3.Private var contentType = "application/octet-stream" return bucket.PutReader(storage_path, b, int64(b.Len()), contentType, acl, s3.Options{}) } func payloadHandler(w http.ResponseWriter, r *http.Request) { if r.Method != "POST" { w.WriteHeader(http.StatusMethodNotAllowed) return } // Read the body into a string for json decoding var content = &PayloadCollection{} err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content) if err != nil { w.Header().Set("Content-Type", "application/json; charset=UTF-8") w.WriteHeader(http.StatusBadRequest) return } // Go through each payload and queue items individually to be posted to S3 for _, payload := range content.Payloads { go payload.UploadToS3() // <----- DON'T DO THIS } w.WriteHeader(http.StatusOK) }
對于適量的負載,這個方案應該沒有問題,但是負載增加以后這個方法就不能很好地作業,當我們把這個版本部署到生產環境中后,如果我們遇到了比預期大一個數量級的請求量,
那么這個方法就有些不盡如人意了,它無法控制創建goroutine的數量,因為我們每分鐘收到了一百萬個POST請求,上面的代碼很快就奔潰了,
這就是我們遇到的第一個問題,簡單粗暴起協程處理耗時任務導致的系統不可控性,我們自然而然就會想,怎么能讓系統更可控呢?
優雅的方法
創建帶緩沖的channel,這樣我們可以把作業任務放到佇列里然后再上傳到S3,因為可以控制佇列的長度并且有充足的記憶體,我覺得把作業任務快取在channel佇列里應該沒有問題,
所以一個很自然的思路那就是:建立任務佇列,golang提供了執行緒安全的任務佇列實作方式:帶緩沖的channal,但是這樣只是延后了請求的爆發,
作者意識到,要解決這一問題,必須控制協程的數量,如何控制協程的數量?Job/Worker模式!這里我將作者的代碼修改了一下,單檔案可執行,以記錄并理解這一模式,
package main import ( "fmt" "reflect" "time" ) var ( MaxWorker = 10 ) type Payload struct { Num int } //待執行的作業 type Job struct { Payload Payload } //任務channal var JobQueue chan Job //執行任務的作業者單元 type Worker struct { WorkerPool chan chan Job //作業者池--每個元素是一個作業者的私有任務channal JobChannel chan Job //每個作業者單元包含一個任務管道 用于獲取任務 quit chan bool //退出信號 no int //編號 } //創建一個新作業者單元 func NewWorker(workerPool chan chan Job, no int) Worker { fmt.Println("創建一個新作業者單元") return Worker{ WorkerPool: workerPool, JobChannel: make(chan Job), quit: make(chan bool), no: no, } } //回圈 監聽任務和結束信號 func (w Worker) Start() { go func() { for { // register the current worker into the worker queue. w.WorkerPool <- w.JobChannel fmt.Println("w.WorkerPool <- w.JobChannel", w) select { case job := <-w.JobChannel: fmt.Println("job := <-w.JobChannel") // 收到任務 fmt.Println(job) time.Sleep(100 * time.Second) case <-w.quit: // 收到退出信號 return } } }() } // 停止信號 func (w Worker) Stop() { go func() { w.quit <- true }() } //調度中心 type Dispatcher struct { //作業者池 WorkerPool chan chan Job //作業者數量 MaxWorkers int } //創建調度中心 func NewDispatcher(maxWorkers int) *Dispatcher { pool := make(chan chan Job, maxWorkers) return &Dispatcher{WorkerPool: pool, MaxWorkers: maxWorkers} } //作業者池的初始化 func (d *Dispatcher) Run() { // starting n number of workers for i := 1; i < d.MaxWorkers+1; i++ { worker := NewWorker(d.WorkerPool, i) worker.Start() } go d.dispatch() } //調度 func (d *Dispatcher) dispatch() { for { select { case job := <-JobQueue: fmt.Println("job := <-JobQueue:") go func(job Job) { //等待空閑worker (任務多的時候會阻塞這里) jobChannel := <-d.WorkerPool fmt.Println("jobChannel := <-d.WorkerPool", reflect.TypeOf(jobChannel)) // 將任務放到上述woker的私有任務channal中 jobChannel <- job fmt.Println("jobChannel <- job") }(job) } } } func main() { JobQueue = make(chan Job, 10) dispatcher := NewDispatcher(MaxWorker) dispatcher.Run() time.Sleep(1 * time.Second) go addQueue() time.Sleep(1000 * time.Second) } func addQueue() { for i := 0; i < 20; i++ { // 新建一個任務 payLoad := Payload{Num: 1} work := Job{Payload: payLoad} // 任務放入任務佇列channal JobQueue <- work fmt.Println("JobQueue <- work") time.Sleep(1 * time.Second) } } /* 一個任務的執行程序如下 JobQueue <- work 新任務入隊 job := <-JobQueue: 調度中心收到任務 jobChannel := <-d.WorkerPool 從作業者池取到一個作業者 jobChannel <- job 任務給到作業者 job := <-w.JobChannel 作業者取出任務 {{1}} 執行任務 w.WorkerPool <- w.JobChannel 作業者在放回作業者池 */
這樣,我們已經能夠主動的控制worker的數量,這時候,我問哈大家,我們完全解決問題了么?如果有大量的任務同時涌入,會發生什么樣的結果,程式會阻塞等待可用的worker
jobChannel := <-d.WorkerPool
下面是我們的dispatcher實作代碼:
//調度 func (d *Dispatcher) dispatch() { for { select { case job := <-JobQueue: fmt.Println("job := <-JobQueue:") go func(job Job) { //等待空閑worker (任務多的時候會阻塞這里) jobChannel := <-d.WorkerPool fmt.Println("jobChannel := <-d.WorkerPool", reflect.TypeOf(jobChannel)) // 將任務放到上述woker的私有任務channal中 jobChannel <- job fmt.Println("jobChannel <- job") }(job) } } }
這里我們提供了創建worker的最大數目作為引數,并把這些worker加入到worker池里,不要忘記,這個調度方法也是在不斷的創建協程等待空閑的worker,我們再改一下代碼如下:
package main import ( "fmt" "reflect" "runtime" "time" ) var ( MaxWorker = 10 ) type Payload struct { Num int } //待執行的作業 type Job struct { Payload Payload } //任務channal var JobQueue chan Job //執行任務的作業者單元 type Worker struct { WorkerPool chan chan Job //作業者池--每個元素是一個作業者的私有任務channal JobChannel chan Job //每個作業者單元包含一個任務管道 用于獲取任務 quit chan bool //退出信號 no int //編號 } //創建一個新作業者單元 func NewWorker(workerPool chan chan Job, no int) Worker { fmt.Println("創建一個新作業者單元") return Worker{ WorkerPool: workerPool, JobChannel: make(chan Job), quit: make(chan bool), no: no, } } //回圈 監聽任務和結束信號 func (w Worker) Start() { go func() { for { // register the current worker into the worker queue. w.WorkerPool <- w.JobChannel fmt.Println("w.WorkerPool <- w.JobChannel", w) select { case job := <-w.JobChannel: fmt.Println("job := <-w.JobChannel") // 收到任務 fmt.Println(job) time.Sleep(100 * time.Second) case <-w.quit: // 收到退出信號 return } } }() } // 停止信號 func (w Worker) Stop() { go func() { w.quit <- true }() } //調度中心 type Dispatcher struct { //作業者池 WorkerPool chan chan Job //作業者數量 MaxWorkers int } //創建調度中心 func NewDispatcher(maxWorkers int) *Dispatcher { pool := make(chan chan Job, maxWorkers) return &Dispatcher{WorkerPool: pool, MaxWorkers: maxWorkers} } //作業者池的初始化 func (d *Dispatcher) Run() { // starting n number of workers for i := 1; i < d.MaxWorkers+1; i++ { worker := NewWorker(d.WorkerPool, i) worker.Start() } go d.dispatch() } //調度 func (d *Dispatcher) dispatch() { for { select { case job := <-JobQueue: fmt.Println("job := <-JobQueue:") go func(job Job) { fmt.Println("等待空閑worker (任務多的時候會阻塞這里") //等待空閑worker (任務多的時候會阻塞這里) jobChannel := <-d.WorkerPool fmt.Println("jobChannel := <-d.WorkerPool", reflect.TypeOf(jobChannel)) // 將任務放到上述woker的私有任務channal中 jobChannel <- job fmt.Println("jobChannel <- job") }(job) } } } func main() { JobQueue = make(chan Job, 10) dispatcher := NewDispatcher(MaxWorker) dispatcher.Run() time.Sleep(1 * time.Second) go addQueue() time.Sleep(1000 * time.Second) } func addQueue() { for i := 0; i < 100; i++ { // 新建一個任務 payLoad := Payload{Num: i} work := Job{Payload: payLoad} // 任務放入任務佇列channal JobQueue <- work fmt.Println("JobQueue <- work", i) fmt.Println("當前協程數:", runtime.NumGoroutine()) time.Sleep(100 * time.Millisecond) } }
執行結果如下:


這里我們發現,我們依然沒能控制住協程數量,我們只是控制住了worker的數量,這種情況下,如果worker數量設定的合理且異步任務耗時不是特別長的情況下一般沒有問題,但是出于安全的考慮,我要把這個阻塞的協程數做一個控制,如果達到限制時候拒絕服務以保護系統該怎么處理?
真正控制協程數量(并發執行的任務數)
我們可以控制并發執行(包括等待執行)的任務數,我們加入任務使用如下判斷,用一個帶緩沖的Channel控制并發執行的任務數,
當任務異步處理完成的時候執行<- DispatchNumControl釋放控制即可,用這種方法,
我們可以根據壓測結果設定合適的并發數從而保證系統能夠盡可能的發揮自己的能力,同時保證不會因為任務量太大而崩潰(因為達到極限的時候,系統會告訴呼叫方:牛仔我很忙),
比如定義一個limit函式讀取是否存在發送的任務佇列:
//用于控制并發處理的協程數 var DispatchNumControl = make(chan bool, 10000) func Limit(work Job) bool { select { case <-time.After(time.Millisecond * 100): fmt.println("牛仔我很忙") return false case DispatchNumControl <- true: // 任務放入任務佇列channal jobChannel <- work return true } }
結束語
我們本可以通過大量的佇列,后臺workers,復雜的調度來設計一套復雜的系統,協程是個好的設計,但任何東西都不能過度使用,
我們做系統設計的時候,一定也要時刻想著控制:要對自己設計的系統有著足夠的控制力,
另外綜合上面的實作,為什么 dispatch 這里要用 協程 呢?阻塞完全沒問題? 歡迎廣大博友拍磚留言,,,,
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/3645.html
標籤:Go
