NSQ 訊息佇列實作訊息落地使用的是 FIFO 佇列,
實作為 diskqueue , 使用包github.com/nsqio/go-diskqueue,本文主要對diskqueue的實作做介紹,
功能定位
- 在NSQ 中, diskqueue 是一個實體化的 BackendQueue, 用于保存在記憶體中放不下的訊息,使用場景如Topic 佇列中的訊息,Channel 佇列中的訊息
- 實作的功能是一個FIFO的佇列,實作如下功能:
- 支持訊息的插入、清空、洗掉、關閉操作
- 可以回傳佇列的長度(寫和讀偏移的距離)
- 具有讀寫功能,FIFO 的佇列
diskqueue 的實作
BackendQueue 介面如下:
type BackendQueue interface {
Put([]byte) error // 將一條訊息插入到佇列中
ReadChan() chan []byte // 回傳一個無緩沖的chan
Close() error // 佇列關閉
Delete() error // 洗掉佇列 (實際在實作時,資料仍保留)
Depth() int64 // 回傳讀延遲的訊息量
Empty() error // 清空訊息 (實際會洗掉所有的記錄檔案)
}
資料結構
對于需要原子操作的64bit 的欄位,需要放在struct 的最前面,原因請看學習總結第一條
資料結構中定義了 檔案的讀寫位置、一些檔案讀寫的控制變數,以及相關操作的channel.
// diskQueue implements a filesystem backed FIFO queue
type diskQueue struct {
// 64bit atomic vars need to be first for proper alignment on 32bit platforms
// run-time state (also persisted to disk)
readPos int64 // 讀的位置
writePos int64 // 寫的位置
readFileNum int64 // 讀檔案的編號
writeFileNum int64 // 寫檔案的編號
depth int64 // 讀寫檔案的距離 (用于標識佇列的長度)
sync.RWMutex
// instantiation time metadata
name string // 標識佇列名稱,用于落地檔案名的前綴
dataPath string // 落地檔案的路徑
maxBytesPerFile int64 // 每個檔案最大位元組數
minMsgSize int32 // 單條訊息的最小大小
maxMsgSize int32 // 單挑訊息的最大大小
syncEvery int64 // 每寫多少次刷盤一次
syncTimeout time.Duration // 至少多久會刷盤一次
exitFlag int32 // 退出標識
needSync bool // 如果 needSync 為true, 則需要fsync重繪metadata 資料
// keeps track of the position where we have read
// (but not yet sent over readChan)
nextReadPos int64 // 下一次讀的位置
nextReadFileNum int64 // 下一次讀的檔案number
readFile *os.File // 讀 fd
writeFile *os.File // 寫 fd
reader *bufio.Reader // 讀 buffer
writeBuf bytes.Buffer // 寫 buffer
// exposed via ReadChan()
readChan chan []byte // 讀channel
// internal channels
writeChan chan []byte // 寫 channel
writeResponseChan chan error // 同步寫完之后的 response
emptyChan chan int // 清空檔案的channel
emptyResponseChan chan error // 同步清空檔案后的channel
exitChan chan int // 退出channel
exitSyncChan chan int // 退出命令同步等待channel
logf AppLogFunc // 日志句柄
}
初始化一個佇列
初始化一個佇列,需要定義前綴名, 資料路徑,每個檔案的最大位元組數,訊息最大最小限制,以及刷盤頻次和最長刷盤時間,最后還有一個日志函式
func New(name string, dataPath string, maxBytesPerFile int64,
minMsgSize int32, maxMsgSize int32,
syncEvery int64, syncTimeout time.Duration, logf AppLogFunc) Interface {
d := diskQueue{
name: name,
dataPath: dataPath,
maxBytesPerFile: maxBytesPerFile,
minMsgSize: minMsgSize,
maxMsgSize: maxMsgSize,
readChan: make(chan []byte),
writeChan: make(chan []byte),
writeResponseChan: make(chan error),
emptyChan: make(chan int),
emptyResponseChan: make(chan error),
exitChan: make(chan int),
exitSyncChan: make(chan int),
syncEvery: syncEvery,
syncTimeout: syncTimeout,
logf: logf,
}
// no need to lock here, nothing else could possibly be touching this instance
err := d.retrieveMetaData()
if err != nil && !os.IsNotExist(err) {
d.logf(ERROR, "DISKQUEUE(%s) failed to retrieveMetaData - %s", d.name, err)
}
go d.ioLoop()
return &d
}
可以看出, 佇列中均使用不帶cache 的chan,訊息只能阻塞處理,
d.retrieveMetaData() 是從檔案中恢復元資料,
d.ioLoop() 是佇列的事件處理邏輯,后文詳細解答
訊息的讀寫
檔案格式
檔案名 "name" + .diskqueue.%06d.dat 其中, name 是 topic, 或者topic + channel 命名.
資料采用二進制方式存盤, 訊息大小+ body 的形式存盤,
訊息讀操作
- 如果readFile 檔案描述符未初始化, 則需要先打開相應的檔案,將偏移seek到相應位置,并初始化reader buffer
- 初始化后,首先讀取檔案的大小, 4個位元組,然后通過檔案大小獲取相應的body 資料
- 更改相應的偏移,如果偏移達到檔案最大值,則會關閉相應檔案,讀的檔案編號 + 1
訊息寫操作
- 如果writeFile 檔案描述符未初始化,則需要先打開相應的檔案,將偏移seek到檔案末尾,
- 驗證訊息的大小是否符合要求
- 將body 的大小和body 寫入 buffer 中,并落地
- depth +1,
- 如果檔案大小大于每個檔案的最大大小,則關閉當前檔案,并將寫檔案的編號 + 1
事件回圈 ioLoop
ioLoop 函式,做所有時間處理的操作,包括:
- 訊息讀取
- 寫操作
- 清空佇列資料
- 定時重繪的事件
func (d *diskQueue) ioLoop() {
var dataRead []byte
var err error
var count int64
var r chan []byte
// 定時器的設定
syncTicker := time.NewTicker(d.syncTimeout)
for {
// 若到達刷盤頻次,標記等待刷盤
if count == d.syncEvery {
d.needSync = true
}
if d.needSync {
err = d.sync()
if err != nil {
d.logf(ERROR, "DISKQUEUE(%s) failed to sync - %s", d.name, err)
}
count = 0
}
// 有可讀資料,并且當前讀chan的資料已經被讀走,則讀取下一條資料
if (d.readFileNum < d.writeFileNum) || (d.readPos < d.writePos) {
if d.nextReadPos == d.readPos {
dataRead, err = d.readOne()
if err != nil {
d.logf(ERROR, "DISKQUEUE(%s) reading at %d of %s - %s",
d.name, d.readPos, d.fileName(d.readFileNum), err)
d.handleReadError()
continue
}
}
r = d.readChan
} else {
// 如果無可讀資料,那么設定 r 為nil, 防止將dataRead資料重復傳入readChan中
r = nil
}
select {
// the Go channel spec dictates that nil channel operations (read or write)
// in a select are skipped, we set r to d.readChan only when there is data to read
case r <- dataRead:
count++
// moveForward sets needSync flag if a file is removed
// 如果讀chan 被寫入成功,則會修改讀的偏移
d.moveForward()
case <-d.emptyChan:
// 清空所有檔案,并回傳empty的結果
d.emptyResponseChan <- d.deleteAllFiles()
count = 0
case dataWrite := <-d.writeChan:
// 寫msg
count++
d.writeResponseChan <- d.writeOne(dataWrite)
case <-syncTicker.C:
// 到刷盤時間,則修改needSync = true
if count == 0 {
// avoid sync when there's no activity
continue
}
d.needSync = true
case <-d.exitChan:
goto exit
}
}
exit:
d.logf(INFO, "DISKQUEUE(%s): closing ... ioLoop", d.name)
syncTicker.Stop()
d.exitSyncChan <- 1
}
需要注意的點:
- 資料會預先讀出來,當發送到readChan 里面,才會通過moveForward 操作更改讀的偏移,
- queue 的Put 操作非操作,會等待寫完成后,才會回傳結果
- Empty 操作會清空所有資料
- 資料會定時或者按照設定的同步頻次呼叫FSync 刷盤
metadata 元資料
metadata 檔案格式
檔案名: "name" + .diskqueue.meta.dat 其中, name 是 topic, 或者topic + channel 命名.
metadata 資料包含5個欄位, 內容如下:
depth\nreadFileNum,readPos\nwriteFileNum,writePos
metadata 作用
當服務關閉后,metadata 資料將保存在檔案中,當服務再次啟動時,將從檔案中將相關資料恢復到記憶體中,
學習總結
記憶體對齊與原子操作的問題
// 64bit atomic vars need to be first for proper alignment on 32bit platforms
-
現象 nsq 在定義struct 的時候,很多會出現類似的注釋
-
原因 原因在golang 原始碼 sync/atomic/doc.go 中
// On ARM, x86-32, and 32-bit MIPS, // it is the caller's responsibility to arrange for 64-bit // alignment of 64-bit words accessed atomically. The first word in a // variable or in an allocated struct, array, or slice can be relied upon to be // 64-bit aligned. -
解釋 在arm, 32 x86系統,和 32位 MIPS 指令集中,呼叫者需要保證對64位變數做原子操作時是64位記憶體對齊的(而不是32位對齊),而將64位的變數放在struct, array, slice 的最前面,可以保證64位對齊
-
結論 有64bit 原子操作的變數,會定義在struct 的最前面,可以使變數使64位對齊,保證程式在32位系統中正確執行
物件池的使用
buffer_pool.go檔案中, 簡單實作了 bytes.Buffer 的物件池,減少了gc 壓力- 使用場景,需要高頻次做物件初始化和記憶體分配的情況,可使用sync.Pool 物件池減少gc 壓力
如何將作業系統快取中的資料主動重繪到硬碟中?
- fsync 函式 (在write 函式之后,需要使用fsync 才能確保資料落盤)
本文代碼來自于
github.com/nsqio/go-diskqueue

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/10501.html
標籤:其他
下一篇:前端小白的學習之路--git學習
