主頁 > 移動端開發 > 自動駕駛平臺Apollo 5.5閱讀手記:Cyber RT中的通信傳輸

自動駕駛平臺Apollo 5.5閱讀手記:Cyber RT中的通信傳輸

2020-09-14 12:15:38 移動端開發

前言

計算框架是自動駕駛系統中的重中之重,也是整個系統得以高效穩定運行的基礎,為了實時地完成感知、決策和執行,系統需要一系列的模塊相互緊密配合,高效地執行任務流,由于各種原因,這些模塊可能位于不同行程,也可能位于不同機器,這就要求計算框架中具有靈活的、高性能的通信機制,我們知道,Apollo在3.5版本中推出了Cyber RT替代了原先的ROS,之前寫過兩篇相關的文章介紹了其中的調度部分:《自動駕駛平臺Apollo 3.5閱讀手記:Cyber RT中的協程(Coroutine)》和《自動駕駛平臺Apollo 5.5閱讀手記:Cyber RT中的任務調度》,今天就來聊一下其中的另一重要部分-通信系統,

和ROS & ROS2中類似,Cyber RT中支持兩種資料交換模式:一種是Publish-Subscribe模式,常用于資料流處理中節點間通信,即發布者(Publisher)在channel(ROS中對應地稱為topic)上發布訊息,訂閱該channel的訂閱者(Subscriber)便會收到訊息資料;另一種就是常見的Service-Client模式,常用于客戶端與服務端的請求與回應,本質上它是可以基于前者實作的,Node是整個資料拓撲網路中的基本單元,一個Node中可以創建多個讀者/寫者,服務端/客戶端,讀者和寫者分別對應ReaderWriter,用于Publish-Subscribe模式,服務端和客戶端分別對應ServiceClient,用于Service-Client模式,

實作決議

自動駕駛系統中的各個處理模塊基本都是實作為Component,一個Component中包含一個Node,另外會根據需要創建和管理WriterReaderServiceClient,這些用于通信的類下面基于TrasmitterReceiver類,前者用于資料發送,后者用于資料接收,它們是資料傳輸層的抽象,之下可有多個傳輸層實作用于不同場景下的傳輸,如對于TrasmitterIntraTransmitterShmTransmitterRtpsTransmitterHybridTransmitter,對于Receiver也是類似的,其中RTPS后端基于Fast RTPS,Fast RTPS是DDS(Data Distribution Service)標準的一個非常流行的開源實作,DDS標準提供了一個平臺無關的資料模型,主要用于實時分布式系統,不同的實作可以相互通信,整個通信系統的架構層次圖如下,
在這里插入圖片描述

下面我們就從幾個方面深入地看下它們的實作機制,

服務發現與拓撲管理

首先來看下比較基礎與核心的服務發現與拓撲管理,其實作主要在目錄cyber/service_discovery/下,節點間通過讀和寫端建立資料通路,以channel為邊,這樣可以得到一個資料流圖絡,由于節點可能退出,訂閱情況也可能發生改變,所以這個網路是動態的,因此需要對網路拓撲進行監控,

主要負責這件事的資料結構是TopologyManager,它是個單例,因為每個行程只要有一個來負責監控網路拓撲就可以了,TopologyManager有三個子管理器,并有共同的基類Manager,它們分別為:
? - NodeManager用于管理網路拓撲中的節點,
? - ChannelManager用于管理channel,即網路拓撲中的邊,
? - ServiceManager用于管理ServiceClient

Cyber RT中有兩個層面的拓撲變化的監控:

  • 基于Fast RTPS的發現機制

它主要監視網路中是否有參與者加入或退出,TopologyManager::CreateParticipant()函式創建transport::Participant物件時會輸入包含host name與process id的名稱,ParticipantListener用于監聽網路的變化,網路拓撲發生變化時,Fast RTPS傳上來ParticipantDiscoveryInfo,在TopologyManager::Convert()函式中對該資訊轉換成Cyber RT中的資料結構ChangeMsg,然后呼叫回呼函式TopologyManager::OnParticipantChange(),它會呼叫其它幾個子管理器的OnTopoModuleLeave()函式,然后子管理器中便可以將相應維護的資訊進行更新(如NodeManager中將相應的節點洗掉),

這層拓撲監控主要是通過Fast RTPS提供的自動發現機制,如行程意外退出,則要將各管理中相應資訊進行更新,它的優點是如果行程出錯或設備斷開也可以作業,但粒度比較粗,且不是非常及時(比如斷開時),

  • 基于主動式的拓撲變更廣播

這一部分主要在TopologyManager::Init()函式中創建和初始化,在初始化時,會呼叫它們的StartDiscovery()函式開始啟動自動發現機制,基于TopologyManager中的RtpsParticipant物件,這幾個子管理會通過CreateSubscriber()CreatePublisher()函式創建相應的subscriber和publisher,子管理器中channel名稱分別為node_change_broadcastchannel_change_broadcastservice_change_broadcast,Subscriber的回呼函式為Manager::OnRemoteChange(),該回呼函式中會決議拓撲變更訊息并呼叫Dispose()函式進行處理,

這層拓撲監控是主動式的,即需要相應的地方主動呼叫Join()Leave()來觸發,然后各子管理器中回呼函式進行資訊的更新,如NodeChannelImpl創建時會呼叫NodeManager::Join()ReaderWriter初始化時會呼叫JoinTheTopolicy()函式,繼而呼叫ChannelManager::Join()函式,相應地,有LeaveTheTopology()函式表示退出拓撲網路,在這兩個函式中,會呼叫Dispose()函式,而這個函式是虛函式,在各子管理器中有各自的實作,另外Manager提供AddChangeListener()函式注冊當拓撲發生變化時的回呼函式,舉例來說,Reader::JoinTheTopology()函式中會通過該函式注冊回呼Reader::OnChannelChange()

資料傳輸

在一個分布式計算系統中,根據兩個節點間的位置關系需要使用不同的傳輸方式(定義在CommunicationMode中):
? - INTRA:如果是同行程的,因為在同一地址空間,直接傳指標就完了,
? - SHM(Shared memory):如果是同一機器上,但跨行程的,為了高效可以使用共享記憶體,
? - RTPS:如果是跨設備的,那就老老實實通過網路傳吧,

示意圖如下:
在這里插入圖片描述

很多時候一個計算圖中各種情況都有,所以為了達到最好的性能,需要混合使用,這種混合模式稱為HYBRID模式,框架需要根據節點間關系選擇合適的傳輸后端,

每個WriterTransmitter,每個ReaderReceiver,它們是負責訊息發送與收取的類,TransmitterReceiver的基類為Endpoint,代表一個通信的端點,它主要的資訊是身份標識與屬性,其型別為RoleAttributes(定義在role_attributes.proto)的成員attr_包含了host name,process id和一個根據uuid產生的hash值作為id,通過它們就可以判斷節點之間的相對位置關系了,

ReaderWriter會呼叫Transport的方法CreateTransmitter()CreateReceiver()用于創建發送端的transmitter和接收端的receiver,創建時有四種模式可選,分別是INTRA,SHM和RTPS,和HYBRID,最后一種是前三種的混合模式,也是默認的模式,如Transmitter對應的繼承類為IntraTransmitterShmTransmitterRtpsTransmitterHybridTransmitter,這幾個繼承類最主要實作了Transmit()函式用于傳輸資料,對于Receiver來說是類似的,它有4個繼承類對應四種傳輸方式,即IntraReceiverShmReceiverRtpsReceiverHybridReceiver

結合前面提到的幾種模式對應的場景,transmitter與receiver的對應關系如下:
在這里插入圖片描述

前面提到,傳輸層實作主要有四個實作后端,對應四種模式:

  • RTPS:RTPS部分基于eProsimar的Fast RTPS,RtpsTransmitter類中創建和封裝publisher,Transmit()函式將訊息序列化成Fast RTP中的格式UnderlayMessage,然后通過publisher發出去,RtpsReceiver中的dispatcher_成員指向單例RtpsDispatcher,它用于派發RTPS發來的資料,維護了channel id到subscriber的查找表,RtpsDispatcher::AddSubscriber()函式使用eprosima::fastrtps::Domain::createSubscriber()函式創建subscriber,其回呼統一為RtpsDispatcher::OnMessage()函式,該函式會將從RTPS通路來的訊息進行派發,

  • SHMSegment類表示一塊對應一個channel的共享記憶體,由SegmentFactory::CreateSegment函式創建,它有兩個繼承類PosixSegmentXsiSegment,是平臺相關的實作,在寫端,ShmTransmitter::Transmit()函式用于發送訊息,該函式先通過AcquireBlockToWrite()函式拿一個可寫的block,如果發現該Segment尚未初始化,會呼叫OpenOrCreate()通過OS的介面創建共享記憶體并且map出虛擬地址,這塊共享記憶體區域大體分兩部分,一部分為元資訊,另一部分為訊息資料,后者會被切分為相同大小的block,block的buffer大小默認16K,但遇上訊息超出大小的時候會調整,拿到該block后,將訊息序列化后寫入,并通知讀者來取訊息,通知機制是通過NotifierBase實作的,它有兩個實作類,分別為ConditionNotifierMulticastNotifier,前者為默認設定,它會單獨開一塊共享共享專門用于通知,其中包含了ReadableInfo等資訊,MulticastNotifier的主要區別是它是通過指定的socket廣播,在讀端,ShmDispatcher::Init()初始化時會創建專門的執行緒,執行緒的執行體為ShmDispatcher::Threadfunc()函式,它在回圈體內會通過Listen()函式等待新訊息,如果有新訊息寫入后發出通知,這兒就會往下走,基于通知中的ReadableInfo資訊,得到channel id,block index等資訊,然后呼叫ReadMessage()函式讀訊息并反序列化,之后呼叫ShmDispatcher::OnMessage()函式進行訊息派發,

  • INTRA :用于行程內通信,由于讀者和寫者是在同一行程內,因此可以直接呼叫,在IntraTransmitter::Transmit()函式中,會直接呼叫讀端的IntraDispatcher::OnMessage(),該函式進行下一步訊息的派發,

  • HYBRID:即默認模式,是前三種的結合體,具體功能其實還是交給前面幾個后端完成的,只是它會根據讀者與寫者的關系使用相應的后端,

訊息寫端

寫端的實作相對簡單一些,在模塊組件中,可以通過CreateWriter()函式創建Writer物件,然后就可以通過該物件向指定channel發送訊息了,以CameraComponent為例:

writer_ = node_->CreateWriter<Image>(camera_config_->channel_name());
...
auto pb_image = std::make_shared<Image>();                                 
pb_image->mutable_header()->set_frame_id(camera_config_->frame_id());      
pb_image->set_width(raw_image_->width);                                    
pb_image->set_height(raw_image_->height);                                  
pb_image->mutable_data()->reserve(raw_image_->image_size);       
...
writer_->Write(pb_image);

這里先創建了Writer物件,然后填好了訊息里的資料(這里發送的訊息型別為Image,定義在modules/drivers/proto/sensor_image.proto檔案),最后呼叫Writer::Write()函式將該訊息發出,

CreateWriter()函式中先創建Writer物件,再呼叫Writer::Init()函式進行初始化,初始化中主要通過CreateTransmitter()函式創建Transmitter物件,因為默認是HYBRID模式,所以這里實際創建的是HybridTransmitter物件,Transmitter繼承自Endpoint類,它其中的屬性資訊以用來判斷讀者與寫者的相對關系,不同的相對關系決定使用何種Transmitter物件,其配置在InitMode()函式中設定:

template <typename M>                                   
void HybridTransmitter<M>::InitMode() {                 
  mode_ = std::make_shared<proto::CommunicationMode>(); 
  mapping_table_[SAME_PROC] = mode_->same_proc();       
  mapping_table_[DIFF_PROC] = mode_->diff_proc();       
  mapping_table_[DIFF_HOST] = mode_->diff_host();       
}                                                       

Writer物件的初始化中還會將呼叫JointTheTopology()函式將之加入到ChannelManager維護的拓撲資訊中,

template <typename MessageT>                                              
void Writer<MessageT>::JoinTheTopology() {                                
  // add listener                                                         
  change_conn_ = channel_manager_->AddChangeListener(std::bind(           
      &Writer<MessageT>::OnChannelChange, this, std::placeholders::_1));  
                                                                          
  // get peer readers                                                     
  const std::string& channel_name = this->role_attr_.channel_name();      
  std::vector<proto::RoleAttributes> readers;                             
  channel_manager_->GetReadersOfChannel(channel_name, &readers);          
  for (auto& reader : readers) {                                          
    transmitter_->Enable(reader);                                         
  }                                                                       
                                                                          
  channel_manager_->Join(this->role_attr_, proto::RoleType::ROLE_WRITER,  
                         message::HasSerializer<MessageT>::value);        
}                                                                         

這里還會做一件比較重要的事是enable相應的Transmitter,先通過ChannelManager得到該channel相應讀者的資訊,然后對于每個讀者,呼叫HybridTransmitter::Enable()函式,HybridTransmitter是混合模式的Transmitter,它其實包含了RTPS,SHM和INTRA三種Transmitter實體,但這三種Transmitter并不一定都需要用到,比如,如果該訊息對應的讀者全是同行程的,那就沒必要整上SHM和RTPS了,HybridTransmitter::Enable()函式會根據引數來enable合適的Transmitter

template <typename M>
void HybridTransmitter<M>::Enable(const RoleAttributes& opposite_attr) {    
  auto relation = GetRelation(opposite_attr);
  if (relation == NO_RELATION) {
    return;
  }
  
  uint64_t id = opposite_attr.id();
  std::lock_guard<std::mutex> lock(mutex_);                                 
  receivers_[mapping_table_[relation]].insert(id);                          
  transmitters_[mapping_table_[relation]]->Enable();
  TransmitHistoryMsg(opposite_attr);                                        
}

相應地,在Disable()函式中決定是否要disable相應的Transmitter,這樣在之后的Transmit()函式中只要把transmitters_中的所有Transmitter拿出來呼叫Transmit()函式即可,

發送資料是通過Writer::Write()函式繼而呼叫Transmitter::Transmit()函式來實作的,因為這里是用的HybridTransmitter,因此實際呼叫的是HybridTransmitter::Transmit()函式:

template <typename M>
bool HybridTransmitter<M>::Transmit(const MessagePtr& msg,
                                    const MessageInfo& msg_info) {
  std::lock_guard<std::mutex> lock(mutex_);
  history_->Add(msg, msg_info);
  for (auto& item : transmitters_) {
    item.second->Transmit(msg, msg_info);
  }
  return true;
} 

可以看到這里分別呼叫三大TransmitterTransmit()函式發送訊息,

訊息讀端

讀端的處理鏈路相比下復雜一些,先回顧一個Component中對訊息的處理,對于一個Component來說,它可能會從多個channel收取訊息,然后基于所有channel的訊息才能處理,第一個channel暫且稱之為主channel,這些channel訊息的組合我們暫且稱為組合訊息,我們就來看下典型的兩個channel情況,其初始化的主要代碼為:

template <typename M0, typename M1>                      
bool Component<M0, M1, NullType, NullType>::Initialize(  
  ...
  ReaderConfig reader_cfg;                                                       
  reader_cfg.channel_name = config.readers(1).channel();                         
  reader_cfg.qos_profile.CopyFrom(config.readers(1).qos_profile());              
  reader_cfg.pending_queue_size = config.readers(1).pending_queue_size();        
                                                                                 
  auto reader1 = node_->template CreateReader<M1>(reader_cfg);                   
                                                                                 
  reader_cfg.channel_name = config.readers(0).channel();                         
  reader_cfg.qos_profile.CopyFrom(config.readers(0).qos_profile());              
  reader_cfg.pending_queue_size = config.readers(0).pending_queue_size();        
                                                                                 
  reader0 = node_->template CreateReader<M0>(reader_cfg);         
  ...
  readers_.push_back(std::move(reader0)); 
  readers_.push_back(std::move(reader1)); 
  ...  
  std::vector<data::VisitorConfig> config_list;                                   
  for (auto& reader : readers_) {                                                 
    config_list.emplace_back(reader->ChannelId(), reader->PendingQueueSize());    
  }                                                                               
  auto dv = std::make_shared<data::DataVisitor<M0, M1>>(config_list);             
  croutine::RoutineFactory factory =                                              
      croutine::CreateRoutineFactory<M0, M1>(func, dv);                           
  return sched->CreateTask(factory, node_->Name());                               
}

其中對兩個channel分別創建Reader物件,該Reader物件是針對單個channel的,然后針對所有channel創建DataVisitor物件,這時就是針對所有channel的組合訊息了,最后創建協程來進行組合資料的處理,后面會看到每個Reader都會有單獨的協程來做資料讀取,因此,對于一個有n個channel的component,框架會為此創建至少n+1個協程,

其中比較重要的結構就是用于讀取訊息的Reader類了,我們先看Reader物件的創建,其初始化函式Init()如下:

template <typename MessageT>                                                         
bool Reader<MessageT>::Init() {                                                      
  if (init_.exchange(true)) {                                                        
    return true;                                                                     
  }                                                                                  
  std::function<void(const std::shared_ptr<MessageT>&)> func;                        
  if (reader_func_ != nullptr) {                                                     
    func = [this](const std::shared_ptr<MessageT>& msg) {                            
      this->Enqueue(msg);                                                            
      this->reader_func_(msg);                                                       
    };                                                                               
  } else {                                                                           
    func = [this](const std::shared_ptr<MessageT>& msg) { this->Enqueue(msg); };     
  }                                                                                  
  auto sched = scheduler::Instance();                                                
  croutine_name_ = role_attr_.node_name() + "_" + role_attr_.channel_name();         
  auto dv = std::make_shared<data::DataVisitor<MessageT>>(                           
      role_attr_.channel_id(), pending_queue_size_);                                 
  // Using factory to wrap templates.                                                
  croutine::RoutineFactory factory =                                                 
      croutine::CreateRoutineFactory<MessageT>(std::move(func), dv);                 
  if (!sched->CreateTask(factory, croutine_name_)) {                                 
    AERROR << "Create Task Failed!";                                                 
    init_.store(false);                                                              
    return false;                                                                    
  }                                                                                  
                                                                                     
  receiver_ = ReceiverManager<MessageT>::Instance()->GetReceiver(role_attr_);        
  this->role_attr_.set_id(receiver_->id().HashValue());                              
  channel_manager_ =                                                                 
      service_discovery::TopologyManager::Instance()->channel_manager();             
  JoinTheTopology();                                                                 
                                                                                     
  return true;                                                                       
}                                                                                    

這里主要創建了相應的DataVisitor類,協程和Receiver類等,其中DataVisitor主要用于訊息資料的訪問,它存放到來的訊息資料,并提供介面供訊息讀取,還是以兩個channel的情況為例:

template <typename M0, typename M1>                                                  
class DataVisitor<M0, M1, NullType, NullType> : public DataVisitorBase {             
 public:                                                                             
  explicit DataVisitor(const std::vector<VisitorConfig>& configs)                    
      : buffer_m0_(configs[0].channel_id,                                            
                   new BufferType<M0>(configs[0].queue_size)),                       
        buffer_m1_(configs[1].channel_id,                                            
                   new BufferType<M1>(configs[1].queue_size)) {                      
    DataDispatcher<M0>::Instance()->AddBuffer(buffer_m0_);                           
    DataDispatcher<M1>::Instance()->AddBuffer(buffer_m1_);                           
    data_notifier_->AddNotifier(buffer_m0_.channel_id(), notifier_);                 
    data_fusion_ = new fusion::AllLatest<M0, M1>(buffer_m0_, buffer_m1_);            
  }   
  ... 
  bool TryFetch(std::shared_ptr<M0>& m0, std::shared_ptr<M1>& m1) {  // NOLINT            
    if (data_fusion_->Fusion(&next_msg_index_, m0, m1)) {                                 
      next_msg_index_++;                                                                  
      return true;                                                                        
    }                                                                                     
    return false;                                                                         
  }                                                                                       
                                                                                          
 private:                                                                                 
  fusion::DataFusion<M0, M1>* data_fusion_ = nullptr;                                     
  ChannelBuffer<M0> buffer_m0_;                                                           
  ChannelBuffer<M1> buffer_m1_;                                                           
};                                                                                        

它的成員變數中對每一個channel都有一個對應的ChannelBuffer物件,DataDispatcher::AddBuffer()函式在DataVisitor初始化時用來將這些個ChannelBuffer加入到DataDispatcher的管理中,同時,DataNotifier::AddNotifier()函式用來以主channel的id為鍵值加入到DataNotifier的管理中,DataDispatcherDataNotifier均為單例,前者為模板類,意味著每一個訊息型別會有對應的DataDispatcher物件,且相同訊息型別會共享該物件,顧名思義,它主要用于資料傳輸層有資料來時的分發,即當新訊息到來時通過DataDispatcher::Dispatch()函式把它放到相應的訊息緩沖區中,后者用于管理所有的Notifier,它用于在訊息派發完后喚醒相應的協程進行處理,這些物件的大體結構圖如下:
在這里插入圖片描述

當channel多于一個時(組合訊息),DataVisitor中還有一個DataFusion物件用于將多路channel的資料合并,DataFusion的實作類為AllLatest,聽名字就知道它會取所有channel中的最新值,除了per-channel的ChannelBuffer物件外,它還有一個特殊的ChannelBuffer物件用于存放多channel訊息的組合訊息(即各個channel的訊息型別的tuple),當填入主channel的訊息時,會呼叫由SetFusionCallback()函式注冊的回呼,該回呼判斷是否所有channel都有訊息,如果都有訊息的話就將這些訊息作為組合訊息填入該組合訊息的ChannelBuffer中, 在協程處理函式中會呼叫DataVisitor::TryFetch()函式從該ChannelBuffer中拿組合訊息,值得注意的是這件事只在主channel有訊息來時才會被觸發,因此主channel的選取是有講究的,

Reader初始化時創建的另一個關鍵物件為Receiver,它有4個繼承類,默認為混合模式的HybridReceiverHybridReceiver::InitReceivers中分別創建相應的IntraReceiverShmReceiverRtpsReceiver,放在成員receivers_陣列中,它會來根據寫端的情況來enable和disable相應的ReceiverReceiverManager用于管理這些Receiver物件,它以channel為key進行管理,因此同一行程內訂閱同一個channel的會共用同一個Receiver物件,ReceiverManager::GetReceiver()函式用于按鍵值取出Receiver,如沒有,則通過Transport::CreateReceiver()函式新建一個Receiver, 這些個ReceiverEnable()函式中會通過AddListener()函式向對應的Dispatcher注冊其回呼函式XXXReceiver::OnNewMessage()Dispatcher類中的成員msg_listeners_是channel id到ListenerHandler物件的查找表,ListenerHandler通過signal/slot機制保存了所有這些回呼,注意不同傳輸后端的AddListener()實作略有不同,比如RtpsDispatcher::AddListener()函式中會將輸入的訊息先通過ParseFromString()函式進行決議,然后呼叫傳入的回呼,ShmDispatcher::AddListener()函式也是類似,它會先通過ParseFromArray()函式決議訊息,而對于IntraDispatcher::AddListener(),由于是同個行程內,是以訊息本身的型別傳的,就沒必要決議了,

這些相關結構關系示意圖如下:
在這里插入圖片描述
看了一些關鍵相關資料結構,接下來看下讀端的處理流程,首先,如之前介紹的,各Dispatcher的繼承類各顯神通使自己的OnMessage()回呼函式被呼叫,以RtpsDispatcher為例:

void RtpsDispatcher::OnMessage(uint64_t channel_id,
                               const std::shared_ptr<std::string>& msg_str,
                               const MessageInfo& msg_info) {
  if (is_shutdown_.load()) {
    return;
  }
    
  ListenerHandlerBasePtr* handler_base = nullptr;
  if (msg_listeners_.Get(channel_id, &handler_base)) {
    auto handler =
        std::dynamic_pointer_cast<ListenerHandler<std::string>>(*handler_base);
    handler->Run(msg_str, msg_info);
  }
}

這里ListenerHandler::Run()會根據訊息的發送者資訊找到對應的回呼,即Receiver::OnNewMessage()

template <typename M>
void Receiver<M>::OnNewMessage(const MessagePtr& msg,
                               const MessageInfo& msg_info) {
  if (msg_listener_ != nullptr) {
    msg_listener_(msg, msg_info, attr_);
  }
}

這里的回呼函式msg_listener_是在Receiver創建的時候傳入的,其實主要是呼叫了DataDispatcher::Dispatch()函式來訊息的派發:

transport::Transport::Instance()->CreateReceiver<MessageT>(      
    role_attr, [](const std::shared_ptr<MessageT>& msg,          
                  const transport::MessageInfo& msg_info,        
                  const proto::RoleAttributes& reader_attr) {    
      (void)msg_info;                                            
      (void)reader_attr;                                         
      PerfEventCache::Instance()->AddTransportEvent(             
          TransPerf::DISPATCH, reader_attr.channel_id(),         
          msg_info.seq_num());                                   
      data::DataDispatcher<MessageT>::Instance()->Dispatch(      
          reader_attr.channel_id(), msg);                        
      PerfEventCache::Instance()->AddTransportEvent(             
          TransPerf::NOTIFY, reader_attr.channel_id(),           
          msg_info.seq_num());                                   
    });                                                                                                                       

DataDisaptcher是模板類單例,即對于一種特定型別的訊息可以共用一個DataDispatcher,之前在DataVisitor初始化時會通過AddBuffer()函式將ChannelBuffer加入到DataDispatcher的成員buffers_map_中,它是一個以channel id為key的map,其value為所有等待該channel上訊息的CacheBuffer的陣列,也就是說,訊息分發時,只需要根據channel id找到這些buffer,然后將新來的訊息填入其中即可,這就是Dispatcher::Dispatch()函式主要做的事:

template <typename T>                                                 
bool DataDispatcher<T>::Dispatch(const uint64_t channel_id,           
                                 const std::shared_ptr<T>& msg) {     
  BufferVector* buffers = nullptr;                                    
  if (apollo::cyber::IsShutdown()) {                                  
    return false;                                                     
  }                                                                   
  if (buffers_map_.Get(channel_id, &buffers)) {                       
    for (auto& buffer_wptr : *buffers) {                              
      if (auto buffer = buffer_wptr.lock()) {                         
        std::lock_guard<std::mutex> lock(buffer->Mutex());            
        buffer->Fill(msg);                                            
      }                                                               
    }                                                                 
  } else {                                                            
    return false;                                                     
  }                                                                   
  return notifier_->Notify(channel_id);                               
}                                                                     

最后呼叫DataNotifier::Notify()函式來通知新訊息的到來,它會觸發該channel上所有對應Notifier中的回呼,

inline bool DataNotifier::Notify(const uint64_t channel_id) {  
  NotifyVector* notifies = nullptr;                            
  if (notifies_map_.Get(channel_id, &notifies)) {              
    for (auto& notifier : *notifies) {                         
      if (notifier && notifier->callback) {                    
        notifier->callback();                                  
      }                                                        
    }                                                          
    return true;                                               
  }                                                            
  return false;                                                
}                                                              

這個Notifier中的回呼是在創建協程時通過RegisterNotifyCallback()函式注冊進去的,目的是為了喚醒相應的協程來處理該新訊息,

visitor->RegisterNotifyCallback([this, task_id]() { 
  if (cyber_unlikely(stop_.load())) {               
    return;                                         
  }                                                 
  this->NotifyProcessor(task_id);                   
});                                                 

NotifyProcessor()函式會修改對應協程的狀態使之能被調度執行,前面提到,對于n個channel輸入的component,會有n+1個協程,它們都是以DataVisitor和訊息回呼函式一起作為引數創建的,這個協程主體中會呼叫DataVisitor::TryFetch()函式拿訊息,然后呼叫注冊的訊息處理函式:

  factory.create_routine = [=]() {                                           
    return [=]() {                                                           
      std::shared_ptr<M0> msg;                                               
      for (;;) {                                                             
        CRoutine::GetCurrentRoutine()->set_state(RoutineState::DATA_WAIT);   
        if (dv->TryFetch(msg)) {                                             
          f(msg);                                                            
          CRoutine::Yield(RoutineState::READY);                              
        } else {                                                             
          CRoutine::Yield();                                                 
        }                                                                    
      }                                                                      
    };                                                                       
  };                                                                         

對于那n個訊息讀取協程來說,其訊息處理函式為:

func = [this](const std::shared_ptr<MessageT>& msg) {    
  this->Enqueue(msg);                                    
  this->reader_func_(msg);                               
};                                                       

這個回呼函式中會呼叫Reader::Enqueue()函式,在該函式中,主要呼叫Blocker::Publish()函式,它繼而呼叫Blocker::Enqueue()Blocker::Notify()函式,Blocker類是一個存盤訊息的結構,BlockerManager類用于管理Blocker,其中維護了以channel為鍵值的Blocker的map,Reader::Enqueue()函式將訊息放到Blocker的成員published_msg_queue_佇列中,之后,可以通過Blocker::Observe()函式將成員published_msg_queue_佇列的訊息放到成員observed_msg_queue_佇列,然后通過Blocker::GetLatestObserved()函式得到最新的訊息,比如ControlComponent中的:

chassis_reader_->Observe();
const auto &chassis_msg = chassis_reader_->GetLatestObserved();

而對于剩下那一個協程,它是由主channel來觸發的,因它處理的是多channel的組合訊息,在協程主體中的TryFetch()函式會呼叫AllLatest::Fusion()函式同時拿多個channel上的最新訊息,至于這個組合訊息是怎么填入的前面有提,簡單來說,對于它來說,主channel來訊息時,同時也會將其它channel的訊息寫入組合訊息,然后調度協程,拿出組合訊息進行處理,其訊息處理函式為:

 auto func = [self](const std::shared_ptr<M0>& msg0,         
                    const std::shared_ptr<M1>& msg1) {       
   auto ptr = self.lock();                                   
   if (ptr) {                                                
     ptr->Process(msg0, msg1);                               
   } else {                                                  
     AERROR << "Component object has been destroyed.";       
   }                                                         
 };                                                          

該實作中主要以收到的訊息為引數呼叫Component中的處理函式Process(),從而執行組件的自定義處理邏輯,

小結

文中提了不少細枝末節,最后非常high-level地概括下從寫者到讀者的流程,寫者Writer寫訊息時,會通過HybridTransmitter繼而使用合適后端的Transmitter發送訊息,根據讀與寫者間的位置關系,經過網路、共享記憶體或直接呼叫的方式,對應后端的Dispatcher收到訊息,收到后轉成指定訊息型別,交給Receiver,然后通過DataDispatcher派發訊息,派發訊息就是將訊息放到對應的buffer中,然后通知相應的協程來作進一步處理,上層模塊要取用這些訊息,主要兩種方式:一種是通過ComponentProc()介面,它被呼叫時引數就是最新的訊息,另一種是通過ReaderObserve()函式直接拿,
在這里插入圖片描述

我們知道,Apollo在版本3.5前是基于ROS的,同時也對ROS做了幾個重要改進,這些改進不少是關于通信系統的,如共享記憶體、去中心化和資料兼容性,到Cyber RT的演進也自然延續了這幾個優點,總得來說,Cyber RT基于自動發現機制與Publish-Subscribe模式實作了通信網路的拓撲管理,同時它對資料傳輸層做了抽象,下面實作多個后端分別適用于不同場景,并提供了HYBRID模式可以根據讀者和寫者間的關系自動使用合適的傳輸層后端,這樣,通信系統的復雜性就被很好地屏蔽,框架就能提供給應用層便利的開發介面,

轉載請註明出處,本文鏈接:https://www.uj5u.com/yidong/34817.html

標籤:其他

上一篇:高煥堂《嵌入式UML設計》讀書筆記_第一章

下一篇:到了2020年,技術水平到底需要達到怎樣的程度才能成為頂級的阿里P8架構師

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 【從零開始擼一個App】Dagger2

    Dagger2是一個IOC框架,一般用于Android平臺,第一次接觸的朋友,一定會被搞得暈頭轉向。它延續了Java平臺Spring框架代碼碎片化,注解滿天飛的傳統。嘗試將各處代碼片段串聯起來,理清思緒,真不是件容易的事。更不用說還有各版本細微的差別。 與Spring不同的是,Spring是通過反射 ......

    uj5u.com 2020-09-10 06:57:59 more
  • Flutter Weekly Issue 66

    新聞 Flutter 季度調研結果分享 教程 Flutter+FaaS一體化任務編排的思考與設計 詳解Dart中如何通過注解生成代碼 GitHub 用對了嗎?Flutter 團隊分享如何管理大型開源專案 插件 flutter-bubble-tab-indicator A Flutter librar ......

    uj5u.com 2020-09-10 06:58:52 more
  • Proguard 常用規則

    介紹 Proguard 入口,如何查看輸出,如何使用 keep 設定入口以及使用實體,如何配置壓縮,混淆,校驗等規則。

    ......

    uj5u.com 2020-09-10 06:59:00 more
  • Android 開發技術周報 Issue#292

    新聞 Android即將獲得類AirDrop功能:可向附近設備快速分享檔案 谷歌為安卓檔案管理應用引入可安全隱藏資料的Safe Folder功能 Android TV新主界面將顯示電影、電視節目和應用推薦內容 泄露的Android檔案暗示了傳說中的谷歌Pixel 5a與折疊屏新機 谷歌發布Andro ......

    uj5u.com 2020-09-10 07:00:37 more
  • AutoFitTextureView Error inflating class

    報錯: Binary XML file line #0: Binary XML file line #0: Error inflating class xxx.AutoFitTextureView 解決: <com.example.testy2.AutoFitTextureView android: ......

    uj5u.com 2020-09-10 07:00:41 more
  • 根據Uri,Cursor沒有獲取到對應的屬性

    Android: 背景:呼叫攝像頭,拍攝視頻,指定保存的地址,但是回傳的Cursor檔案,只有名稱和大小的屬性,沒有其他諸如時長,連ID屬性都沒有 使用 cursor.getInt(cursor.getColumnIndexOrThrow(MediaStore.Video.Media.DURATIO ......

    uj5u.com 2020-09-10 07:00:44 more
  • Android連載29-持久化技術

    一、持久化技術 我們平時所使用的APP產生的資料,在記憶體中都是瞬時的,會隨著斷電、關機等丟失資料,因此android系統采用了持久化技術,用于存盤這些“瞬時”資料 持久化技術包括:檔案存盤、SharedPreference存盤以及資料庫存盤,還有更復雜的SD卡記憶體儲。 二、檔案存盤 最基本存盤方式, ......

    uj5u.com 2020-09-10 07:00:47 more
  • Android Camera2Video整合到自己專案里

    背景: Android專案里呼叫攝像頭拍攝視頻,原本使用的 MediaStore.ACTION_VIDEO_CAPTURE, 后來因專案需要,改成了camera2 1.Camera2Video 官方demo有點問題,下載后,不能直接整合到專案 問題1.多次拍攝視頻崩潰 問題2.雙擊record按鈕, ......

    uj5u.com 2020-09-10 07:00:50 more
  • Android 開發技術周報 Issue#293

    新聞 谷歌為Android TV開發者提供多種新功能 Android 11將自動填表功能整合到鍵盤輸入建議中 谷歌宣布Android Auto即將支持更多的導航和數字停車應用 谷歌Pixel 5只有XL版本 搭載驍龍765G且將比Pixel 4更便宜 [圖]Wear OS將迎來重磅更新:應用啟動時間 ......

    uj5u.com 2020-09-10 07:01:38 more
  • 海豚星空掃碼投屏 Android 接收端 SDK 集成 六步驟

    掃碼投屏,開放網路,獨占設備,不需要額外下載軟體,微信掃碼,發現設備。支持標準DLNA協議,支持倍速播放。視頻,音頻,圖片投屏。好點意思。還支持自定義基于 DLNA 擴展的操作動作。好像要收費,沒體驗。 這里簡單記錄一下集成程序。 一 跟目錄的build.gradle添加私有mevan倉庫 mave ......

    uj5u.com 2020-09-10 07:01:43 more
最新发布
  • 歡迎頁輪播影片

    如圖,引導開始,球從上落下,同時淡入文字,然后文字開始輪播,最后一頁時停止,點擊進入首頁。 在來看看效果圖。 重力球先不講,主要歡迎輪播簡單實作 首先新建一個類 TextTranslationXGuideView,用于影片展示 文本是類似的,最后會有個圖片箭頭影片,布局很簡單,就是一個 TextVi ......

    uj5u.com 2023-04-20 08:40:31 more
  • 【FAQ】關于華為推送服務因營銷訊息頻次管控導致服務通訊類訊息

    一. 問題描述 使用華為推送服務下發IM訊息時,下發訊息請求成功且code碼為80000000,但是手機總是收不到訊息; 在華為推送自助分析(Beta)平臺查看發現,訊息發送觸發了頻控。 二. 問題原因及背景 2023年1月05日起,華為推送服務對咨詢營銷類訊息做了單個設備每日推送數量上限管理,具體 ......

    uj5u.com 2023-04-20 08:40:11 more
  • 歡迎頁輪播影片

    如圖,引導開始,球從上落下,同時淡入文字,然后文字開始輪播,最后一頁時停止,點擊進入首頁。 在來看看效果圖。 重力球先不講,主要歡迎輪播簡單實作 首先新建一個類 TextTranslationXGuideView,用于影片展示 文本是類似的,最后會有個圖片箭頭影片,布局很簡單,就是一個 TextVi ......

    uj5u.com 2023-04-20 08:39:36 more
  • 【FAQ】關于華為推送服務因營銷訊息頻次管控導致服務通訊類訊息

    一. 問題描述 使用華為推送服務下發IM訊息時,下發訊息請求成功且code碼為80000000,但是手機總是收不到訊息; 在華為推送自助分析(Beta)平臺查看發現,訊息發送觸發了頻控。 二. 問題原因及背景 2023年1月05日起,華為推送服務對咨詢營銷類訊息做了單個設備每日推送數量上限管理,具體 ......

    uj5u.com 2023-04-20 08:39:13 more
  • iOS從UI記憶體地址到讀取成員變數(oc/swift)

    開發除錯時,我們發現bug時常首先是從UI顯示發現例外,下一步才會去定位UI相關連的資料的。XCode有給我們提供一系列debug工具,但是很多人可能還沒有形成一套穩定的除錯流程,因此本文嘗試解決這個問題,順便提出一個暴論:UI顯示例外問題只需要兩個步驟就能完成定位作業的80%: 定位例外 UI 組 ......

    uj5u.com 2023-04-19 09:16:23 more
  • FIDE重磅更新!性能飛躍!體驗有禮!

    FIDE 開發者工具重構升級啦!實作500%性能提升,誠邀體驗! 一直以來不少開發者朋友在社區反饋,在使用 FIDE 工具的程序中,時常會遇到諸如加載不及時、代碼預覽/渲染性能不如意的情況,十分影響開發體驗。 作為技術團隊,我們深知一件趁手的開發工具對開發者的重要性,因此,在2023年開年,FinC ......

    uj5u.com 2023-04-19 09:16:15 more
  • 游戲內嵌社區服務開放,助力開發者提升玩家互動與留存

    華為 HMS Core 游戲內嵌社區服務提供快速訪問華為游戲中心論壇能力,支持玩家直接在游戲內瀏覽帖子和交流互動,助力開發者擴展內容生產和觸達的場景。 一、為什么要游戲內嵌社區? 二、游戲內嵌社區的典型使用場景 1、游戲內打開論壇 您可以在游戲內繪制論壇入口,為玩家提供沉浸式發帖、瀏覽、點贊、回帖、 ......

    uj5u.com 2023-04-19 09:15:46 more
  • iOS從UI記憶體地址到讀取成員變數(oc/swift)

    開發除錯時,我們發現bug時常首先是從UI顯示發現例外,下一步才會去定位UI相關連的資料的。XCode有給我們提供一系列debug工具,但是很多人可能還沒有形成一套穩定的除錯流程,因此本文嘗試解決這個問題,順便提出一個暴論:UI顯示例外問題只需要兩個步驟就能完成定位作業的80%: 定位例外 UI 組 ......

    uj5u.com 2023-04-19 09:14:53 more
  • FIDE重磅更新!性能飛躍!體驗有禮!

    FIDE 開發者工具重構升級啦!實作500%性能提升,誠邀體驗! 一直以來不少開發者朋友在社區反饋,在使用 FIDE 工具的程序中,時常會遇到諸如加載不及時、代碼預覽/渲染性能不如意的情況,十分影響開發體驗。 作為技術團隊,我們深知一件趁手的開發工具對開發者的重要性,因此,在2023年開年,FinC ......

    uj5u.com 2023-04-19 09:14:08 more
  • 游戲內嵌社區服務開放,助力開發者提升玩家互動與留存

    華為 HMS Core 游戲內嵌社區服務提供快速訪問華為游戲中心論壇能力,支持玩家直接在游戲內瀏覽帖子和交流互動,助力開發者擴展內容生產和觸達的場景。 一、為什么要游戲內嵌社區? 二、游戲內嵌社區的典型使用場景 1、游戲內打開論壇 您可以在游戲內繪制論壇入口,為玩家提供沉浸式發帖、瀏覽、點贊、回帖、 ......

    uj5u.com 2023-04-19 09:08:34 more