目錄
- 1 概述
- 2 主要結構體及方法
- 2.1 NSQD
- 2.2 tcpServer
- 2.3 protocolV2
- 2.4 clientV2
- 2.5 Topic
- 2.6 channel
- 3 啟動程序
- 4 消費和生產程序
- 4.1 訊息生產
- 4.2 訊息消費
- 4.2 延遲消費
1 概述
NSQ包含3個組件:
- nsqd:每個nsq實體運行一個nsqd行程,負責接收生產者訊息、向nsqlookupd注冊、向消費者推送訊息
- nsqlookupd:集群注冊中心,可以有多個,負責接收nsqd的注冊資訊,向消費者提供服務發現
- nsqadmin:用于監控和管理的web ui
生產者將訊息寫入到指定的主題Topic,同一個Topic下則可以關聯多個管道Channel,每個Channel都會傳輸對應Topic的完整副本,
消費者則訂閱Channel的訊息,于是多個消費者訂閱不同的Channel的話,他們各自都能拿到完整的訊息副本;但如果多個消費者訂閱同一個Channel,則是共享的,即訊息會隨機發送給其中一個消費者,
接下來我們來分析下nsq的原始碼:
- 原始碼地址:https://github.com/nsqio/nsq
- 2020年1月18日最新的master分支
nsq各組件均使用上述代碼倉庫,通過apps目錄下的不同的main包啟動,比如nsqd的main函式在apps/nsqd目錄下,其他類同,
本檔案主要分析nsqd的主要結構體和方法,及訊息生產和消費的程序,主要以TCP api為例來分析,HTTP/HTTPS的api類同,
2 主要結構體及方法
2.1 NSQD
nsqd/nsqd.go檔案,NSQD是主實體,一個nsqd行程創建一個nsqd結構體實體,并通過此結構體的Main()方法啟動所有的服務,
type NSQD struct {
clientIDSequence int64 // 遞增的客戶端ID,每個客戶端連接均從這里取一個遞增后的ID作為唯一標識
sync.RWMutex
opts atomic.Value // 引數選項,真實型別是apps/nsqd/option.go:Options結構體
dl *dirlock.DirLock
isLoading int32
errValue atomic.Value
startTime time.Time
topicMap map[string]*Topic // 保存當前所有的topic
clientLock sync.RWMutex
clients map[int64]Client
lookupPeers atomic.Value
tcpServer *tcpServer
tcpListener net.Listener
httpListener net.Listener
httpsListener net.Listener
tlsConfig *tls.Config
poolSize int // 當前作業協程組的協程數量
notifyChan chan interface{}
optsNotificationChan chan struct{}
exitChan chan int
waitGroup util.WaitGroupWrapper
ci *clusterinfo.ClusterInfo
}
主要方法
/*
程式啟動時呼叫本方法,執行下面的動作:
- 啟動TCP/HTTP/HTTPS服務
- 啟動作業協程組:NSQD.queueScanLoop
- 啟動服務注冊:NSQD.lookupLoop
*/
func (n *NSQD) Main() error
// 負責管理作業協程組的數量,每呼叫一次NSQD.queueScanWorker()方法啟動一個作業協程
func (n *NSQD) queueScanLoop()
// 由queueScanLoop()呼叫,負責啟動作業協程組并動態調整協程數量,作業協程的數量為當前的channel數 * 0.25
func (n *NSQD) resizePool(num int, workCh chan *Channel, responseCh chan bool, closeCh chan int)
// 這是具體的作業協程,監聽workCh,對收到的待處理Channel做兩個動作,一是將超時的訊息重新入隊;二是將到時間的延時訊息入隊
func (n *NSQD) queueScanWorker(workCh chan *Channel, responseCh chan bool, closeCh chan int)
/*
lookupLoop()方法與nsqlookupd建立連接,負責向nsqlookupd注冊topic,并定時發送心跳包
*/
func (n *NSQD) lookupLoop()
2.2 tcpServer
nsqd/tcp.go檔案,tcpServer通過Handle()方法接收TCP請求,
tcpServer是nsqd結構的成員,全域也就只有一個實體,但在protocol包的TCPServer方法中,每創建一個新的連接,均會呼叫一次tcpServer.Handle()
type tcpServer struct {
ctx *context
conns sync.Map
}
主要方法
/*
p.nsqd.Main()啟動protocol.TCPServer(),這個方法里會為每個客戶端連接創建一個新協程,協程執行tcpServer.Handle()方法
本方法首先對新連接讀取4位元組校驗版本,新連接必須首先發送4位元組" V2",
然后阻塞呼叫nsqd.protocolV2.IOLoop()處理客戶端接下來的請求,
*/
func (p *tcpServer) Handle(clientConn net.Conn)
2.3 protocolV2
nsqd/protocol_v2.go檔案,protocolV2負責處理過來的具體的用戶請求,
每個連接均會創建一個獨立的protocolV2實體(由tcpServer.Handle()創建)
type protocolV2 struct {
ctx *context
}
主要方法
/*
tcpServer.Handle()阻塞呼叫本方法
1. 啟用一個獨立協程向消費者推送訊息protocolV2.messagePump()
2. for回圈接收并處理客戶端請求protocolV2.Exec()
*/
func (p *protocolV2) IOLoop(conn net.Conn) error
// 組裝訊息并呼叫protocolV2.Send()發送給消費者
func (p *protocolV2) SendMessage(client *clientV2, msg *Message) error
// 向客戶端發送資料幀
func (p *protocolV2) Send(client *clientV2, frameType int32, data []byte) error
// 決議客戶端請求的指令,呼叫對應的指令方法
func (p *protocolV2) Exec(client *clientV2, params [][]byte) ([]byte, error)
// 負責向消費者推送訊息
func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool)
// 下面這組方法是NSQD支持的指令對應的處理方法
func (p *protocolV2) IDENTIFY(client *clientV2, params [][]byte) ([]byte, error)
func (p *protocolV2) AUTH(client *clientV2, params [][]byte) ([]byte, error)
func (p *protocolV2) SUB(client *clientV2, params [][]byte) ([]byte, error)
func (p *protocolV2) RDY(client *clientV2, params [][]byte) ([]byte, error)
func (p *protocolV2) FIN(client *clientV2, params [][]byte) ([]byte, error)
func (p *protocolV2) REQ(client *clientV2, params [][]byte) ([]byte, error)
func (p *protocolV2) CLS(client *clientV2, params [][]byte) ([]byte, error)
func (p *protocolV2) NOP(client *clientV2, params [][]byte) ([]byte, error)
func (p *protocolV2) PUB(client *clientV2, params [][]byte) ([]byte, error)
func (p *protocolV2) MPUB(client *clientV2, params [][]byte) ([]byte, error)
func (p *protocolV2) DPUB(client *clientV2, params [][]byte) ([]byte, error)
func (p *protocolV2) TOUCH(client *clientV2, params [][]byte) ([]byte, error)
2.4 clientV2
nsqd/client_v2.go檔案,保存每個客戶端的連接資訊,
clientV2實體由protocolV2.IOLoop()創建,每個連接均有一個獨立的實體,
type clientV2 struct {
// 64bit atomic vars need to be first for proper alignment on 32bit platforms
ReadyCount int64
InFlightCount int64
MessageCount uint64
FinishCount uint64
RequeueCount uint64
pubCounts map[string]uint64
writeLock sync.RWMutex
metaLock sync.RWMutex
ID int64
ctx *context
UserAgent string
// original connection
net.Conn
// connections based on negotiated features
tlsConn *tls.Conn
flateWriter *flate.Writer
// reading/writing interfaces
Reader *bufio.Reader
Writer *bufio.Writer
OutputBufferSize int
OutputBufferTimeout time.Duration
HeartbeatInterval time.Duration
MsgTimeout time.Duration
State int32
ConnectTime time.Time
Channel *Channel
ReadyStateChan chan int
ExitChan chan int
ClientID string
Hostname string
SampleRate int32
IdentifyEventChan chan identifyEvent
SubEventChan chan *Channel
TLS int32
Snappy int32
Deflate int32
// re-usable buffer for reading the 4-byte lengths off the wire
lenBuf [4]byte
lenSlice []byte
AuthSecret string
AuthState *auth.State
}
2.5 Topic
nsqd/topic.go檔案,對應于每個topic實體,由系統啟動時創建或者發布訊息/消費訊息時自動創建,
type Topic struct {
messageCount uint64 // 累計訊息數
messageBytes uint64 // 累計訊息體的位元組數
sync.RWMutex
name string // topic名,生產和消費時需要指定此名稱
channelMap map[string]*Channel // 保存每個channel name和channel指標的映射
backend BackendQueue // 磁盤佇列,當記憶體memoryMsgChan滿時,寫入硬碟佇列
memoryMsgChan chan *Message // 訊息優先存入這個記憶體chan
startChan chan int
exitChan chan int
channelUpdateChan chan int
waitGroup util.WaitGroupWrapper
exitFlag int32
idFactory *guidFactory
ephemeral bool
deleteCallback func(*Topic)
deleter sync.Once
paused int32
pauseChan chan int
ctx *context
}
主要方法
/*
下面兩個方法負責將訊息寫入topic,底層均呼叫topic.put()方法
1. topic.memoryMsgChan未滿時,優先寫入記憶體memoryMsgChan
2. 否則,寫入磁盤topic.backend
*/
func (t *Topic) PutMessage(m *Message) error
func (t *Topic) PutMessages(msgs []*Message) error
/*
NewTopic創建新的topic時會為每個topic啟動一個獨立執行緒來處理訊息推送,即messagePump()
此方法回圈隨機從記憶體memoryMsgChan和磁盤佇列backend中取訊息寫入到topic下每一個chnnel中
*/
func (t *Topic) messagePump()
2.6 channel
nsqd/channel.go檔案,對應于每個channel實體
type Channel struct {
requeueCount uint64
messageCount uint64
timeoutCount uint64
sync.RWMutex
topicName string
name string
ctx *context
backend BackendQueue // 磁盤佇列,當記憶體memoryMsgChan滿時,寫入硬碟佇列
memoryMsgChan chan *Message // 訊息優先存入這個記憶體chan
exitFlag int32
exitMutex sync.RWMutex
// state tracking
clients map[int64]Consumer
paused int32
ephemeral bool
deleteCallback func(*Channel)
deleter sync.Once
// Stats tracking
e2eProcessingLatencyStream *quantile.Quantile
// TODO: these can be DRYd up
deferredMessages map[MessageID]*pqueue.Item // 保存尚未到時間的延遲消費訊息
deferredPQ pqueue.PriorityQueue // 保存尚未到時間的延遲消費訊息,最小堆
deferredMutex sync.Mutex
inFlightMessages map[MessageID]*Message // 保存已推送尚未收到FIN的訊息
inFlightPQ inFlightPqueue // 保存已推送尚未收到FIN的訊息,最小堆
inFlightMutex sync.Mutex
}
主要方法
/*
將訊息寫入channel,邏輯與topic的一致,記憶體未滿則優先寫記憶體chan,否則寫入磁盤佇列
*/
func (c *Channel) PutMessage(m *Message) error
func (c *Channel) put(m *Message) error
// 消費超時相關
func (c *Channel) StartInFlightTimeout(msg *Message, clientID int64, timeout time.Duration) error
func (c *Channel) pushInFlightMessage(msg *Message) error
func (c *Channel) popInFlightMessage(clientID int64, id MessageID) (*Message, error)
func (c *Channel) addToInFlightPQ(msg *Message)
func (c *Channel) removeFromInFlightPQ(msg *Message)
func (c *Channel) processInFlightQueue(t int64) bool
// 延時消費相關
func (c *Channel) StartDeferredTimeout(msg *Message, timeout time.Duration) error
func (c *Channel) pushDeferredMessage(item *pqueue.Item) error
func (c *Channel) popDeferredMessage(id MessageID) (*pqueue.Item, error)
func (c *Channel) addToDeferredPQ(item *pqueue.Item)
func (c *Channel) processDeferredQueue(t int64) bool
3 啟動程序
nsqd的main函式在apps/nsqd/main.go檔案,
啟動時呼叫了一個第三方包svc,主要作用是攔截syscall.SIGINT/syscall.SIGTERM這兩個信號,最侄訓是呼叫了main.go下的3個方法:
- program.Init():windows下特殊操作
- program.Start():加載引數和組態檔、加載上一次保存的Topic資訊并完成初始化、創建nsqd并呼叫p.nsqd.Main()啟動
- program.Stop():退出處理
p.nsqd.Main()的邏輯也很簡單,代碼不貼了,依次啟動了TCP服務、HTTP服務、HTTPS服務這3個服務,除此之外,還啟動了以下兩個協程:
- queueScanLoop:訊息延時/超時處理
- lookupLoop:服務注冊
TCPServer
protocol包的TCPServer的核心代碼就是下面這幾行,回圈等待客戶端連接,并為每個連接創建一個獨立的協程:
func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) error {
for {
// 等待生產者或消費者連接
clientConn, err := listener.Accept()
// 每創建一個連接wg +1
wg.Add(1)
go func() {
// 每個連接均啟動一個獨立的協程來接收處理請求
handler.Handle(clientConn)
wg.Done()
}()
}
// 等待所有協程退出
wg.Wait()
return nil
}
TCPServer的核心是為每個連接啟動的協程處理方法handler.Handle(clientConn),實際呼叫的是下面這個方法,連接建立時先讀取4位元組,必須是" V2",然后啟動prot.IOLoop(clientConn)處理接下來的客戶端請求:
func (p *tcpServer) Handle(clientConn net.Conn) {
// 無論是生產者還是消費者,建立連接時,必須先發送4位元組的" V2"進行版本校驗
buf := make([]byte, 4)
_, err := io.ReadFull(clientConn, buf)
protocolMagic := string(buf)
var prot protocol.Protocol
switch protocolMagic {
case " V2":
prot = &protocolV2{ctx: p.ctx}
default:
protocol.SendFramedResponse(clientConn, frameTypeError, []byte("E_BAD_PROTOCOL"))
clientConn.Close()
p.ctx.nsqd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'",
clientConn.RemoteAddr(), protocolMagic)
return
}
// 版本校驗通過,保存連接資訊,key-是ADDR,value是當前連接指標
p.conns.Store(clientConn.RemoteAddr(), clientConn)
// 啟動
err = prot.IOLoop(clientConn)
if err != nil {
p.ctx.nsqd.logf(LOG_ERROR, "client(%s) - %s", clientConn.RemoteAddr(), err)
}
p.conns.Delete(clientConn.RemoteAddr())
}
4 消費和生產程序
4.1 訊息生產
生產者pub訊息時,訊息會首先寫入對應topic的佇列(記憶體優先,記憶體滿了寫磁盤),topic的messagePump()方法再將訊息拷貝給每個channel,
每個channel均各執一份完整的訊息,
1.訊息寫入topic
訊息生產由生產者呼叫PUB/MPUB/DPUB這類指令實作,底層都是呼叫topic.PutMessage(msg),進一步呼叫topic.put(msg):
func (t *Topic) put(m *Message) error {
select {
case t.memoryMsgChan <- m: // 優先寫入記憶體memoryMsgChan
default: // 當記憶體case失敗即memoryMsgChan滿時,走default,將msg以位元組形式寫入磁盤佇列topic.backend
b := bufferPoolGet()
err := writeMessageToBackend(b, m, t.backend)
bufferPoolPut(b)
t.ctx.nsqd.SetHealth(err)
if err != nil {
t.ctx.nsqd.logf(LOG_ERROR,
"TOPIC(%s) ERROR: failed to write message to backend - %s",
t.name, err)
return err
}
}
return nil
}
訊息寫入topic的邏輯比較簡單,優先寫memoryMsgChan,如果memoryMsgChan滿了,則寫入磁盤佇列topic.backend,
這里留個思考題:NSQ是否支持不寫記憶體,全部寫磁盤佇列?
2.topic將訊息復制給每個channel
第二章介紹結構體和方法時,介紹了topic結構體的messagePump()方法,正是這個方法將第1步寫入的訊息復制給每個channel的:
func (t *Topic) messagePump() {
/* 準備作業有代碼我們略過 */
// 主訊息處理回圈
for {
select {
case msg = <-memoryMsgChan:
case buf = <-backendChan:
msg, err = decodeMessage(buf)
if err != nil {
t.ctx.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err)
continue
}
case <-t.channelUpdateChan:
chans = chans[:0]
t.RLock()
for _, c := range t.channelMap {
chans = append(chans, c)
}
t.RUnlock()
if len(chans) == 0 || t.IsPaused() {
memoryMsgChan = nil
backendChan = nil
} else {
memoryMsgChan = t.memoryMsgChan
backendChan = t.backend.ReadChan()
}
continue
case <-t.pauseChan:
if len(chans) == 0 || t.IsPaused() {
memoryMsgChan = nil
backendChan = nil
} else {
memoryMsgChan = t.memoryMsgChan
backendChan = t.backend.ReadChan()
}
continue
case <-t.exitChan:
goto exit
}
for i, channel := range chans {
chanMsg := msg
/* channel消費訊息時,需要處理延時/超時等問題,所以這里復制了訊息,給每個channel傳遞的是獨立的訊息實體 */
if i > 0 {
chanMsg = NewMessage(msg.ID, msg.Body)
chanMsg.Timestamp = msg.Timestamp
chanMsg.deferred = msg.deferred
}
if chanMsg.deferred != 0 {
channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
continue
}
err := channel.PutMessage(chanMsg)
if err != nil {
t.ctx.nsqd.logf(LOG_ERROR,
"TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s",
t.name, msg.ID, channel.name, err)
}
}
}
}
topic.messagePump()方法代碼還蠻長的,前面是些準備作業,主要就是后面的for回圈,其中for回圈中select的前兩項,memoryMsgChan來源于topic.memoryMsgChan,而backendChan則是topic.backend.ReadChan(),分別對應于記憶體和磁盤佇列,注意只有這兩個case會往下傳遞訊息,其他的case處理退出和更新機制的,會continue或exit外層的for回圈,
雖然通道channel是有序的,但select的case具有隨機性,這就決定了每輪回圈讀的是記憶體還是磁盤是隨機的,訊息的消費順序是不可控的,
select陳述句獲取的訊息,交給第2層for回圈處理,邏輯比較簡單,遍歷每一個chan,呼叫channel.PutMessage()寫入,由于每個channel對應于不同的消費者,有不同的延時/超時和消費機制,所以這里拷貝了message實體,
4.2 訊息消費
每個連接均會啟動一個運行protocolV2.messagePump()方法的協程,這個協程負責監聽channel的訊息佇列并向客戶端推送訊息,客戶端只有觸發SUB指令之后,才會將channel傳遞給protocolV2.messagePump(),這之后消費推送才會正式開啟,
啟動訊息推送
前面講Tcpserver時有提到,客戶端創建連接時,會呼叫tcpserver.Handle(),里面再呼叫protocolV2.IOLoop(),protocolV2.IOLoop()方法開頭有下面這行:
go p.messagePump(client, messagePumpStartedChan)
這行創建了一個獨立執行緒,呼叫的protocolV2.messagePump()負責向消費者推送訊息,
有個小細節是無論是生產者還是消費者,都會創建這個協程,protocolV2.messagePump()創建后并不會立即推送訊息,而是需要呼叫SUB指令,以protocolV2.SUB()方法為例,方法末尾有這么一行:
client.SubEventChan <- channel
將當前消費者訂閱的channel傳入client.SubEventChan,這個會由protocolV2.messagePump()接收,這個方法核心是下面這個for回圈(限于篇幅,我省略了大量無關代碼):
func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
for {
if subChannel == nil || !client.IsReadyForMessages() {
// the client is not ready to receive messages...
memoryMsgChan = nil
backendMsgChan = nil
flusherChan = nil
// force flush
client.writeLock.Lock()
err = client.Flush()
client.writeLock.Unlock()
if err != nil {
goto exit
}
flushed = true
} else if flushed {
// last iteration we flushed...
// do not select on the flusher ticker channel
memoryMsgChan = subChannel.memoryMsgChan
backendMsgChan = subChannel.backend.ReadChan()
flusherChan = nil
}
select {
case subChannel = <-subEventChan:
// you can't SUB anymore
subEventChan = nil
case b := <-backendMsgChan:
if sampleRate > 0 && rand.Int31n(100) > sampleRate {
continue
}
msg, err := decodeMessage(b)
if err != nil {
p.ctx.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err)
continue
}
msg.Attempts++
subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
client.SendingMessage()
err = p.SendMessage(client, msg)
if err != nil {
goto exit
}
flushed = false
case msg := <-memoryMsgChan:
if sampleRate > 0 && rand.Int31n(100) > sampleRate {
continue
}
msg.Attempts++
subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
client.SendingMessage()
err = p.SendMessage(client, msg)
if err != nil {
goto exit
}
flushed = false
case <-client.ExitChan:
goto exit
}
}
}
客戶端建立連接初始,subChannel為空,回圈一直走第1個if陳述句,直到客戶端呼叫SUB指令,select陳述句執行"case subChannel = <-subEventChan:",此時subChannel非空,接下來backendMsgChan和memoryMsgChan被賦值,此后開始推送訊息:
- 訊息會隨機從記憶體和磁盤佇列取,因為如果記憶體和磁盤都有資料,select是隨機的
- 訊息通過protocolV2.SendMessage()推送給消費者
當多個消費者訂閱同一個channel時情況會如何?
上面我們提到消費者發起SUB指令訂閱訊息,protocolV2.SUB()會將chan傳給protocolV2.messagePump(),即這一行“client.SubEventChan <- channel”,那么我們來看下這個channel變數怎么來的:
func (p *protocolV2) SUB(client *clientV2, params [][]byte) ([]byte, error) {
...
topic := p.ctx.nsqd.GetTopic(topicName)
channel = topic.GetChannel(channelName)
...
client.SubEventChan <- channel
SUB方法包含多種邏輯:
- 當channel不存在時,topic.GetChannel()方法自動創建并與這個消費者系結
- 當channel存在,比如事先通過http-api創建好了,但沒有消費者訂閱,則當前消費者獨立系結這個channel
- 當channel存在,且已經有消費者訂閱了,topic.GetChannel()方法依然會回傳這個channel,這時就有多個消費者同時訂閱了這個channel,大家共用一個通道chan變數
由于是多個消費者共用一個通道chan變數,每個消費者都有一個for select在回圈監聽這個通道,根據chan變數的特性,消費會隨機發送給一位消費者,且一條訊息只會推送給一個消費者,
消費超時處理
protocolV2.messagePump()方法,無論是“case b := <-backendMsgChan:”還是“case msg := <-memoryMsgChan:”,在向消費者推送訊息前都呼叫了下面這行代碼:
func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout) // 省略其他代碼
}
func (c *Channel) StartInFlightTimeout(msg *Message, clientID int64, timeout time.Duration) error {
msg.pri = now.Add(timeout).UnixNano() // pri成員保存本訊息超時時間
err := c.pushInFlightMessage(msg)
c.addToInFlightPQ(msg)
}
channel.StartInFlightTimeout()將訊息保存到channel的inFlightMessages和inFlightPQ佇列中,這兩個快取是用來處理消費超時的,
值得注意的一個小細節是c.addToInFlightPQ(msg)將msg壓入最小堆時,將msg在陣列的偏移量保存到了msg.index成員中(最小堆底層是陣列實作)
我們先簡單看下FIN指令會做啥:
func (p *protocolV2) FIN(client *clientV2, params [][]byte) ([]byte, error) {
err = client.Channel.FinishMessage(client.ID, *id) // 省略其他代碼
}
func (c *Channel) FinishMessage(clientID int64, id MessageID) error {
// 省略其他代碼
msg, err := c.popInFlightMessage(clientID, id)
c.removeFromInFlightPQ(msg)
}
FIN的動作比較簡單,主要就是呼叫channel.FinishMessage()方法把上面寫入超時快取的msg給洗掉掉,
FIN從inFlightMessages中洗掉訊息比較容易,這是個map,key是msg.id,客戶端發送FIN訊息時附帶了msg.id,但如何從最小堆inFlightPQ中洗掉對應的msg呢?前面提到在入堆時的一個細節,即保存了msg的偏移量,此時正好用上,通過msg.index直接定位到msg的位置并調整堆即可,
說了這么多,最小堆的作用是啥?別急,接下來我們看下超時邏輯:
超時邏輯由程式啟動時開啟的作業執行緒組來處理,即NSQD.queueScanLoop()方法:
func (n *NSQD) queueScanLoop() {
n.resizePool(len(channels), workCh, responseCh, closeCh)
for {
select {
case <-workTicker.C: // 定時觸發作業
if len(channels) == 0 {
continue
}
case <-refreshTicker.C: // 動態調整協程組的數量
channels = n.channels()
n.resizePool(len(channels), workCh, responseCh, closeCh)
continue
case <-n.exitChan:
goto exit
}
num := n.getOpts().QueueScanSelectionCount
if num > len(channels) {
num = len(channels)
}
loop:
for _, i := range util.UniqRands(num, len(channels)) {
workCh <- channels[i] // 觸發協程組作業
}
numDirty := 0
for i := 0; i < num; i++ {
if <-responseCh {
numDirty++
}
}
if float64(numDirty)/float64(num) > n.getOpts().QueueScanDirtyPercent {
goto loop
}
}
}
NSQD.queueScanLoop()方法主要有一個for回圈,內層是一個select和一個loop回圈,select中,第1個定時器case <-workTicker.C的作用是定時觸發作業,只有這個case會跳出select走到下面的loop,第2個定時器負責啟動作業協程組并動態調整協程數量,我們來看下第2個定時器呼叫的resizePool()方法:
func (n *NSQD) resizePool(num int, workCh chan *Channel, responseCh chan bool, closeCh chan int) {
idealPoolSize := int(float64(num) * 0.25) // 協程數量設定為channel數的1/4
if idealPoolSize < 1 {
idealPoolSize = 1
} else if idealPoolSize > n.getOpts().QueueScanWorkerPoolMax {
idealPoolSize = n.getOpts().QueueScanWorkerPoolMax
}
for {
if idealPoolSize == n.poolSize { // 當協程數量達到協程數量設定為channel數的1/4時,退出
break
} else if idealPoolSize < n.poolSize { // 否則如果當前協程數大于目標值,則通過closeCh通知部分協程退出
// contract
closeCh <- 1
n.poolSize--
} else { // 否則協程數不夠,則啟動新的協程
// expand
n.waitGroup.Wrap(func() {
n.queueScanWorker(workCh, responseCh, closeCh)
})
n.poolSize++
}
}
}
resizePool()方法上面的注釋已經說的很清楚了,作用就是保持作業協程數量為當前channel數的1/4,
接下來我們看具體的作業邏輯,queueScanWorker()方法:
func (n *NSQD) queueScanWorker(workCh chan *Channel, responseCh chan bool, closeCh chan int) {
for {
select {
case c := <-workCh:
now := time.Now().UnixNano()
dirty := false
if c.processInFlightQueue(now) {
dirty = true
}
if c.processDeferredQueue(now) {
dirty = true
}
responseCh <- dirty
case <-closeCh:
return
}
}
}
queueScanWorker()方法的代碼很短,一是監聽closeCh的退出信號;二是監聽workCh的作業信號,workCh會將需要處理的channel傳入,然后呼叫processInFlightQueue()清理超時的訊息,呼叫processDeferredQueue()清理到時間的延時訊息:
func (c *Channel) processInFlightQueue(t int64) bool {
dirty := false
for {
c.inFlightMutex.Lock()
msg, _ := c.inFlightPQ.PeekAndShift(t)
c.inFlightMutex.Unlock()
if msg == nil {
goto exit
}
dirty = true
_, err := c.popInFlightMessage(msg.clientID, msg.ID)
if err != nil {
goto exit
}
atomic.AddUint64(&c.timeoutCount, 1)
c.RLock()
client, ok := c.clients[msg.clientID]
c.RUnlock()
if ok {
client.TimedOutMessage()
}
c.put(msg)
}
exit:
return dirty
}
func (pq *inFlightPqueue) PeekAndShift(max int64) (*Message, int64) {
if len(*pq) == 0 {
return nil, 0
}
x := (*pq)[0]
if x.pri > max {
return nil, x.pri - max
}
pq.Pop()
return x, 0
}
前面提到msg.pri成員保存本訊息超時時間,所以PeekAndShift()回傳的是最小堆里已經超時且超時時間最長的那條訊息,processInFlightQueue()則將訊息從超時佇列中刪,同時將訊息重新put進channel,注意此時超時的訊息put進channel后實際是排在隊尾的,消費順序將發生改變,
processInFlightQueue()方法如果存在超時訊息,回傳值dirty標識true,queueScanWorker()將dirty寫入responseCh,再往回看,queueScanLoop()方法統計了dirty的數量,超過一定比例會繼續執行loop,而不是等待下一次定時執行,
4.2 延遲消費
生產者呼叫DPUB發布的訊息,可以指定延時多少再推送給消費者,
我們來看下DPUB的邏輯:
func (p *protocolV2) DPUB(client *clientV2, params [][]byte) ([]byte, error) {
timeoutMs, err := protocol.ByteToBase10(params[2])
timeoutDuration := time.Duration(timeoutMs) * time.Millisecond
msg := NewMessage(topic.GenerateID(), messageBody)
msg.deferred = timeoutDuration
err = topic.PutMessage(msg)
}
從上面截取的PUB()方法代碼可以看出,DPUB的訊息會將延時時間寫入msg.deferred成員,4.1章節第2部分介紹的Topic.messagePump()方法有下面這段:
func (t *Topic) messagePump() {
if chanMsg.deferred != 0 {
channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
continue
}
}
當chanMsg.deferred != 0時表示延時訊息,此時不是直接呼叫putMessage()方法寫入channel,而是呼叫channel.PutMessageDeferred(chanMsg, chanMsg.deferred),訊息被寫入了延時佇列Channel.deferredMessages和Channel.deferredPQ,之后的邏輯是在作業協程組NSQD.queueScanLoop()中被識別并put進channel,這與超時的處理邏輯是一樣的,不展開說,
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/17771.html
標籤:其他
