1 Broadcast Client創建流程

Peer節點呼叫GetBroadcastClientFnc函式來獲取Broadcast服務客戶端,客戶端提供了Send(*common.Envelope) 用于發送交易訊息請求,
- 根據oderer的配置創建ordererclient客戶端
- 呼叫NewConnection創建gRPC連接物件conn
- 呼叫NewAtomicBroadcastClient請求呼叫Broadcast服務介面,創建服務客戶端(atomicBroadcastBroadcastClient)型別
type AtomicBroadcastClient interface {
// broadcast receives a reply of Acknowledgement for each common.Envelope in order, indicating success or type of failure
Broadcast(ctx context.Context, opts ...grpc.CallOption) (AtomicBroadcast_BroadcastClient, error)
// deliver first requires an Envelope of type DELIVER_SEEK_INFO with Payload data as a mashaled SeekInfo message, then a stream of block replies is received.
Deliver(ctx context.Context, opts ...grpc.CallOption) (AtomicBroadcast_DeliverClient, error)
}
type atomicBroadcastClient struct {
cc *grpc.ClientConn
}
func NewAtomicBroadcastClient(cc *grpc.ClientConn) AtomicBroadcastClient {
return &atomicBroadcastClient{cc}
}
func (c *atomicBroadcastClient) Broadcast(ctx context.Context, opts ...grpc.CallOption) (AtomicBroadcast_BroadcastClient, error) {
stream, err := c.cc.NewStream(ctx, &_AtomicBroadcast_serviceDesc.Streams[0], "/orderer.AtomicBroadcast/Broadcast", opts...)
if err != nil {
return nil, err
}
x := &atomicBroadcastBroadcastClient{stream}
return x, nil
}
type AtomicBroadcast_BroadcastClient interface {
Send(*common.Envelope) error
Recv() (*BroadcastResponse, error)
grpc.ClientStream
}
type atomicBroadcastBroadcastClient struct {
grpc.ClientStream
}
2 Broadcast服務訊息處理
Orederer節點啟動時已經在本地gRPC服務器上注冊了Orderer排序服務器,并創建了Broadcast服務處理句柄,當客戶端呼叫Broadcast()發起服務請求時,會呼叫s.bh.Handle方法處理請求,通過訊息句柄呼叫Handle方法,通過服務器端srv呼叫srv.Recv(),監聽并接收Send介面發送的交易訊息請求,
func (s *server) Broadcast(srv ab.AtomicBroadcast_BroadcastServer) error {
logger.Debugf("Starting new Broadcast handler")
defer func() {
if r := recover(); r != nil {
logger.Criticalf("Broadcast client triggered panic: %s\n%s", r, debug.Stack())
}
logger.Debugf("Closing Broadcast stream")
}()
return s.bh.Handle(&broadcastMsgTracer{
AtomicBroadcast_BroadcastServer: srv,
msgTracer: msgTracer{
debug: s.debug,
function: "Broadcast",
},
})
}
func (bh *Handler) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {
addr := util.ExtractRemoteAddress(srv.Context())
logger.Debugf("Starting new broadcast loop for %s", addr)
for {
msg, err := srv.Recv() //阻塞等待新的交易請求
......
resp := bh.ProcessMessage(msg, addr)
err = srv.Send(resp) //發送成功處理狀態回應訊息
if resp.Status != cb.Status_SUCCESS {
return err
}
......
}
}
ProcessMessage中的關鍵函式:
- BroadcastChannelSupport決議出訊息的通道頭部chdr,配置交易訊息標志位isConfig、鏈支持物件processor(ChainSupport型別)
- processor.WaitReady檢查當前通道共識組件鏈是否已經準備好接收新訊息
- processor.ProcessNormalMsg處理普通交易訊息
- processor.Order重新配置普通交易訊息(包含configSeq最新配置序號),交給共識鏈物件請求排序出塊
- processor.ProcessConfigUpdateMsg處理配置交易訊息
- processor.Configure構造新的配置交易訊息,交給共識鏈物件請求處理
func (bh *Handler) ProcessMessage(msg *cb.Envelope, addr string) (resp *ab.BroadcastResponse) {
tracker := &MetricsTracker{
ChannelID: "unknown",
TxType: "unknown",
Metrics: bh.Metrics,
}
defer func() {
// This looks a little unnecessary, but if done directly as
// a defer, resp gets the (always nil) current state of resp
// and not the return value
tracker.Record(resp)
}()
tracker.BeginValidate()
chdr, isConfig, processor, err := bh.SupportRegistrar.BroadcastChannelSupport(msg)
if chdr != nil {
tracker.ChannelID = chdr.ChannelId
tracker.TxType = cb.HeaderType(chdr.Type).String()
}
if !isConfig {
logger.Debugf("[channel: %s] Broadcast is processing normal message from %s with txid '%s' of type %s", chdr.ChannelId, addr, chdr.TxId, cb.HeaderType_name[chdr.Type])
configSeq, err := processor.ProcessNormalMsg(msg)
......
err = processor.Order(msg, configSeq)
......
} else { // isConfig
logger.Debugf("[channel: %s] Broadcast is processing config update message from %s", chdr.ChannelId, addr)
config, configSeq, err := processor.ProcessConfigUpdateMsg(msg)
......
err = processor.Configure(config, configSeq)
......
}
logger.Debugf("[channel: %s] Broadcast has successfully enqueued message of type %s from %s", chdr.ChannelId, cb.HeaderType_name[chdr.Type], addr)
return &ab.BroadcastResponse{Status: cb.Status_SUCCESS}
}

2.1 普通交易處理
呼叫StandardChannel的ProcessNormalMsg方法處理普通交易資訊
func (s *StandardChannel) ProcessNormalMsg(env *cb.Envelope) (configSeq uint64, err error) {
oc, ok := s.support.OrdererConfig()
if !ok {
logger.Panicf("Missing orderer config")
}
if oc.Capabilities().ConsensusTypeMigration() {
if oc.ConsensusState() != orderer.ConsensusType_STATE_NORMAL {
return 0, errors.WithMessage(
ErrMaintenanceMode, "normal transactions are rejected")
}
}
configSeq = s.support.Sequence()
err = s.filters.Apply(env)
return
}
- 呼叫s.support.Sequence獲取通道配置序號configSeq,默認值為0,新建應用通道配置序號增1,該配置序號可以用來標識通道配置資訊的版本
- 呼叫 s.filters.Apply,利用自帶的通道資訊過濾器過濾該訊息,檢查是否滿足應用通道上的訊息處理請求
這里共有四個過濾器:
- EmptyRejectRule空檢測
- MaxBytesRule最大位元組數檢測
- SigFilter訊息簽名驗證(Channel-Writes通道寫權限)
- expirationRejectRule 拒絕過期的簽名證書
func (c *Chain) Order(env *common.Envelope, configSeq uint64) error {
c.Metrics.NormalProposalsReceived.Add(1)
return c.Submit(&orderer.SubmitRequest{LastValidationSeq: configSeq, Payload: env, Channel: c.channelID}, 0)
}
func (c *Chain) Submit(req *orderer.SubmitRequest, sender uint64) error {
if err := c.isRunning(); err != nil {
c.Metrics.ProposalFailures.Add(1)
return err
}
leadC := make(chan uint64, 1)
select {
case c.submitC <- &submit{req, leadC}:
lead := <-leadC
if lead == raft.None {
c.Metrics.ProposalFailures.Add(1)
return errors.Errorf("no Raft leader")
}
if lead != c.raftID {
if err := c.rpc.SendSubmit(lead, req); err != nil {
c.Metrics.ProposalFailures.Add(1)
return err
}
}
case <-c.doneC:
c.Metrics.ProposalFailures.Add(1)
return errors.Errorf("chain is stopped")
}
return nil
}
Submit 首先將請求訊息封裝為 submit 結構通過當前 Chain實體的通道 c.submitC 傳遞給后端處理(下一節分析如何處理),同時獲取當前時刻 raft 集群的 leader 資訊,
這里對 leader 的不同狀態進行了不同處理:
- lead == raft.None:即當前集群中還沒有選出一個 leader,那么說明共識功能暫時可不用,所以直接回傳 error-“no Raft leader”;
- lead != c.raftID:即當前節點不是 raft 集群的 leader,非 leader 不進行訊息處理,所以通過 rpc.SendSubmit 方法將訊息轉發給目標 leader;
- lead == c.raftID:這是一個的隱含情況,即當前節點為 leader 的情況,那自然是針對請求訊息進行處理,由接收 submitC 通道訊息的部分處理,
也就是說,所有的應用端發送給 orderer 的 broadcast 請求報文,都會被轉發給 raft 集群中的 leader 節點進行處理,如果沒有 leader 則回傳錯誤資訊,
2.2 配置交易訊息
func (s *SystemChannel) ProcessConfigUpdateMsg(envConfigUpdate *cb.Envelope) (config *cb.Envelope, configSeq uint64, err error) {
channelID, err := protoutil.ChannelID(envConfigUpdate)
if err != nil {
return nil, 0, err
}
logger.Debugf("Processing config update tx with system channel message processor for channel ID %s", channelID)
if channelID == s.support.ChannelID() {
return s.StandardChannel.ProcessConfigUpdateMsg(envConfigUpdate)
}
// XXX we should check that the signature on the outer envelope is at least valid for some MSP in the system channel
logger.Debugf("Processing channel create tx for channel %s on system channel %s", channelID, s.support.ChannelID())
// If the channel ID does not match the system channel, then this must be a channel creation transaction
bundle, err := s.templator.NewChannelConfig(envConfigUpdate)
if err != nil {
return nil, 0, err
}
newChannelConfigEnv, err := bundle.ConfigtxValidator().ProposeConfigUpdate(envConfigUpdate)
if err != nil {
return nil, 0, errors.WithMessagef(err, "error validating channel creation transaction for new channel '%s', could not successfully apply update to template configuration", channelID)
}
newChannelEnvConfig, err := protoutil.CreateSignedEnvelope(cb.HeaderType_CONFIG, channelID, s.support.Signer(), newChannelConfigEnv, msgVersion, epoch)
if err != nil {
return nil, 0, err
}
wrappedOrdererTransaction, err := protoutil.CreateSignedEnvelope(cb.HeaderType_ORDERER_TRANSACTION, s.support.ChannelID(), s.support.Signer(), newChannelEnvConfig, msgVersion, epoch)
if err != nil {
return nil, 0, err
}
// We re-apply the filters here, especially for the size filter, to ensure that the transaction we
// just constructed is not too large for our consenter. It additionally reapplies the signature
// check, which although not strictly necessary, is a good sanity check, in case the orderer
// has not been configured with the right cert material. The additional overhead of the signature
// check is negligible, as this is the channel creation path and not the normal path.
err = s.StandardChannel.filters.Apply(wrappedOrdererTransaction)
if err != nil {
return nil, 0, err
}
return wrappedOrdererTransaction, s.support.Sequence(), nil
}
- protoutil.ChannelID獲取訊息中的通道ID
- 如果訊息ID與當前訊息通道ID一致,交給標準通道處理器處理StandardChannel.ProcessConfigUpdateMsg
- NewChannelConfig創建新的應用通道
- ProposeConfigUpdate構造新的通道交易配置資訊(ConfigEnvelope型別)
- CreateSignedEnvelope分別創建HeaderType_CONFIG和HeaderType_ORDERER_TRANSACTION的配置交易資訊
- StandardChannel.filters.Apply系統通道的訊息過濾器
- 呼叫s.support.Sequence獲取通道配置序號configSeq
func (s *StandardChannel) ProcessConfigUpdateMsg(env *cb.Envelope) (config *cb.Envelope, configSeq uint64, err error) {
logger.Debugf("Processing config update message for existing channel %s", s.support.ChannelID())
// Call Sequence first. If seq advances between proposal and acceptance, this is okay, and will cause reprocessing
// however, if Sequence is called last, then a success could be falsely attributed to a newer configSeq
seq := s.support.Sequence()
err = s.filters.Apply(env)
......
configEnvelope, err := s.support.ProposeConfigUpdate(env)
......
config, err = protoutil.CreateSignedEnvelope(cb.HeaderType_CONFIG, s.support.ChannelID(), s.support.Signer(), configEnvelope, msgVersion, epoch)
......
err = s.filters.Apply(config)
......
err = s.maintenanceFilter.Apply(config)
return config, seq, nil
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/qianduan/214740.html
標籤:其他
