前言
日常任務開放中,我們會有很多異步、批量、定時、延遲任務要處理,go-zero中有 go-queue,推薦使用 go-queue 去處理,go-queue 本身也是基于 go-zero 開發的,其本身是有兩種模式:
dq:依賴于beanstalkd,適合延時、定時任務執行;kq:依賴于kafka,適用于異步、批量任務執行;
本篇就先從 dq 開始,慢慢探究 go-queue 背后執行的邏輯,
dq 簡介
dq 封裝底層 beanstalkd 操作,分布式存盤,延遲、定時設定,重啟服務可以重新執行,但是訊息不會丟失,因為訊息的處理都交由 beanstalkd 完成,
可以看出使用非常簡單,同時 dq 中使用了 redis setnx 保證了每個訊息只被消費一次,但是在生產者端沒有使用 redis 做訊息存盤,這個和前面描述的一致,
對 dq 的整體架構做了簡單介紹,下面就開始正式的探索 ??
生產者 example
func main() {
producer := dq.NewProducer([]dq.Beanstalk{
{
Endpoint: "localhost:11300",
Tube: "tube",
},
{
Endpoint: "localhost:11301",
Tube: "tube",
},
})
for i := 1000; i < 1005; i++ {
// Delay:延遲執行
_, err := producer.Delay([]byte(strconv.Itoa(i)), time.Second*5)
// At:在某一個時刻執行
//_, err := producer.At([]byte(strconv.Itoa(i)), time.Now().Add(time.Second*5))
if err != nil {
fmt.Println(err)
}
}
}
從使用上,簡單分為兩步:
NewProducer(opts):將本地佇列的埠配置和主題配置傳入生產者;producer.Delay():使用剛創建好的 生產者,呼叫它的Delay(),將需要異步發送的訊息傳入,Delay還需要傳入延遲執行的時間,
需要說明的是:創建的 producer 是一個介面,Delay() 只是介面其中的一個方法,后續會其他的方法和內部設計,那我們就繼續往下探索吧~~~
深入生產者執行流程
下面從 example 的代碼進去,看整個函式的呼叫鏈,
初始化
dq.NewProducer([]dq.Beanstalk{{opt1}, {opt2}, ...}) // 初始化生產者
|- NewProducerNode(endpoint, tube) // endpoint,tube 來自傳入的配置陣列
緊接著就到 producerNode.go ,這個部分就會牽涉到 beanstalk 的初始化:
NewProducerNode(endpoint, tube)
|- conn: newConnection(endpoint, tube)
|- return &connection{}
這就涉及到 beanstalk:connection.conn -> *beanstalk.Conn,
但是在 newConnection() 中并沒有對 beanstalk.Conn 進行初始化,這屬于 延遲初始化
Delay
首先是生產者端呼叫 producer.Delay(data, timesecond) ,就把訊息插入到內部佇列,timesecond 就是延遲執行的時間,我們來看看 Delay() 到底做了什么?
p.Delay(data, timesecond)
|- p.wrap(data, time) // 將 data 和 time 包裝到一塊
|- p.insert(nodeFn)
|- node.Delay() // for rangre p.node 每一個node都執行一遍 `Delay()`
而 p.insert 就是將上一步封裝好的 data 傳遞給 p{cluster} 的每一個node去執行 node.Delay,
在前面的 初始化 說過,最開始是沒有對 conn 進行初始化,那現在要插入資料,總不能不初始化這個 conn ,
node.Delay() // 配置中的每個node都執行 `Delay()`
|- node.conn.get() // 獲取node中的conn【conn==nil,就初始化一個conn】
|- _, err := conn.Put(data, deplay, opts...)
|- node.conn.reset() // 出現err情況下,如OOM/Timeout等情況 -> 關閉conn,防止泄漏
所以最后 Delay 實際上是執行 tube.Put(data, delay):
tube.Put(data, delay)
|- tube.Conn.cmd("put", ...) // 生產者發布job
這里就涉及到 beanstalk 的 Put 操作:首先看看生產者 Put 指令引數說明:
put <pri> <delay> <ttr> <bytes> <data>
<pri>:優先級,值越小優先級越高,默認為1024;<delay>:延遲ready秒數,在這段時間 job 為delayed狀態;<ttr>:time to run,允許 worker 執行的最大秒數,如果 worker 在這段時間不能 delete,release,bury job,那么當 job 超時,服務器將自動 release 此job;<bytes>:job body的長度,不包含\r\n;<data>: job body data;
OK,那插入 job 成功,回應什么呢?
INSERTED <id>\r\n
回傳的 id 是插入 job 的任務標識,到此 Put 分析完畢,跟著代碼走一遍:
tube.Put(data, priority, daley, ttr)
|- tube.Conn.cmd("put", ...)
|- tube.Conn.readResp("INSERTED id")
|- return id, err // 將id回傳
這樣我們在 example 中直接可以看到的 生產者 執行的操作就介紹完了,上圖,圖更好說話:

producer interface
那么除了 example 中使用的 Delay() ,還有其余幾個方法:
Producer interface {
At(body []byte, at time.Time) (string, error)
Close() error
Delay(body []byte, delay time.Duration) (string, error)
Revoke(ids string) error
}
At:指定某個時間執行【實質也是執行Delay()】Close:關閉全部node的連接Delay:延遲執行,傳入延遲的時間,Revoke:實質上是當出現最小寫入節點<2時,觸發添加失敗,將添加成功的job洗掉掉,
當然,事實上 dq 使用上,開發者只需要使用 At/Delay 就行了,也就是你只要知道你的任務是定時觸發還是延遲觸發即可,剩下的,dq 內部的封裝都已經幫你做好了,
框架地址
https://github.com/tal-tech/go-queue
同時在 go-queue 也大量使用 go-zero 的流式處理庫 fx,
https://github.com/tal-tech/go-zero
歡迎使用 go-queue 并 star 支持我們!一起構建 go-zero 生態!??
go-zero 系列文章見『微服務實踐』公眾號
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/270581.html
標籤:Go
上一篇:編程入門:零基礎想要學好C/C++編程?那你一定要看看這五個步驟!
下一篇:Golang變數、常量
