pipeline作為網路傳輸的通道之一,通過建立HTTP短連接,主要傳輸資料量大、發送頻率較低的資料,例如快照資料,
結構體
type pipeline struct {
peerID types.ID //該pipeline對應的節點的ID
tr *Transport //關聯rafthttp.Transport實體
picker *urlPicker //用于選擇可用的url
status *peerStatus //節點的狀態
raft Raft//底層raft實體
msgc chan raftpb.Message//pipeline實體從該通道中獲取待發送的訊息
wg sync.WaitGroup//負責同步多個goroutine結束,每個pipeline默認開啟4個goroutine來處理msgc中的訊息,必須先關閉這些goroutine,才能真正關閉該pipeline
stopc chan struct{}
}
作業原理

- pipeline在啟動的時候會啟動4個goroutine來發送訊息
- rafthttp.peer.send()在發送訊息的時候會選擇合適的通道,進入待發送狀態
- pipeline.handle()將pipeline.msgc通道接收到要發送的訊息后,呼叫pipeline.post()將其發送出去
- rafthttp.Transport.Handler()方法pipelineHandler的Handler實作負責接收pipeline發送的資料,接受完后再將訊息提交到etcd-raft模塊,
啟動
在上一節中peer的startPeer方法,有對pipeline的初始化和啟動的操作,
func (p *pipeline) start() {
p.stopc = make(chan struct{})
p.msgc = make(chan raftpb.Message, pipelineBufSize)//初始化msgc通道,默認緩沖是64個
p.wg.Add(connPerPipeline)
for i := 0; i < connPerPipeline; i++ {//默認開啟4個goroutine來處理msgc中待發送的訊息
go p.handle()//發送訊息
}
}
pipeline.start()會做初始化和啟動用來發送訊息的后臺goroutine,
handle方法處理msgc中待發送的訊息
pipeline.handle在上面的pipeline.start的時候使用到,
func (p *pipeline) handle() {
defer p.wg.Done()//handle()方法執行完成,也就是當前這個goroutine結束
for {
select {
case m := <-p.msgc: //接收待發送的MsgSnap型別的訊息
start := time.Now()
err := p.post(pbutil.MustMarshal(&m))//將訊息序列化,然后創建HTTP請求并發送出去
end := time.Now()
if err != nil {
//將通道的網路連接狀態置為不活躍
p.status.deactivate(failureType{source: pipelineMsg, action: "write"}, err.Error())
if m.Type == raftpb.MsgApp && p.followerStats != nil {
p.followerStats.Fail()
}
//向底層的Raft狀態機報告失敗資訊
p.raft.ReportUnreachable(m.To)
if isMsgSnap(m) {
p.raft.ReportSnapshot(m.To, raft.SnapshotFailure)
}
sentFailures.WithLabelValues(types.ID(m.To).String()).Inc()
continue
}
//發送成功,將通道的網路連接狀態置為活躍
p.status.activate()
if m.Type == raftpb.MsgApp && p.followerStats != nil {
p.followerStats.Succ(end.Sub(start))
}
//向底層的Raft狀態機報告發送成功的訊息
if isMsgSnap(m) {
p.raft.ReportSnapshot(m.To, raft.SnapshotFinish)
}
sentBytes.WithLabelValues(types.ID(m.To).String()).Add(float64(m.Size()))
case <-p.stopc:
return
}
}
}
在pipeline.handle()方法中會從msgc通道中讀取待發送的Message訊息,然后呼叫pipeline.post()方法將其發送出去,發送結束之后會呼叫底層Raft介面的相應方法報告發送結果,
post發送訊息
pipeline.post在上面的pipeline.handle的時候使用,
func (p *pipeline) post(data []byte) (err error) {
u := p.picker.pick()//選擇可用的url
//創建HTTP POST請求
req := createPostRequest(p.tr.Logger, u, RaftPrefix, bytes.NewBuffer(data), "application/protobuf", p.tr.URLs, p.tr.ID, p.tr.ClusterID)
done := make(chan struct{}, 1)//主要用于通知下面的goroutine請求是否已經發送完成
ctx, cancel := context.WithCancel(context.Background())
req = req.WithContext(ctx)
go func() { //該goroutine主要用于監聽請求是否取消
select {
case <-done:
case <-p.stopc: //如果在請求得發送程序中,pipeline被關閉,則取消該請求
waitSchedule()
cancel()//取消請求
}
}()
//發送HTTP POST請求,并獲取到對應的回應,
resp, err := p.tr.pipelineRt.RoundTrip(req)
done <- struct{}{}//通知上述goroutine,請求已經發送完畢
if err != nil {
p.picker.unreachable(u)
return err
}
defer resp.Body.Close()
b, err := ioutil.ReadAll(resp.Body)//等到回應的結果
if err != nil {
p.picker.unreachable(u)//出現例外時,則將該URL標識為不可用
return err
}
//檢查回應的內容
err = checkPostResponse(p.tr.Logger, resp, b, req, p.peerID)
if err != nil {
p.picker.unreachable(u)
// errMemberRemoved is a critical error since a removed member should
// always be stopped. So we use reportCriticalError to report it to errorc.
if err == errMemberRemoved {
reportCriticalError(err, p.errorc)
}
return err
}
return nil
}
pipeline.post()方法是真正完成訊息發送的地方,其中會啟動一個后臺goroutine監聽控制發送程序及獲取發送結果,
更多歡迎關注go成神之路
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/204943.html
標籤:其他
上一篇:Linux服務器安裝node
