nsqlookupd 用于Topic, Channel, Node 三類資訊的一致性分發
概要
nsqlookup 知識點總結
-
功能定位
- 為node 節點和客戶端節點提供一致的topic, channel, node 查詢服務
- Topic 主題, 和大部分訊息佇列的含義一致, 訊息處理時,將相同主題的資料會歸為一類訊息
- channel,可以理解為 topic 的一份資料拷貝,一個或者多個消費者對接一個channel,
- node nsqd 啟動的一個實體
- 一個channel會放置在某一個node 節點上,一個topic 下可以有多個channel.
- HTTP 介面 用于客戶端服務發現以及admin 的交戶使用
- TCP 介面 用于 node 節點做訊息廣告使用
- 為node 節點和客戶端節點提供一致的topic, channel, node 查詢服務
-
實作方式
- 資料包括了Topic, Channel, Node 等資訊,全部存盤于RegistrationDB中,RegistrationDB 采用讀寫鎖和 map 實作,資料均存盤于記憶體中
- 若存在多個nsqlookup 節點,各節點之間無耦合關系
nsqlookupd 原始碼閱讀
程式入口檔案: /apps/nsqlookupd/main.go
為了時NSQ 在windows 良好運行,NSQ 使用了 github.com/judwhite/go-svc/svc 包,用于構建一個可實作windows 服務, 可以用windows 的服務管理插件直接管理,
svc 包使用時,只需要實作 github.com/judwhite/go-svc/svc.Service 的介面即可,介面如下:
type Service interface {
// Init is called before the program/service is started and after it's
// determined if the program is running as a Windows Service.
Init(Environment) error
// Start is called after Init. This method must be non-blocking.
Start() error
// Stop is called in response to os.Interrupt, os.Kill, or when a
// Windows Service is stopped.
Stop() error
}
因此,nsqlookup 只需要實作上述三個方法即可:
Init 方法
此方法僅針對windows 的服務做了處理,若為windows 服務,則修改當前目錄為可執行檔案的目錄,
Stop 方法
此方法做了nsqlookupd.Exit() 的處理,
此處用到了sync.Once. 即呼叫的退出程式僅執行一次,
Exit 的具體內容為:
func (l *NSQLookupd) Exit() {
if l.tcpListener != nil {
l.tcpListener.Close()
}
if l.httpListener != nil {
l.httpListener.Close()
}
l.waitGroup.Wait()
}
- 關閉 TCP Listener
- 關閉 Http Listener
- 等待所有goroutine的退出 (此處用到了sync.WaitGroup,用于等待goroutine 的退出)
Start 方法
引數的初始化
NSQ 命令列引數的構造,采用了golang 自帶的flag 包,引數保存于Options物件中,采用了先初始化,后賦值的方式,減少了不必要的條件判斷,
可以采用--config 的方式,直接添加組態檔,組態檔采用toml格式.
配置的決議,采用github.com/mreiferson/go-options 實作,優先級由高到低為:
- 命令列引數
- deprecated 的命令列引數名稱
- 組態檔的值 (將命令列引數,連字符替換為下劃線作為組態檔的key)
- 若引數實作了Getter,則使用Get() 方法
- 引數默認值
構造nsqlookupd
- 初始化一個RegistrationDB
- 建立 HttpListener 和 tcpListener (客戶端請求)
- 啟動服務,等待連接請求或者中斷信號
RegistrationMap 的實作
// RegistrationDB 使用讀寫鎖做讀寫控制,
type RegistrationDB struct {
sync.RWMutex
registrationMap map[Registration]ProducerMap
}
type Registration struct {
Category string // Category 有三種型別,Topic, Channel, Client.
Key string
SubKey string
}
type ProducerMap map[string]*Producer
type Producer struct {
peerInfo *PeerInfo //客戶端的相關資訊
tombstoned bool
tombstonedAt time.Time
}
type PeerInfo struct {
lastUpdate int64 // 上次更新的時間
id string // 使用ip標識的id
RemoteAddress string `json:"remote_address"`
Hostname string `json:"hostname"`
BroadcastAddress string `json:"broadcast_address"`
TCPPort int `json:"tcp_port"`
HTTPPort int `json:"http_port"`
Version string `json:"version"`
}
介面閱讀
TcpListener
tcp 訊息是 nsqd 與nsqlookupd 溝通的協議, node 保存的是nsqd 的資訊
Tcp Listener 是用來監聽客戶端發來的TCP 訊息,
建立連接后,發送4個byte標識連接的版本號,目前是v1. "__V1" (下劃線用空格替代)
訊息之間按斬訓行符\n分割,
目前客戶端支持4類訊息:
- PING
- 回傳OK
- 若存在對端的資訊,則更新client.peerInfo.lastUpdate <上次更新時間>
- IDENTIFY
- 用于訊息的認證,將nsqd資訊發送給nsqlookupd.
- 訊息格式
IDENTIFY\nBODYLEN(32bit)BODY|8bit |1 bit | 32bit | N bit | |IDENTIFY| 換行 | body 長度 | body | - BODY 為json格式
- 包含了如下欄位:
- 廣播地址
- TCP 埠
- HTTP 埠
- 版本號
- 服務器地址 (通過連接直接獲取)
- REGISTER
- 將nsqd 中注冊的topic 和channel 資訊發送到nsqlookupd 上,做資訊共享
- UNREGISTER
- 將nsqd 中注銷的topic 和channel 資訊發送到nsqlookupd 上,做資訊共享
HTTPListener
** http 客戶端的定位是用于服務的發現和admin的互動 **
- 在學習 http 請求時,可以先學習下
nsq/internal/http_api包,此包是對golang 中http請求handler 的一次封裝:
type Decorator func(APIHandler) APIHandler
type APIHandler func(http.ResponseWriter, *http.Request, httprouter.Params) (interface{}, error)
// f 是業務處理邏輯, ds 可以自定義多個包裝器,用于對f 的輸入和輸出資料做處理,
func Decorate(f APIHandler, ds ...Decorator) httprouter.Handle {
decorated := f
for _, decorate := range ds {
decorated = decorate(decorated)
}
return func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
decorated(w, req, ps)
}
}
// Decorator 的一個例子,做日志記錄的處理
func Log(logf lg.AppLogFunc) Decorator {
return func(f APIHandler) APIHandler {
return func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
start := time.Now()
response, err := f(w, req, ps)
elapsed := time.Since(start)
status := 200
if e, ok := err.(Err); ok {
status = e.Code
}
logf(lg.INFO, "%d %s %s (%s) %s",
status, req.Method, req.URL.RequestURI(), req.RemoteAddr, elapsed)
return response, err
}
}
}
這種處理方式類似于大部分web框架HTTP 中間件的處理方式,是利用遞回嵌套的方式,保留了處理的背景關系, 實作服務切片編程,
-
http 服務,使用
github.com/julienschmidt/httprouter包實作http 的路由功能, -
目前HTTP 客戶端支持以下的請求:
| Method | Router | Param | Response |
|---|---|---|---|
| GET | /ping | - | "OK" |
| GET | /info | - | 回傳版本資訊 |
| GET | /debug | - | 回傳 db 中所有資訊 |
| GET | /lookup | topic | 回傳topic 關聯的所有的channels 和 nsqd 服務的資訊 |
| GET | /topics | - | 回傳所有topic 的值 |
| GET | /channels | topic | 回傳topic 下所有的channels 資訊 |
| GET | /nodes | - | 回傳所有在線的nsqd 的node 資訊, node 節點中包含了 topic 的資訊,以及是否需要被洗掉 |
| POST | /topic/create | topic | 創建topic <不超過64個字符長度> |
| POST | /topic/delete | topic | 洗掉相應topic 的channel 和topic 資訊 |
| POST | /channel/create | topic, channel | 創建 channel , 若topic 不存在,創建topic |
| POST | /channel/delete | topic, channel | 洗掉 channel, 支持 * |
| POST | /topic/tombstone | topic, node | 將topic 下某個node 設定洗掉標識 tombstone, 給node 節點 一段空余時間用于洗掉相關topic 資訊,并發送洗掉topic的命令 |
| GET | /debug/pprof | - | pprof 提供的資訊 |
| GET | /debug/pprof/cmdline | - | pprof 提供的資訊 |
| GET | /debug/pprof/symbol | - | pprof 提供的資訊 |
| POST | /debug/pprof/symbol | - | pprof 提供的資訊 |
| GET | /debug/pprof/profile | - | pprof 提供的資訊 |
| GET | /debug/pprof/heap | - | pprof 提供的資訊 |
| GET | /debug/pprof/goroutine | - | pprof 提供的資訊 |
| GET | /debug/pprof/block | - | pprof 提供的資訊 |
| GET | /debug/pprof/threadcreate | - | pprof 提供的資訊 |
學習總結
- sync.Once, sync.RWMutex 讀寫鎖的使用
- http 包裝函式的簡單實作 nsq/internal/http_api.Decorate
github.com/judwhite/go-svc/svc的使用github.com/julienschmidt/httprouter的使用github.com/mreiferson/go-options的使用

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/10498.html
標籤:其他
