主頁 >  其他 > NSQ原始碼剖析(一):NSQD主要結構方法和訊息生產消費程序

NSQ原始碼剖析(一):NSQD主要結構方法和訊息生產消費程序

2020-09-12 23:11:25 其他

目錄
  • 1 概述
  • 2 主要結構體及方法
    • 2.1 NSQD
    • 2.2 tcpServer
    • 2.3 protocolV2
    • 2.4 clientV2
    • 2.5 Topic
    • 2.6 channel
  • 3 啟動程序
  • 4 消費和生產程序
    • 4.1 訊息生產
    • 4.2 訊息消費
    • 4.2 延遲消費

1 概述

NSQ包含3個組件:

  • nsqd:每個nsq實體運行一個nsqd行程,負責接收生產者訊息、向nsqlookupd注冊、向消費者推送訊息
  • nsqlookupd:集群注冊中心,可以有多個,負責接收nsqd的注冊資訊,向消費者提供服務發現
  • nsqadmin:用于監控和管理的web ui

生產者將訊息寫入到指定的主題Topic,同一個Topic下則可以關聯多個管道Channel,每個Channel都會傳輸對應Topic的完整副本,
消費者則訂閱Channel的訊息,于是多個消費者訂閱不同的Channel的話,他們各自都能拿到完整的訊息副本;但如果多個消費者訂閱同一個Channel,則是共享的,即訊息會隨機發送給其中一個消費者,

接下來我們來分析下nsq的原始碼:

  • 原始碼地址:https://github.com/nsqio/nsq
  • 2020年1月18日最新的master分支

nsq各組件均使用上述代碼倉庫,通過apps目錄下的不同的main包啟動,比如nsqd的main函式在apps/nsqd目錄下,其他類同,

本檔案主要分析nsqd的主要結構體和方法,及訊息生產和消費的程序,主要以TCP api為例來分析,HTTP/HTTPS的api類同,

2 主要結構體及方法

2.1 NSQD

nsqd/nsqd.go檔案,NSQD是主實體,一個nsqd行程創建一個nsqd結構體實體,并通過此結構體的Main()方法啟動所有的服務,

type NSQD struct {
	clientIDSequence int64 // 遞增的客戶端ID,每個客戶端連接均從這里取一個遞增后的ID作為唯一標識

	sync.RWMutex

	opts atomic.Value // 引數選項,真實型別是apps/nsqd/option.go:Options結構體

	dl        *dirlock.DirLock
	isLoading int32
	errValue  atomic.Value
	startTime time.Time

	topicMap map[string]*Topic	// 保存當前所有的topic

	clientLock sync.RWMutex
	clients    map[int64]Client

	lookupPeers atomic.Value

	tcpServer     *tcpServer
	tcpListener   net.Listener
	httpListener  net.Listener
	httpsListener net.Listener
	tlsConfig     *tls.Config

	poolSize int				// 當前作業協程組的協程數量

	notifyChan           chan interface{}
	optsNotificationChan chan struct{}
	exitChan             chan int
	waitGroup            util.WaitGroupWrapper

	ci *clusterinfo.ClusterInfo
}

主要方法

/*
	程式啟動時呼叫本方法,執行下面的動作:
		- 啟動TCP/HTTP/HTTPS服務
		- 啟動作業協程組:NSQD.queueScanLoop
		- 啟動服務注冊:NSQD.lookupLoop
*/
func (n *NSQD) Main() error

// 負責管理作業協程組的數量,每呼叫一次NSQD.queueScanWorker()方法啟動一個作業協程
func (n *NSQD) queueScanLoop()

// 由queueScanLoop()呼叫,負責啟動作業協程組并動態調整協程數量,作業協程的數量為當前的channel數 * 0.25
func (n *NSQD) resizePool(num int, workCh chan *Channel, responseCh chan bool, closeCh chan int)

// 這是具體的作業協程,監聽workCh,對收到的待處理Channel做兩個動作,一是將超時的訊息重新入隊;二是將到時間的延時訊息入隊
func (n *NSQD) queueScanWorker(workCh chan *Channel, responseCh chan bool, closeCh chan int)

/*
	lookupLoop()方法與nsqlookupd建立連接,負責向nsqlookupd注冊topic,并定時發送心跳包
*/
func (n *NSQD) lookupLoop()

2.2 tcpServer

nsqd/tcp.go檔案,tcpServer通過Handle()方法接收TCP請求,
tcpServer是nsqd結構的成員,全域也就只有一個實體,但在protocol包的TCPServer方法中,每創建一個新的連接,均會呼叫一次tcpServer.Handle()

type tcpServer struct {
	ctx   *context
	conns sync.Map
}

主要方法

/*
	p.nsqd.Main()啟動protocol.TCPServer(),這個方法里會為每個客戶端連接創建一個新協程,協程執行tcpServer.Handle()方法
	本方法首先對新連接讀取4位元組校驗版本,新連接必須首先發送4位元組"  V2",
	然后阻塞呼叫nsqd.protocolV2.IOLoop()處理客戶端接下來的請求,
*/
func (p *tcpServer) Handle(clientConn net.Conn)

2.3 protocolV2

nsqd/protocol_v2.go檔案,protocolV2負責處理過來的具體的用戶請求,
每個連接均會創建一個獨立的protocolV2實體(由tcpServer.Handle()創建)

type protocolV2 struct {
	ctx *context
}

主要方法

/*
	tcpServer.Handle()阻塞呼叫本方法
	1. 啟用一個獨立協程向消費者推送訊息protocolV2.messagePump()
	2. for回圈接收并處理客戶端請求protocolV2.Exec()
*/
func (p *protocolV2) IOLoop(conn net.Conn) error

// 組裝訊息并呼叫protocolV2.Send()發送給消費者
func (p *protocolV2) SendMessage(client *clientV2, msg *Message) error

// 向客戶端發送資料幀
func (p *protocolV2) Send(client *clientV2, frameType int32, data []byte) error

// 決議客戶端請求的指令,呼叫對應的指令方法
func (p *protocolV2) Exec(client *clientV2, params [][]byte) ([]byte, error)

// 負責向消費者推送訊息
func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool)

// 下面這組方法是NSQD支持的指令對應的處理方法
func (p *protocolV2) IDENTIFY(client *clientV2, params [][]byte) ([]byte, error)
func (p *protocolV2) AUTH(client *clientV2, params [][]byte) ([]byte, error)
func (p *protocolV2) SUB(client *clientV2, params [][]byte) ([]byte, error)
func (p *protocolV2) RDY(client *clientV2, params [][]byte) ([]byte, error)
func (p *protocolV2) FIN(client *clientV2, params [][]byte) ([]byte, error)
func (p *protocolV2) REQ(client *clientV2, params [][]byte) ([]byte, error)
func (p *protocolV2) CLS(client *clientV2, params [][]byte) ([]byte, error)
func (p *protocolV2) NOP(client *clientV2, params [][]byte) ([]byte, error)
func (p *protocolV2) PUB(client *clientV2, params [][]byte) ([]byte, error)
func (p *protocolV2) MPUB(client *clientV2, params [][]byte) ([]byte, error)
func (p *protocolV2) DPUB(client *clientV2, params [][]byte) ([]byte, error)
func (p *protocolV2) TOUCH(client *clientV2, params [][]byte) ([]byte, error)

2.4 clientV2

nsqd/client_v2.go檔案,保存每個客戶端的連接資訊,
clientV2實體由protocolV2.IOLoop()創建,每個連接均有一個獨立的實體,

type clientV2 struct {
	// 64bit atomic vars need to be first for proper alignment on 32bit platforms
	ReadyCount    int64
	InFlightCount int64
	MessageCount  uint64
	FinishCount   uint64
	RequeueCount  uint64

	pubCounts map[string]uint64

	writeLock sync.RWMutex
	metaLock  sync.RWMutex

	ID        int64
	ctx       *context
	UserAgent string

	// original connection
	net.Conn

	// connections based on negotiated features
	tlsConn     *tls.Conn
	flateWriter *flate.Writer

	// reading/writing interfaces
	Reader *bufio.Reader
	Writer *bufio.Writer

	OutputBufferSize    int
	OutputBufferTimeout time.Duration

	HeartbeatInterval time.Duration

	MsgTimeout time.Duration

	State          int32
	ConnectTime    time.Time
	Channel        *Channel
	ReadyStateChan chan int
	ExitChan       chan int

	ClientID string
	Hostname string

	SampleRate int32

	IdentifyEventChan chan identifyEvent
	SubEventChan      chan *Channel

	TLS     int32
	Snappy  int32
	Deflate int32

	// re-usable buffer for reading the 4-byte lengths off the wire
	lenBuf   [4]byte
	lenSlice []byte

	AuthSecret string
	AuthState  *auth.State
}

2.5 Topic

nsqd/topic.go檔案,對應于每個topic實體,由系統啟動時創建或者發布訊息/消費訊息時自動創建,

type Topic struct {
	messageCount uint64	// 累計訊息數
	messageBytes uint64	// 累計訊息體的位元組數

	sync.RWMutex

	name              string				// topic名,生產和消費時需要指定此名稱
	channelMap        map[string]*Channel	// 保存每個channel name和channel指標的映射
	backend           BackendQueue			// 磁盤佇列,當記憶體memoryMsgChan滿時,寫入硬碟佇列
	memoryMsgChan     chan *Message			// 訊息優先存入這個記憶體chan
	startChan         chan int
	exitChan          chan int
	channelUpdateChan chan int
	waitGroup         util.WaitGroupWrapper
	exitFlag          int32
	idFactory         *guidFactory

	ephemeral      bool
	deleteCallback func(*Topic)
	deleter        sync.Once

	paused    int32
	pauseChan chan int

	ctx *context
}

主要方法

/*
	下面兩個方法負責將訊息寫入topic,底層均呼叫topic.put()方法
	1. topic.memoryMsgChan未滿時,優先寫入記憶體memoryMsgChan
	2. 否則,寫入磁盤topic.backend
*/
func (t *Topic) PutMessage(m *Message) error
func (t *Topic) PutMessages(msgs []*Message) error

/*
	NewTopic創建新的topic時會為每個topic啟動一個獨立執行緒來處理訊息推送,即messagePump()
	此方法回圈隨機從記憶體memoryMsgChan和磁盤佇列backend中取訊息寫入到topic下每一個chnnel中
*/
func (t *Topic) messagePump()

2.6 channel

nsqd/channel.go檔案,對應于每個channel實體

type Channel struct {
	requeueCount uint64
	messageCount uint64
	timeoutCount uint64

	sync.RWMutex

	topicName string
	name      string
	ctx       *context

	backend BackendQueue		// 磁盤佇列,當記憶體memoryMsgChan滿時,寫入硬碟佇列

	memoryMsgChan chan *Message	// 訊息優先存入這個記憶體chan
	exitFlag      int32
	exitMutex     sync.RWMutex

	// state tracking
	clients        map[int64]Consumer
	paused         int32
	ephemeral      bool
	deleteCallback func(*Channel)
	deleter        sync.Once

	// Stats tracking
	e2eProcessingLatencyStream *quantile.Quantile

	// TODO: these can be DRYd up
	deferredMessages map[MessageID]*pqueue.Item	// 保存尚未到時間的延遲消費訊息
	deferredPQ       pqueue.PriorityQueue		// 保存尚未到時間的延遲消費訊息,最小堆
	deferredMutex    sync.Mutex
	inFlightMessages map[MessageID]*Message		// 保存已推送尚未收到FIN的訊息
	inFlightPQ       inFlightPqueue				// 保存已推送尚未收到FIN的訊息,最小堆
	inFlightMutex    sync.Mutex
}

主要方法

/*
	將訊息寫入channel,邏輯與topic的一致,記憶體未滿則優先寫記憶體chan,否則寫入磁盤佇列
*/
func (c *Channel) PutMessage(m *Message) error
func (c *Channel) put(m *Message) error

// 消費超時相關
func (c *Channel) StartInFlightTimeout(msg *Message, clientID int64, timeout time.Duration) error
func (c *Channel) pushInFlightMessage(msg *Message) error
func (c *Channel) popInFlightMessage(clientID int64, id MessageID) (*Message, error)
func (c *Channel) addToInFlightPQ(msg *Message)
func (c *Channel) removeFromInFlightPQ(msg *Message)
func (c *Channel) processInFlightQueue(t int64) bool

// 延時消費相關
func (c *Channel) StartDeferredTimeout(msg *Message, timeout time.Duration) error
func (c *Channel) pushDeferredMessage(item *pqueue.Item) error
func (c *Channel) popDeferredMessage(id MessageID) (*pqueue.Item, error)
func (c *Channel) addToDeferredPQ(item *pqueue.Item)
func (c *Channel) processDeferredQueue(t int64) bool

3 啟動程序

nsqd的main函式在apps/nsqd/main.go檔案,
啟動時呼叫了一個第三方包svc,主要作用是攔截syscall.SIGINT/syscall.SIGTERM這兩個信號,最侄訓是呼叫了main.go下的3個方法:

  • program.Init():windows下特殊操作
  • program.Start():加載引數和組態檔、加載上一次保存的Topic資訊并完成初始化、創建nsqd并呼叫p.nsqd.Main()啟動
  • program.Stop():退出處理

p.nsqd.Main()的邏輯也很簡單,代碼不貼了,依次啟動了TCP服務、HTTP服務、HTTPS服務這3個服務,除此之外,還啟動了以下兩個協程:

  • queueScanLoop:訊息延時/超時處理
  • lookupLoop:服務注冊

TCPServer
protocol包的TCPServer的核心代碼就是下面這幾行,回圈等待客戶端連接,并為每個連接創建一個獨立的協程:

func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) error {
	for {
		// 等待生產者或消費者連接
		clientConn, err := listener.Accept()

		// 每創建一個連接wg +1
		wg.Add(1)
		go func() {
			// 每個連接均啟動一個獨立的協程來接收處理請求
			handler.Handle(clientConn)
			wg.Done()
		}()
	}

	// 等待所有協程退出
	wg.Wait()

	return nil
}

TCPServer的核心是為每個連接啟動的協程處理方法handler.Handle(clientConn),實際呼叫的是下面這個方法,連接建立時先讀取4位元組,必須是" V2",然后啟動prot.IOLoop(clientConn)處理接下來的客戶端請求:

func (p *tcpServer) Handle(clientConn net.Conn) {
	// 無論是生產者還是消費者,建立連接時,必須先發送4位元組的"  V2"進行版本校驗
	buf := make([]byte, 4)
	_, err := io.ReadFull(clientConn, buf)
	protocolMagic := string(buf)

	var prot protocol.Protocol
	switch protocolMagic {
	case "  V2":
		prot = &protocolV2{ctx: p.ctx}
	default:
		protocol.SendFramedResponse(clientConn, frameTypeError, []byte("E_BAD_PROTOCOL"))
		clientConn.Close()
		p.ctx.nsqd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'",
			clientConn.RemoteAddr(), protocolMagic)
		return
	}

	// 版本校驗通過,保存連接資訊,key-是ADDR,value是當前連接指標
	p.conns.Store(clientConn.RemoteAddr(), clientConn)

	// 啟動
	err = prot.IOLoop(clientConn)
	if err != nil {
		p.ctx.nsqd.logf(LOG_ERROR, "client(%s) - %s", clientConn.RemoteAddr(), err)
	}

	p.conns.Delete(clientConn.RemoteAddr())
}

4 消費和生產程序

4.1 訊息生產

生產者pub訊息時,訊息會首先寫入對應topic的佇列(記憶體優先,記憶體滿了寫磁盤),topic的messagePump()方法再將訊息拷貝給每個channel,
每個channel均各執一份完整的訊息,

1.訊息寫入topic
訊息生產由生產者呼叫PUB/MPUB/DPUB這類指令實作,底層都是呼叫topic.PutMessage(msg),進一步呼叫topic.put(msg):

func (t *Topic) put(m *Message) error {
	select {
	case t.memoryMsgChan <- m:	// 優先寫入記憶體memoryMsgChan
	default:					// 當記憶體case失敗即memoryMsgChan滿時,走default,將msg以位元組形式寫入磁盤佇列topic.backend
		b := bufferPoolGet()
		err := writeMessageToBackend(b, m, t.backend)
		bufferPoolPut(b)
		t.ctx.nsqd.SetHealth(err)
		if err != nil {
			t.ctx.nsqd.logf(LOG_ERROR,
				"TOPIC(%s) ERROR: failed to write message to backend - %s",
				t.name, err)
			return err
		}
	}
	return nil
}

訊息寫入topic的邏輯比較簡單,優先寫memoryMsgChan,如果memoryMsgChan滿了,則寫入磁盤佇列topic.backend,
這里留個思考題:NSQ是否支持不寫記憶體,全部寫磁盤佇列?

2.topic將訊息復制給每個channel
第二章介紹結構體和方法時,介紹了topic結構體的messagePump()方法,正是這個方法將第1步寫入的訊息復制給每個channel的:

func (t *Topic) messagePump() {
	/* 準備作業有代碼我們略過 */
	// 主訊息處理回圈
	for {
		select {
		case msg = <-memoryMsgChan:
		case buf = <-backendChan:
			msg, err = decodeMessage(buf)
			if err != nil {
				t.ctx.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err)
				continue
			}
		case <-t.channelUpdateChan:
			chans = chans[:0]
			t.RLock()
			for _, c := range t.channelMap {
				chans = append(chans, c)
			}
			t.RUnlock()
			if len(chans) == 0 || t.IsPaused() {
				memoryMsgChan = nil
				backendChan = nil
			} else {
				memoryMsgChan = t.memoryMsgChan
				backendChan = t.backend.ReadChan()
			}
			continue
		case <-t.pauseChan:
			if len(chans) == 0 || t.IsPaused() {
				memoryMsgChan = nil
				backendChan = nil
			} else {
				memoryMsgChan = t.memoryMsgChan
				backendChan = t.backend.ReadChan()
			}
			continue
		case <-t.exitChan:
			goto exit
		}

		for i, channel := range chans {
			chanMsg := msg
			/* channel消費訊息時,需要處理延時/超時等問題,所以這里復制了訊息,給每個channel傳遞的是獨立的訊息實體 */
			if i > 0 {
				chanMsg = NewMessage(msg.ID, msg.Body)
				chanMsg.Timestamp = msg.Timestamp
				chanMsg.deferred = msg.deferred
			}
			if chanMsg.deferred != 0 {
				channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
				continue
			}
			err := channel.PutMessage(chanMsg)
			if err != nil {
				t.ctx.nsqd.logf(LOG_ERROR,
					"TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s",
					t.name, msg.ID, channel.name, err)
			}
		}
	}
}

topic.messagePump()方法代碼還蠻長的,前面是些準備作業,主要就是后面的for回圈,其中for回圈中select的前兩項,memoryMsgChan來源于topic.memoryMsgChan,而backendChan則是topic.backend.ReadChan(),分別對應于記憶體和磁盤佇列,注意只有這兩個case會往下傳遞訊息,其他的case處理退出和更新機制的,會continue或exit外層的for回圈,
雖然通道channel是有序的,但select的case具有隨機性,這就決定了每輪回圈讀的是記憶體還是磁盤是隨機的,訊息的消費順序是不可控的,
select陳述句獲取的訊息,交給第2層for回圈處理,邏輯比較簡單,遍歷每一個chan,呼叫channel.PutMessage()寫入,由于每個channel對應于不同的消費者,有不同的延時/超時和消費機制,所以這里拷貝了message實體,

4.2 訊息消費

每個連接均會啟動一個運行protocolV2.messagePump()方法的協程,這個協程負責監聽channel的訊息佇列并向客戶端推送訊息,客戶端只有觸發SUB指令之后,才會將channel傳遞給protocolV2.messagePump(),這之后消費推送才會正式開啟,
啟動訊息推送
前面講Tcpserver時有提到,客戶端創建連接時,會呼叫tcpserver.Handle(),里面再呼叫protocolV2.IOLoop(),protocolV2.IOLoop()方法開頭有下面這行:

	go p.messagePump(client, messagePumpStartedChan)

這行創建了一個獨立執行緒,呼叫的protocolV2.messagePump()負責向消費者推送訊息,
有個小細節是無論是生產者還是消費者,都會創建這個協程,protocolV2.messagePump()創建后并不會立即推送訊息,而是需要呼叫SUB指令,以protocolV2.SUB()方法為例,方法末尾有這么一行:

	client.SubEventChan <- channel

將當前消費者訂閱的channel傳入client.SubEventChan,這個會由protocolV2.messagePump()接收,這個方法核心是下面這個for回圈(限于篇幅,我省略了大量無關代碼):

func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
	for {
		if subChannel == nil || !client.IsReadyForMessages() {
			// the client is not ready to receive messages...
			memoryMsgChan = nil
			backendMsgChan = nil
			flusherChan = nil
			// force flush
			client.writeLock.Lock()
			err = client.Flush()
			client.writeLock.Unlock()
			if err != nil {
				goto exit
			}
			flushed = true
		} else if flushed {
			// last iteration we flushed...
			// do not select on the flusher ticker channel
			memoryMsgChan = subChannel.memoryMsgChan
			backendMsgChan = subChannel.backend.ReadChan()
			flusherChan = nil
		}
		
		select {
		case subChannel = <-subEventChan:
			// you can't SUB anymore
			subEventChan = nil
		case b := <-backendMsgChan:
			if sampleRate > 0 && rand.Int31n(100) > sampleRate {
				continue
			}

			msg, err := decodeMessage(b)
			if err != nil {
				p.ctx.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err)
				continue
			}
			msg.Attempts++

			subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
			client.SendingMessage()
			err = p.SendMessage(client, msg)
			if err != nil {
				goto exit
			}
			flushed = false
		case msg := <-memoryMsgChan:
			if sampleRate > 0 && rand.Int31n(100) > sampleRate {
				continue
			}
			msg.Attempts++

			subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
			client.SendingMessage()
			err = p.SendMessage(client, msg)
			if err != nil {
				goto exit
			}
			flushed = false
		case <-client.ExitChan:
			goto exit
		}
	}
}

客戶端建立連接初始,subChannel為空,回圈一直走第1個if陳述句,直到客戶端呼叫SUB指令,select陳述句執行"case subChannel = <-subEventChan:",此時subChannel非空,接下來backendMsgChan和memoryMsgChan被賦值,此后開始推送訊息:

  • 訊息會隨機從記憶體和磁盤佇列取,因為如果記憶體和磁盤都有資料,select是隨機的
  • 訊息通過protocolV2.SendMessage()推送給消費者

當多個消費者訂閱同一個channel時情況會如何?
上面我們提到消費者發起SUB指令訂閱訊息,protocolV2.SUB()會將chan傳給protocolV2.messagePump(),即這一行“client.SubEventChan <- channel”,那么我們來看下這個channel變數怎么來的:

func (p *protocolV2) SUB(client *clientV2, params [][]byte) ([]byte, error) {
	...
	
	topic := p.ctx.nsqd.GetTopic(topicName)
	channel = topic.GetChannel(channelName)
	
	...
	client.SubEventChan <- channel

SUB方法包含多種邏輯:

  • 當channel不存在時,topic.GetChannel()方法自動創建并與這個消費者系結
  • 當channel存在,比如事先通過http-api創建好了,但沒有消費者訂閱,則當前消費者獨立系結這個channel
  • 當channel存在,且已經有消費者訂閱了,topic.GetChannel()方法依然會回傳這個channel,這時就有多個消費者同時訂閱了這個channel,大家共用一個通道chan變數

由于是多個消費者共用一個通道chan變數,每個消費者都有一個for select在回圈監聽這個通道,根據chan變數的特性,消費會隨機發送給一位消費者,且一條訊息只會推送給一個消費者,

消費超時處理
protocolV2.messagePump()方法,無論是“case b := <-backendMsgChan:”還是“case msg := <-memoryMsgChan:”,在向消費者推送訊息前都呼叫了下面這行代碼:

func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
	subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout) // 省略其他代碼
}

func (c *Channel) StartInFlightTimeout(msg *Message, clientID int64, timeout time.Duration) error {
	msg.pri = now.Add(timeout).UnixNano() // pri成員保存本訊息超時時間
	err := c.pushInFlightMessage(msg)
	c.addToInFlightPQ(msg)
}

channel.StartInFlightTimeout()將訊息保存到channel的inFlightMessages和inFlightPQ佇列中,這兩個快取是用來處理消費超時的,
值得注意的一個小細節是c.addToInFlightPQ(msg)將msg壓入最小堆時,將msg在陣列的偏移量保存到了msg.index成員中(最小堆底層是陣列實作)

我們先簡單看下FIN指令會做啥:

func (p *protocolV2) FIN(client *clientV2, params [][]byte) ([]byte, error) {
	err = client.Channel.FinishMessage(client.ID, *id) // 省略其他代碼
}

func (c *Channel) FinishMessage(clientID int64, id MessageID) error {
	// 省略其他代碼
	msg, err := c.popInFlightMessage(clientID, id)

	c.removeFromInFlightPQ(msg)
}

FIN的動作比較簡單,主要就是呼叫channel.FinishMessage()方法把上面寫入超時快取的msg給洗掉掉,

FIN從inFlightMessages中洗掉訊息比較容易,這是個map,key是msg.id,客戶端發送FIN訊息時附帶了msg.id,但如何從最小堆inFlightPQ中洗掉對應的msg呢?前面提到在入堆時的一個細節,即保存了msg的偏移量,此時正好用上,通過msg.index直接定位到msg的位置并調整堆即可,

說了這么多,最小堆的作用是啥?別急,接下來我們看下超時邏輯:
超時邏輯由程式啟動時開啟的作業執行緒組來處理,即NSQD.queueScanLoop()方法:

func (n *NSQD) queueScanLoop() {
	n.resizePool(len(channels), workCh, responseCh, closeCh)

	for {
		select {
		case <-workTicker.C: // 定時觸發作業
			if len(channels) == 0 {
				continue
			}
		case <-refreshTicker.C: // 動態調整協程組的數量
			channels = n.channels()
			n.resizePool(len(channels), workCh, responseCh, closeCh)
			continue
		case <-n.exitChan:
			goto exit
		}

		num := n.getOpts().QueueScanSelectionCount
		if num > len(channels) {
			num = len(channels)
		}

	loop:
		for _, i := range util.UniqRands(num, len(channels)) {
			workCh <- channels[i] // 觸發協程組作業
		}

		numDirty := 0
		for i := 0; i < num; i++ {
			if <-responseCh {
				numDirty++
			}
		}

		if float64(numDirty)/float64(num) > n.getOpts().QueueScanDirtyPercent {
			goto loop
		}
	}
}

NSQD.queueScanLoop()方法主要有一個for回圈,內層是一個select和一個loop回圈,select中,第1個定時器case <-workTicker.C的作用是定時觸發作業,只有這個case會跳出select走到下面的loop,第2個定時器負責啟動作業協程組并動態調整協程數量,我們來看下第2個定時器呼叫的resizePool()方法:

func (n *NSQD) resizePool(num int, workCh chan *Channel, responseCh chan bool, closeCh chan int) {
	idealPoolSize := int(float64(num) * 0.25) // 協程數量設定為channel數的1/4
	if idealPoolSize < 1 {
		idealPoolSize = 1
	} else if idealPoolSize > n.getOpts().QueueScanWorkerPoolMax {
		idealPoolSize = n.getOpts().QueueScanWorkerPoolMax
	}
	for {
		if idealPoolSize == n.poolSize {		// 當協程數量達到協程數量設定為channel數的1/4時,退出
			break
		} else if idealPoolSize < n.poolSize {	// 否則如果當前協程數大于目標值,則通過closeCh通知部分協程退出
			// contract
			closeCh <- 1
			n.poolSize--
		} else {								// 否則協程數不夠,則啟動新的協程
			// expand
			n.waitGroup.Wrap(func() {
				n.queueScanWorker(workCh, responseCh, closeCh)
			})
			n.poolSize++
		}
	}
}

resizePool()方法上面的注釋已經說的很清楚了,作用就是保持作業協程數量為當前channel數的1/4,

接下來我們看具體的作業邏輯,queueScanWorker()方法:

func (n *NSQD) queueScanWorker(workCh chan *Channel, responseCh chan bool, closeCh chan int) {
	for {
		select {
		case c := <-workCh:
			now := time.Now().UnixNano()
			dirty := false
			if c.processInFlightQueue(now) {
				dirty = true
			}
			if c.processDeferredQueue(now) {
				dirty = true
			}
			responseCh <- dirty
		case <-closeCh:
			return
		}
	}
}

queueScanWorker()方法的代碼很短,一是監聽closeCh的退出信號;二是監聽workCh的作業信號,workCh會將需要處理的channel傳入,然后呼叫processInFlightQueue()清理超時的訊息,呼叫processDeferredQueue()清理到時間的延時訊息:

func (c *Channel) processInFlightQueue(t int64) bool {
	dirty := false
	for {
		c.inFlightMutex.Lock()
		msg, _ := c.inFlightPQ.PeekAndShift(t)
		c.inFlightMutex.Unlock()

		if msg == nil {
			goto exit
		}
		dirty = true

		_, err := c.popInFlightMessage(msg.clientID, msg.ID)
		if err != nil {
			goto exit
		}
		atomic.AddUint64(&c.timeoutCount, 1)
		c.RLock()
		client, ok := c.clients[msg.clientID]
		c.RUnlock()
		if ok {
			client.TimedOutMessage()
		}
		c.put(msg)
	}

exit:
	return dirty
}

func (pq *inFlightPqueue) PeekAndShift(max int64) (*Message, int64) {
	if len(*pq) == 0 {
		return nil, 0
	}

	x := (*pq)[0]
	if x.pri > max {
		return nil, x.pri - max
	}
	pq.Pop()

	return x, 0
}

前面提到msg.pri成員保存本訊息超時時間,所以PeekAndShift()回傳的是最小堆里已經超時且超時時間最長的那條訊息,processInFlightQueue()則將訊息從超時佇列中刪,同時將訊息重新put進channel,注意此時超時的訊息put進channel后實際是排在隊尾的,消費順序將發生改變,
processInFlightQueue()方法如果存在超時訊息,回傳值dirty標識true,queueScanWorker()將dirty寫入responseCh,再往回看,queueScanLoop()方法統計了dirty的數量,超過一定比例會繼續執行loop,而不是等待下一次定時執行,

4.2 延遲消費

生產者呼叫DPUB發布的訊息,可以指定延時多少再推送給消費者,
我們來看下DPUB的邏輯:

func (p *protocolV2) DPUB(client *clientV2, params [][]byte) ([]byte, error) {
	timeoutMs, err := protocol.ByteToBase10(params[2])
	timeoutDuration := time.Duration(timeoutMs) * time.Millisecond
	msg := NewMessage(topic.GenerateID(), messageBody)
	msg.deferred = timeoutDuration
	err = topic.PutMessage(msg)
}

從上面截取的PUB()方法代碼可以看出,DPUB的訊息會將延時時間寫入msg.deferred成員,4.1章節第2部分介紹的Topic.messagePump()方法有下面這段:

func (t *Topic) messagePump() {
			if chanMsg.deferred != 0 {
				channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
				continue
			}
}

當chanMsg.deferred != 0時表示延時訊息,此時不是直接呼叫putMessage()方法寫入channel,而是呼叫channel.PutMessageDeferred(chanMsg, chanMsg.deferred),訊息被寫入了延時佇列Channel.deferredMessages和Channel.deferredPQ,之后的邏輯是在作業協程組NSQD.queueScanLoop()中被識別并put進channel,這與超時的處理邏輯是一樣的,不展開說,

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/17771.html

標籤:其他

上一篇:GStreamer基礎教程13 - 除錯Pipeline

下一篇:qt creator原始碼全方面分析(0)

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 網閘典型架構簡述

    網閘架構一般分為兩種:三主機的三系統架構網閘和雙主機的2+1架構網閘。 三主機架構分別為內端機、外端機和仲裁機。三機無論從軟體和硬體上均各自獨立。首先從硬體上來看,三機都用各自獨立的主板、記憶體及存盤設備。從軟體上來看,三機有各自獨立的作業系統。這樣能達到完全的三機獨立。對于“2+1”系統,“2”分為 ......

    uj5u.com 2020-09-10 02:00:44 more
  • 如何從xshell上傳檔案到centos linux虛擬機里

    如何從xshell上傳檔案到centos linux虛擬機里及:虛擬機CentOs下執行 yum -y install lrzsz命令,出現錯誤:鏡像無法找到軟體包 前言 一、安裝lrzsz步驟 二、上傳檔案 三、遇到的問題及解決方案 總結 前言 提示:其實很簡單,往虛擬機上安裝一個上傳檔案的工具 ......

    uj5u.com 2020-09-10 02:00:47 more
  • 一、SQLMAP入門

    一、SQLMAP入門 1、判斷是否存在注入 sqlmap.py -u 網址/id=1 id=1不可缺少。當注入點后面的引數大于兩個時。需要加雙引號, sqlmap.py -u "網址/id=1&uid=1" 2、判斷文本中的請求是否存在注入 從文本中加載http請求,SQLMAP可以從一個文本檔案中 ......

    uj5u.com 2020-09-10 02:00:50 more
  • Metasploit 簡單使用教程

    metasploit 簡單使用教程 浩先生, 2020-08-28 16:18:25 分類專欄: kail 網路安全 linux 文章標簽: linux資訊安全 編輯 著作權 metasploit 使用教程 前言 一、Metasploit是什么? 二、準備作業 三、具體步驟 前言 Msfconsole ......

    uj5u.com 2020-09-10 02:00:53 more
  • 游戲逆向之驅動層與用戶層通訊

    驅動層代碼: #pragma once #include <ntifs.h> #define add_code CTL_CODE(FILE_DEVICE_UNKNOWN,0x800,METHOD_BUFFERED,FILE_ANY_ACCESS) /* 更多游戲逆向視頻www.yxfzedu.com ......

    uj5u.com 2020-09-10 02:00:56 more
  • 北斗電力時鐘(北斗授時服務器)讓網路資料更精準

    北斗電力時鐘(北斗授時服務器)讓網路資料更精準 北斗電力時鐘(北斗授時服務器)讓網路資料更精準 京準電子科技官微——ahjzsz 近幾年,資訊技術的得了快速發展,互聯網在逐漸普及,其在人們生活和生產中都得到了廣泛應用,并且取得了不錯的應用效果。計算機網路資訊在電力系統中的應用,一方面使電力系統的運行 ......

    uj5u.com 2020-09-10 02:01:03 more
  • 【CTF】CTFHub 技能樹 彩蛋 writeup

    ?碎碎念 CTFHub:https://www.ctfhub.com/ 筆者入門CTF時時剛開始刷的是bugku的舊平臺,后來才有了CTFHub。 感覺不論是網頁UI設計,還是題目質量,賽事跟蹤,工具軟體都做得很不錯。 而且因為獨到的金幣制度的確讓人有一種想去刷題賺金幣的感覺。 個人還是非常喜歡這個 ......

    uj5u.com 2020-09-10 02:04:05 more
  • 02windows基礎操作

    我學到了一下幾點 Windows系統目錄結構與滲透的作用 常見Windows的服務詳解 Windows埠詳解 常用的Windows注冊表詳解 hacker DOS命令詳解(net user / type /md /rd/ dir /cd /net use copy、批處理 等) 利用dos命令制作 ......

    uj5u.com 2020-09-10 02:04:18 more
  • 03.Linux基礎操作

    我學到了以下幾點 01Linux系統介紹02系統安裝,密碼啊破解03Linux常用命令04LAMP 01LINUX windows: win03 8 12 16 19 配置不繁瑣 Linux:redhat,centos(紅帽社區版),Ubuntu server,suse unix:金融機構,證券,銀 ......

    uj5u.com 2020-09-10 02:04:30 more
  • 05HTML

    01HTML介紹 02頭部標簽講解03基礎標簽講解04表單標簽講解 HTML前段語言 js1.了解代碼2.根據代碼 懂得挖掘漏洞 (POST注入/XSS漏洞上傳)3.黑帽seo 白帽seo 客戶網站被黑帽植入劫持代碼如何處理4.熟悉html表單 <html><head><title>TDK標題,描述 ......

    uj5u.com 2020-09-10 02:04:36 more
最新发布
  • 2023年最新微信小程式抓包教程

    01 開門見山 隔一個月發一篇文章,不過分。 首先回顧一下《微信系結手機號資料庫被脫庫事件》,我也是第一時間得知了這個訊息,然后跟蹤了整件事情的經過。下面是這起事件的相關截圖以及近日流出的一萬條資料樣本: 個人認為這件事也沒什么,還不如關注一下之前45億快遞資料查詢渠道疑似在近日復活的訊息。 訊息是 ......

    uj5u.com 2023-04-20 08:48:24 more
  • web3 產品介紹:metamask 錢包 使用最多的瀏覽器插件錢包

    Metamask錢包是一種基于區塊鏈技術的數字貨幣錢包,它允許用戶在安全、便捷的環境下管理自己的加密資產。Metamask錢包是以太坊生態系統中最流行的錢包之一,它具有易于使用、安全性高和功能強大等優點。 本文將詳細介紹Metamask錢包的功能和使用方法。 一、 Metamask錢包的功能 數字資 ......

    uj5u.com 2023-04-20 08:47:46 more
  • vulnhub_Earth

    前言 靶機地址->>>vulnhub_Earth 攻擊機ip:192.168.20.121 靶機ip:192.168.20.122 參考文章 https://www.cnblogs.com/Jing-X/archive/2022/04/03/16097695.html https://www.cnb ......

    uj5u.com 2023-04-20 07:46:20 more
  • 從4k到42k,軟體測驗工程師的漲薪史,給我看哭了

    清明節一過,盲猜大家已經無心上班,在數著日子準備過五一,但一想到銀行卡里的余額……瞬間心情就不美麗了。最近,2023年高校畢業生就業調查顯示,本科畢業月平均起薪為5825元。調查一出,便有很多同學表示自己又被平均了。看著這一資料,不免讓人想到前不久中國青年報的一項調查:近六成大學生認為畢業10年內會 ......

    uj5u.com 2023-04-20 07:44:00 more
  • 最新版本 Stable Diffusion 開源 AI 繪畫工具之中文自動提詞篇

    🎈 標簽生成器 由于輸入正向提示詞 prompt 和反向提示詞 negative prompt 都是使用英文,所以對學習母語的我們非常不友好 使用網址:https://tinygeeker.github.io/p/ai-prompt-generator 這個網址是為了讓大家在使用 AI 繪畫的時候 ......

    uj5u.com 2023-04-20 07:43:36 more
  • 漫談前端自動化測驗演進之路及測驗工具分析

    隨著前端技術的不斷發展和應用程式的日益復雜,前端自動化測驗也在不斷演進。隨著 Web 應用程式變得越來越復雜,自動化測驗的需求也越來越高。如今,自動化測驗已經成為 Web 應用程式開發程序中不可或缺的一部分,它們可以幫助開發人員更快地發現和修復錯誤,提高應用程式的性能和可靠性。 ......

    uj5u.com 2023-04-20 07:43:16 more
  • CANN開發實踐:4個DVPP記憶體問題的典型案例解讀

    摘要:由于DVPP媒體資料處理功能對存放輸入、輸出資料的記憶體有更高的要求(例如,記憶體首地址128位元組對齊),因此需呼叫專用的記憶體申請介面,那么本期就分享幾個關于DVPP記憶體問題的典型案例,并給出原因分析及解決方法。 本文分享自華為云社區《FAQ_DVPP記憶體問題案例》,作者:昇騰CANN。 DVPP ......

    uj5u.com 2023-04-20 07:43:03 more
  • msf學習

    msf學習 以kali自帶的msf為例 一、msf核心模塊與功能 msf模塊都放在/usr/share/metasploit-framework/modules目錄下 1、auxiliary 輔助模塊,輔助滲透(埠掃描、登錄密碼爆破、漏洞驗證等) 2、encoders 編碼器模塊,主要包含各種編碼 ......

    uj5u.com 2023-04-20 07:42:59 more
  • Halcon軟體安裝與界面簡介

    1. 下載Halcon17版本到到本地 2. 雙擊安裝包后 3. 步驟如下 1.2 Halcon軟體安裝 界面分為四大塊 1. Halcon的五個助手 1) 影像采集助手:與相機連接,設定相機引數,采集影像 2) 標定助手:九點標定或是其它的標定,生成標定檔案及內參外參,可以將像素單位轉換為長度單位 ......

    uj5u.com 2023-04-20 07:42:17 more
  • 在MacOS下使用Unity3D開發游戲

    第一次發博客,先發一下我的游戲開發環境吧。 去年2月份買了一臺MacBookPro2021 M1pro(以下簡稱mbp),這一年來一直在用mbp開發游戲。我大致分享一下我的開發工具以及使用體驗。 1、Unity 官網鏈接: https://unity.cn/releases 我一般使用的Apple ......

    uj5u.com 2023-04-20 07:40:19 more