今天這篇筆記我們來學習Go 限流
限流是分布式系統中經常需要用到的技術,因為我們讓請求沒有限制,很容易就出現某個用戶開很多執行緒把我們的服務拉跨,進而影響到別的用戶,
限流
我們來看下Go語言層面可以怎么做到限流,先看一段不限流的代碼,
type APIConnection struct{}
func Open() *APIConnection {
return &APIConnection{}
}
func (a *APIConnection) ReadFile(ctx context.Context) error {
//假裝我們在這里有運行
return nil
}
func (a *APIConnection) ResolveAddress(ctx context.Context) error {
//假裝我們在這里有運行
return nil
}
func main() {
defer log.Printf("Done")
log.SetOutput(os.Stdout)
log.SetFlags(log.Ltime | log.LUTC)
apiConnection := Open()
var wg sync.WaitGroup
wg.Add(20)
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
err := apiConnection.ReadFile(context.Background())
if err != nil {
log.Printf("cannot ReadFile : %v", err)
}
log.Printf("ReadFile")
}()
}
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
err := apiConnection.ResolveAddress(context.Background())
if err != nil {
log.Printf("cannot ResolveAddress : %v", err)
}
log.Printf("ResolveAddress")
}()
}
wg.Wait()
}
上面的代碼我們定義了兩個假想的方法ReadFile 和 ResolveAddress, 假設他們是去訪問檔案和讀取網路,都是比較耗資源的操作,然后開啟了20個goroutine去呼叫這兩個方法
這段代碼的運行結果如下
02:32:52 ResolveAddress
02:32:52 ResolveAddress
02:32:52 ResolveAddress
02:32:52 ResolveAddress
02:32:52 ResolveAddress
02:32:52 ResolveAddress
02:32:52 ResolveAddress
02:32:52 ResolveAddress
02:32:52 ResolveAddress
02:32:52 ReadFile
02:32:52 ReadFile
02:32:52 ReadFile
02:32:52 ReadFile
02:32:52 ReadFile
02:32:52 ReadFile
02:32:52 ReadFile
02:32:52 ReadFile
02:32:52 ReadFile
02:32:52 ResolveAddress
02:32:52 ReadFile
02:32:52 Done
我們可以看到一瞬間就都運行完了,如果我們訪問了實際的資源,然后又開了很多的goroutine,那么很容易就耗盡資源, 為了防止這樣的事情發生,我們引入限流,限定一段時間內,只能訪問一定的資源, 我們今天要講的是基于令牌桶演算法的限速,令牌桶是什么演算法呢? 很簡單就是有一個基礎的令牌數d, 然后有固定的速度r往令牌桶中放令牌, 用戶拿到令牌才能進行下一步,拿不到就等待,
我們來看代碼
type APIConnection struct {
rateLimiter *rate.Limiter
}
func Open() *APIConnection {
return &APIConnection{
rateLimiter: rate.NewLimiter(rate.Limit(2), 5),
}
}
func (a *APIConnection) ReadFile(ctx context.Context) error {
if err := a.rateLimiter.Wait(ctx); err != nil {
return err
}
return nil
}
func (a *APIConnection) ResolveAddress(ctx context.Context) error {
if err := a.rateLimiter.Wait(ctx); err != nil {
return err
}
return nil
}
main func我們沒有修改,這里只是在APIConnection 中增加了一個
rateLimiter: rate.NewLimiter(rate.Limit(2), 5)
rate是golang.org/x/time/rate 下面的一個包, rate.NewLimiter是限速器,方法定義如下
func NewLimiter(r Limit, b int) *Limiter
r就是我們前面說的速率,每秒多少個令牌
b 就是令牌桶的高度,開始的時候有幾個,
然后在ReadFile 和 ResolveAddress 方法中增加了a.rateLimiter.Wait(ctx), Wait就是等待有令牌出現,
運行的結果如下所示
02:48:16 ReadFile
02:48:16 ReadFile
02:48:16 ReadFile
02:48:16 ReadFile
02:48:16 ResolveAddress
02:48:17 ResolveAddress
02:48:17 ResolveAddress
02:48:18 ReadFile
02:48:18 ResolveAddress
02:48:19 ResolveAddress
02:48:19 ResolveAddress
02:48:20 ResolveAddress
02:48:20 ReadFile
02:48:21 ResolveAddress
02:48:21 ReadFile
02:48:22 ReadFile
02:48:22 ReadFile
02:48:23 ResolveAddress
02:48:23 ReadFile
02:48:24 ResolveAddress
02:48:24 Done
通過時間我們可以看到前面很快執行了5次,就是拿到了令牌桶中的5個令牌,后面每秒中執行兩次,也就是我們的速率2個/秒,程式運行符合我們預期,達到了限速的效果,
組合限流
書中作者還舉了兩個例子,運用組合來限速,比如要求一秒中不能超過兩個,同時一分鐘不能超過10個, 屬于Go語言的一點組合功能,示例代碼如下
type RateLimiter interface {
Wait(context.Context) error
Limit() rate.Limit
}
type multiLimiter struct {
limiters []RateLimiter
}
func MultiLimiter(limiters ...RateLimiter) *multiLimiter {
byLimit := func(i, j int) bool {
return limiters[i].Limit() < limiters[j].Limit()
}
sort.Slice(limiters, byLimit)
return &multiLimiter{limiters: limiters}
}
func (l *multiLimiter) Wait(ctx context.Context) error {
for _, l := range l.limiters {
if err := l.Wait(ctx); err != nil {
return err
}
}
return nil
}
func (l *multiLimiter) Limit() rate.Limit {
return l.limiters[0].Limit()
}
func Per(eventCount int, duration time.Duration) rate.Limit {
return rate.Every(duration / time.Duration(eventCount))
}
type APIConnection struct {
rateLimiter RateLimiter
}
func Open() *APIConnection {
secondLimit := rate.NewLimiter(Per(2, time.Second), 1)
minuteLimit := rate.NewLimiter(Per(10, time.Minute), 10)
return &APIConnection{rateLimiter: MultiLimiter(secondLimit, minuteLimit)}
}
func (a *APIConnection) ReadFile(ctx context.Context) error {
if err := a.rateLimiter.Wait(ctx); err != nil {
return err
}
return nil
}
func (a *APIConnection) ResolveAddress(ctx context.Context) error {
if err := a.rateLimiter.Wait(ctx); err != nil {
return err
}
return nil
}
定義了multiLimiter來組合這些限速器,然后定義了Wait方法,比較簡單,這里不詳述
還可以不同的設備分不同的限速器, 這里也是貼出代碼不詳述
type APIConnection struct {
networkLimit,
diskLimit,
apiLimit RateLimiter
}
func Open() *APIConnection {
return &APIConnection{
apiLimit: MultiLimiter(
rate.NewLimiter(Per(2, time.Second), 1),
rate.NewLimiter(Per(10, time.Minute), 10),
),
diskLimit: MultiLimiter(
rate.NewLimiter(rate.Limit(1), 1),
),
networkLimit: MultiLimiter(
rate.NewLimiter(Per(3, time.Second), 3),
),
}
}
func (a *APIConnection) ReadFile(ctx context.Context) error {
if err := MultiLimiter(a.apiLimit, a.diskLimit).Wait(ctx); err != nil {
return err
}
return nil
}
func (a *APIConnection) ResolveAddress(ctx context.Context) error {
if err := MultiLimiter(a.apiLimit, a.networkLimit).Wait(ctx); err != nil {
return err
}
return nil
}
不同用戶限流
我們做web請求的時候,會遇到這樣的需求,根據不同的用戶給不同的限速,這里簡單的給個sample, 其實就是用map把用戶和限速器關聯起來,
var userLimit = make(map[string]*rate.Limiter)
func doWork(user string) {
if userLimit[user].Allow() {
log.Printf("%s do work \n", user)
} else {
log.Printf("%s not work \n", user)
}
}
func Per(eventCount int, duration time.Duration) rate.Limit {
return rate.Every(duration / time.Duration(eventCount))
}
func main() {
log.SetOutput(os.Stdout)
log.SetFlags(log.Ltime | log.LUTC)
userLimit["user1"] = rate.NewLimiter(Per(2, time.Second), 1)
userLimit["user2"] = rate.NewLimiter(Per(2, time.Minute), 5)
var wg sync.WaitGroup
wg.Add(20)
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
doWork("user1")
}()
time.Sleep(500 * time.Millisecond)
}
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
doWork("user2")
}()
time.Sleep(500 * time.Millisecond)
}
wg.Wait()
}
上面的例子中用戶1被限制1秒訪問2次,用戶2被限制1分鐘訪問2次
03:19:14 user1 do work
03:19:15 user1 do work
03:19:15 user1 do work
03:19:16 user1 do work
03:19:16 user1 do work
03:19:17 user1 do work
03:19:17 user1 do work
03:19:18 user1 do work
03:19:18 user1 do work
03:19:19 user1 do work
03:19:20 user2 do work
03:19:20 user2 do work
03:19:21 user2 do work
03:19:21 user2 do work
03:19:22 user2 do work
03:19:22 user2 not work
03:19:23 user2 not work
03:19:23 user2 not work
03:19:24 user2 not work
03:19:24 user2 not work
這里用戶1基本能得到執行,用戶2執行了5次后,由于沒有拿到令牌,就不能work了,這樣達到了不同用戶,不同的限速器,
總結
對于限速,可以在服務器層面進行限速,我們這里是在后臺程式端進行限速, 也有不少現成的解決方案 https://www.jianshu.com/p/c13843d2e1ec
對于分布式的系統這樣的限速自然是不夠的,可以結合redis的功能來進行限速,網上有看到些方法: https://blog.csdn.net/jim_007/article/details/110084822
沒有實際操作,后面我們再實際操作下再來記錄,
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/538689.html
標籤:其他
