主頁 >  其他 > 自動駕駛平臺Apollo 5.5閱讀手記:Cyber RT中的通信傳輸

自動駕駛平臺Apollo 5.5閱讀手記:Cyber RT中的通信傳輸

2020-09-14 12:01:52 其他

前言

計算框架是自動駕駛系統中的重中之重,也是整個系統得以高效穩定運行的基礎,為了實時地完成感知、決策和執行,系統需要一系列的模塊相互緊密配合,高效地執行任務流,由于各種原因,這些模塊可能位于不同行程,也可能位于不同機器,這就要求計算框架中具有靈活的、高性能的通信機制,我們知道,Apollo在3.5版本中推出了Cyber RT替代了原先的ROS,之前寫過兩篇相關的文章介紹了其中的調度部分:《自動駕駛平臺Apollo 3.5閱讀手記:Cyber RT中的協程(Coroutine)》和《自動駕駛平臺Apollo 5.5閱讀手記:Cyber RT中的任務調度》,今天就來聊一下其中的另一重要部分-通信系統,

和ROS & ROS2中類似,Cyber RT中支持兩種資料交換模式:一種是Publish-Subscribe模式,常用于資料流處理中節點間通信,即發布者(Publisher)在channel(ROS中對應地稱為topic)上發布訊息,訂閱該channel的訂閱者(Subscriber)便會收到訊息資料;另一種就是常見的Service-Client模式,常用于客戶端與服務端的請求與回應,本質上它是可以基于前者實作的,Node是整個資料拓撲網路中的基本單元,一個Node中可以創建多個讀者/寫者,服務端/客戶端,讀者和寫者分別對應ReaderWriter,用于Publish-Subscribe模式,服務端和客戶端分別對應ServiceClient,用于Service-Client模式,

實作決議

自動駕駛系統中的各個處理模塊基本都是實作為Component,一個Component中包含一個Node,另外會根據需要創建和管理WriterReaderServiceClient,這些用于通信的類下面基于TrasmitterReceiver類,前者用于資料發送,后者用于資料接收,它們是資料傳輸層的抽象,之下可有多個傳輸層實作用于不同場景下的傳輸,如對于TrasmitterIntraTransmitterShmTransmitterRtpsTransmitterHybridTransmitter,對于Receiver也是類似的,其中RTPS后端基于Fast RTPS,Fast RTPS是DDS(Data Distribution Service)標準的一個非常流行的開源實作,DDS標準提供了一個平臺無關的資料模型,主要用于實時分布式系統,不同的實作可以相互通信,整個通信系統的架構層次圖如下,
在這里插入圖片描述

下面我們就從幾個方面深入地看下它們的實作機制,

服務發現與拓撲管理

首先來看下比較基礎與核心的服務發現與拓撲管理,其實作主要在目錄cyber/service_discovery/下,節點間通過讀和寫端建立資料通路,以channel為邊,這樣可以得到一個資料流圖絡,由于節點可能退出,訂閱情況也可能發生改變,所以這個網路是動態的,因此需要對網路拓撲進行監控,

主要負責這件事的資料結構是TopologyManager,它是個單例,因為每個行程只要有一個來負責監控網路拓撲就可以了,TopologyManager有三個子管理器,并有共同的基類Manager,它們分別為:
? - NodeManager用于管理網路拓撲中的節點,
? - ChannelManager用于管理channel,即網路拓撲中的邊,
? - ServiceManager用于管理ServiceClient

Cyber RT中有兩個層面的拓撲變化的監控:

  • 基于Fast RTPS的發現機制

它主要監視網路中是否有參與者加入或退出,TopologyManager::CreateParticipant()函式創建transport::Participant物件時會輸入包含host name與process id的名稱,ParticipantListener用于監聽網路的變化,網路拓撲發生變化時,Fast RTPS傳上來ParticipantDiscoveryInfo,在TopologyManager::Convert()函式中對該資訊轉換成Cyber RT中的資料結構ChangeMsg,然后呼叫回呼函式TopologyManager::OnParticipantChange(),它會呼叫其它幾個子管理器的OnTopoModuleLeave()函式,然后子管理器中便可以將相應維護的資訊進行更新(如NodeManager中將相應的節點洗掉),

這層拓撲監控主要是通過Fast RTPS提供的自動發現機制,如行程意外退出,則要將各管理中相應資訊進行更新,它的優點是如果行程出錯或設備斷開也可以作業,但粒度比較粗,且不是非常及時(比如斷開時),

  • 基于主動式的拓撲變更廣播

這一部分主要在TopologyManager::Init()函式中創建和初始化,在初始化時,會呼叫它們的StartDiscovery()函式開始啟動自動發現機制,基于TopologyManager中的RtpsParticipant物件,這幾個子管理會通過CreateSubscriber()CreatePublisher()函式創建相應的subscriber和publisher,子管理器中channel名稱分別為node_change_broadcastchannel_change_broadcastservice_change_broadcast,Subscriber的回呼函式為Manager::OnRemoteChange(),該回呼函式中會決議拓撲變更訊息并呼叫Dispose()函式進行處理,

這層拓撲監控是主動式的,即需要相應的地方主動呼叫Join()Leave()來觸發,然后各子管理器中回呼函式進行資訊的更新,如NodeChannelImpl創建時會呼叫NodeManager::Join()ReaderWriter初始化時會呼叫JoinTheTopolicy()函式,繼而呼叫ChannelManager::Join()函式,相應地,有LeaveTheTopology()函式表示退出拓撲網路,在這兩個函式中,會呼叫Dispose()函式,而這個函式是虛函式,在各子管理器中有各自的實作,另外Manager提供AddChangeListener()函式注冊當拓撲發生變化時的回呼函式,舉例來說,Reader::JoinTheTopology()函式中會通過該函式注冊回呼Reader::OnChannelChange()

資料傳輸

在一個分布式計算系統中,根據兩個節點間的位置關系需要使用不同的傳輸方式(定義在CommunicationMode中):
? - INTRA:如果是同行程的,因為在同一地址空間,直接傳指標就完了,
? - SHM(Shared memory):如果是同一機器上,但跨行程的,為了高效可以使用共享記憶體,
? - RTPS:如果是跨設備的,那就老老實實通過網路傳吧,

示意圖如下:
在這里插入圖片描述

很多時候一個計算圖中各種情況都有,所以為了達到最好的性能,需要混合使用,這種混合模式稱為HYBRID模式,框架需要根據節點間關系選擇合適的傳輸后端,

每個WriterTransmitter,每個ReaderReceiver,它們是負責訊息發送與收取的類,TransmitterReceiver的基類為Endpoint,代表一個通信的端點,它主要的資訊是身份標識與屬性,其型別為RoleAttributes(定義在role_attributes.proto)的成員attr_包含了host name,process id和一個根據uuid產生的hash值作為id,通過它們就可以判斷節點之間的相對位置關系了,

ReaderWriter會呼叫Transport的方法CreateTransmitter()CreateReceiver()用于創建發送端的transmitter和接收端的receiver,創建時有四種模式可選,分別是INTRA,SHM和RTPS,和HYBRID,最后一種是前三種的混合模式,也是默認的模式,如Transmitter對應的繼承類為IntraTransmitterShmTransmitterRtpsTransmitterHybridTransmitter,這幾個繼承類最主要實作了Transmit()函式用于傳輸資料,對于Receiver來說是類似的,它有4個繼承類對應四種傳輸方式,即IntraReceiverShmReceiverRtpsReceiverHybridReceiver

結合前面提到的幾種模式對應的場景,transmitter與receiver的對應關系如下:
在這里插入圖片描述

前面提到,傳輸層實作主要有四個實作后端,對應四種模式:

  • RTPS:RTPS部分基于eProsimar的Fast RTPS,RtpsTransmitter類中創建和封裝publisher,Transmit()函式將訊息序列化成Fast RTP中的格式UnderlayMessage,然后通過publisher發出去,RtpsReceiver中的dispatcher_成員指向單例RtpsDispatcher,它用于派發RTPS發來的資料,維護了channel id到subscriber的查找表,RtpsDispatcher::AddSubscriber()函式使用eprosima::fastrtps::Domain::createSubscriber()函式創建subscriber,其回呼統一為RtpsDispatcher::OnMessage()函式,該函式會將從RTPS通路來的訊息進行派發,

  • SHMSegment類表示一塊對應一個channel的共享記憶體,由SegmentFactory::CreateSegment函式創建,它有兩個繼承類PosixSegmentXsiSegment,是平臺相關的實作,在寫端,ShmTransmitter::Transmit()函式用于發送訊息,該函式先通過AcquireBlockToWrite()函式拿一個可寫的block,如果發現該Segment尚未初始化,會呼叫OpenOrCreate()通過OS的介面創建共享記憶體并且map出虛擬地址,這塊共享記憶體區域大體分兩部分,一部分為元資訊,另一部分為訊息資料,后者會被切分為相同大小的block,block的buffer大小默認16K,但遇上訊息超出大小的時候會調整,拿到該block后,將訊息序列化后寫入,并通知讀者來取訊息,通知機制是通過NotifierBase實作的,它有兩個實作類,分別為ConditionNotifierMulticastNotifier,前者為默認設定,它會單獨開一塊共享共享專門用于通知,其中包含了ReadableInfo等資訊,MulticastNotifier的主要區別是它是通過指定的socket廣播,在讀端,ShmDispatcher::Init()初始化時會創建專門的執行緒,執行緒的執行體為ShmDispatcher::Threadfunc()函式,它在回圈體內會通過Listen()函式等待新訊息,如果有新訊息寫入后發出通知,這兒就會往下走,基于通知中的ReadableInfo資訊,得到channel id,block index等資訊,然后呼叫ReadMessage()函式讀訊息并反序列化,之后呼叫ShmDispatcher::OnMessage()函式進行訊息派發,

  • INTRA :用于行程內通信,由于讀者和寫者是在同一行程內,因此可以直接呼叫,在IntraTransmitter::Transmit()函式中,會直接呼叫讀端的IntraDispatcher::OnMessage(),該函式進行下一步訊息的派發,

  • HYBRID:即默認模式,是前三種的結合體,具體功能其實還是交給前面幾個后端完成的,只是它會根據讀者與寫者的關系使用相應的后端,

訊息寫端

寫端的實作相對簡單一些,在模塊組件中,可以通過CreateWriter()函式創建Writer物件,然后就可以通過該物件向指定channel發送訊息了,以CameraComponent為例:

writer_ = node_->CreateWriter<Image>(camera_config_->channel_name());
...
auto pb_image = std::make_shared<Image>();                                 
pb_image->mutable_header()->set_frame_id(camera_config_->frame_id());      
pb_image->set_width(raw_image_->width);                                    
pb_image->set_height(raw_image_->height);                                  
pb_image->mutable_data()->reserve(raw_image_->image_size);       
...
writer_->Write(pb_image);

這里先創建了Writer物件,然后填好了訊息里的資料(這里發送的訊息型別為Image,定義在modules/drivers/proto/sensor_image.proto檔案),最后呼叫Writer::Write()函式將該訊息發出,

CreateWriter()函式中先創建Writer物件,再呼叫Writer::Init()函式進行初始化,初始化中主要通過CreateTransmitter()函式創建Transmitter物件,因為默認是HYBRID模式,所以這里實際創建的是HybridTransmitter物件,Transmitter繼承自Endpoint類,它其中的屬性資訊以用來判斷讀者與寫者的相對關系,不同的相對關系決定使用何種Transmitter物件,其配置在InitMode()函式中設定:

template <typename M>                                   
void HybridTransmitter<M>::InitMode() {                 
  mode_ = std::make_shared<proto::CommunicationMode>(); 
  mapping_table_[SAME_PROC] = mode_->same_proc();       
  mapping_table_[DIFF_PROC] = mode_->diff_proc();       
  mapping_table_[DIFF_HOST] = mode_->diff_host();       
}                                                       

Writer物件的初始化中還會將呼叫JointTheTopology()函式將之加入到ChannelManager維護的拓撲資訊中,

template <typename MessageT>                                              
void Writer<MessageT>::JoinTheTopology() {                                
  // add listener                                                         
  change_conn_ = channel_manager_->AddChangeListener(std::bind(           
      &Writer<MessageT>::OnChannelChange, this, std::placeholders::_1));  
                                                                          
  // get peer readers                                                     
  const std::string& channel_name = this->role_attr_.channel_name();      
  std::vector<proto::RoleAttributes> readers;                             
  channel_manager_->GetReadersOfChannel(channel_name, &readers);          
  for (auto& reader : readers) {                                          
    transmitter_->Enable(reader);                                         
  }                                                                       
                                                                          
  channel_manager_->Join(this->role_attr_, proto::RoleType::ROLE_WRITER,  
                         message::HasSerializer<MessageT>::value);        
}                                                                         

這里還會做一件比較重要的事是enable相應的Transmitter,先通過ChannelManager得到該channel相應讀者的資訊,然后對于每個讀者,呼叫HybridTransmitter::Enable()函式,HybridTransmitter是混合模式的Transmitter,它其實包含了RTPS,SHM和INTRA三種Transmitter實體,但這三種Transmitter并不一定都需要用到,比如,如果該訊息對應的讀者全是同行程的,那就沒必要整上SHM和RTPS了,HybridTransmitter::Enable()函式會根據引數來enable合適的Transmitter

template <typename M>
void HybridTransmitter<M>::Enable(const RoleAttributes& opposite_attr) {    
  auto relation = GetRelation(opposite_attr);
  if (relation == NO_RELATION) {
    return;
  }
  
  uint64_t id = opposite_attr.id();
  std::lock_guard<std::mutex> lock(mutex_);                                 
  receivers_[mapping_table_[relation]].insert(id);                          
  transmitters_[mapping_table_[relation]]->Enable();
  TransmitHistoryMsg(opposite_attr);                                        
}

相應地,在Disable()函式中決定是否要disable相應的Transmitter,這樣在之后的Transmit()函式中只要把transmitters_中的所有Transmitter拿出來呼叫Transmit()函式即可,

發送資料是通過Writer::Write()函式繼而呼叫Transmitter::Transmit()函式來實作的,因為這里是用的HybridTransmitter,因此實際呼叫的是HybridTransmitter::Transmit()函式:

template <typename M>
bool HybridTransmitter<M>::Transmit(const MessagePtr& msg,
                                    const MessageInfo& msg_info) {
  std::lock_guard<std::mutex> lock(mutex_);
  history_->Add(msg, msg_info);
  for (auto& item : transmitters_) {
    item.second->Transmit(msg, msg_info);
  }
  return true;
} 

可以看到這里分別呼叫三大TransmitterTransmit()函式發送訊息,

訊息讀端

讀端的處理鏈路相比下復雜一些,先回顧一個Component中對訊息的處理,對于一個Component來說,它可能會從多個channel收取訊息,然后基于所有channel的訊息才能處理,第一個channel暫且稱之為主channel,這些channel訊息的組合我們暫且稱為組合訊息,我們就來看下典型的兩個channel情況,其初始化的主要代碼為:

template <typename M0, typename M1>                      
bool Component<M0, M1, NullType, NullType>::Initialize(  
  ...
  ReaderConfig reader_cfg;                                                       
  reader_cfg.channel_name = config.readers(1).channel();                         
  reader_cfg.qos_profile.CopyFrom(config.readers(1).qos_profile());              
  reader_cfg.pending_queue_size = config.readers(1).pending_queue_size();        
                                                                                 
  auto reader1 = node_->template CreateReader<M1>(reader_cfg);                   
                                                                                 
  reader_cfg.channel_name = config.readers(0).channel();                         
  reader_cfg.qos_profile.CopyFrom(config.readers(0).qos_profile());              
  reader_cfg.pending_queue_size = config.readers(0).pending_queue_size();        
                                                                                 
  reader0 = node_->template CreateReader<M0>(reader_cfg);         
  ...
  readers_.push_back(std::move(reader0)); 
  readers_.push_back(std::move(reader1)); 
  ...  
  std::vector<data::VisitorConfig> config_list;                                   
  for (auto& reader : readers_) {                                                 
    config_list.emplace_back(reader->ChannelId(), reader->PendingQueueSize());    
  }                                                                               
  auto dv = std::make_shared<data::DataVisitor<M0, M1>>(config_list);             
  croutine::RoutineFactory factory =                                              
      croutine::CreateRoutineFactory<M0, M1>(func, dv);                           
  return sched->CreateTask(factory, node_->Name());                               
}

其中對兩個channel分別創建Reader物件,該Reader物件是針對單個channel的,然后針對所有channel創建DataVisitor物件,這時就是針對所有channel的組合訊息了,最后創建協程來進行組合資料的處理,后面會看到每個Reader都會有單獨的協程來做資料讀取,因此,對于一個有n個channel的component,框架會為此創建至少n+1個協程,

其中比較重要的結構就是用于讀取訊息的Reader類了,我們先看Reader物件的創建,其初始化函式Init()如下:

template <typename MessageT>                                                         
bool Reader<MessageT>::Init() {                                                      
  if (init_.exchange(true)) {                                                        
    return true;                                                                     
  }                                                                                  
  std::function<void(const std::shared_ptr<MessageT>&)> func;                        
  if (reader_func_ != nullptr) {                                                     
    func = [this](const std::shared_ptr<MessageT>& msg) {                            
      this->Enqueue(msg);                                                            
      this->reader_func_(msg);                                                       
    };                                                                               
  } else {                                                                           
    func = [this](const std::shared_ptr<MessageT>& msg) { this->Enqueue(msg); };     
  }                                                                                  
  auto sched = scheduler::Instance();                                                
  croutine_name_ = role_attr_.node_name() + "_" + role_attr_.channel_name();         
  auto dv = std::make_shared<data::DataVisitor<MessageT>>(                           
      role_attr_.channel_id(), pending_queue_size_);                                 
  // Using factory to wrap templates.                                                
  croutine::RoutineFactory factory =                                                 
      croutine::CreateRoutineFactory<MessageT>(std::move(func), dv);                 
  if (!sched->CreateTask(factory, croutine_name_)) {                                 
    AERROR << "Create Task Failed!";                                                 
    init_.store(false);                                                              
    return false;                                                                    
  }                                                                                  
                                                                                     
  receiver_ = ReceiverManager<MessageT>::Instance()->GetReceiver(role_attr_);        
  this->role_attr_.set_id(receiver_->id().HashValue());                              
  channel_manager_ =                                                                 
      service_discovery::TopologyManager::Instance()->channel_manager();             
  JoinTheTopology();                                                                 
                                                                                     
  return true;                                                                       
}                                                                                    

這里主要創建了相應的DataVisitor類,協程和Receiver類等,其中DataVisitor主要用于訊息資料的訪問,它存放到來的訊息資料,并提供介面供訊息讀取,還是以兩個channel的情況為例:

template <typename M0, typename M1>                                                  
class DataVisitor<M0, M1, NullType, NullType> : public DataVisitorBase {             
 public:                                                                             
  explicit DataVisitor(const std::vector<VisitorConfig>& configs)                    
      : buffer_m0_(configs[0].channel_id,                                            
                   new BufferType<M0>(configs[0].queue_size)),                       
        buffer_m1_(configs[1].channel_id,                                            
                   new BufferType<M1>(configs[1].queue_size)) {                      
    DataDispatcher<M0>::Instance()->AddBuffer(buffer_m0_);                           
    DataDispatcher<M1>::Instance()->AddBuffer(buffer_m1_);                           
    data_notifier_->AddNotifier(buffer_m0_.channel_id(), notifier_);                 
    data_fusion_ = new fusion::AllLatest<M0, M1>(buffer_m0_, buffer_m1_);            
  }   
  ... 
  bool TryFetch(std::shared_ptr<M0>& m0, std::shared_ptr<M1>& m1) {  // NOLINT            
    if (data_fusion_->Fusion(&next_msg_index_, m0, m1)) {                                 
      next_msg_index_++;                                                                  
      return true;                                                                        
    }                                                                                     
    return false;                                                                         
  }                                                                                       
                                                                                          
 private:                                                                                 
  fusion::DataFusion<M0, M1>* data_fusion_ = nullptr;                                     
  ChannelBuffer<M0> buffer_m0_;                                                           
  ChannelBuffer<M1> buffer_m1_;                                                           
};                                                                                        

它的成員變數中對每一個channel都有一個對應的ChannelBuffer物件,DataDispatcher::AddBuffer()函式在DataVisitor初始化時用來將這些個ChannelBuffer加入到DataDispatcher的管理中,同時,DataNotifier::AddNotifier()函式用來以主channel的id為鍵值加入到DataNotifier的管理中,DataDispatcherDataNotifier均為單例,前者為模板類,意味著每一個訊息型別會有對應的DataDispatcher物件,且相同訊息型別會共享該物件,顧名思義,它主要用于資料傳輸層有資料來時的分發,即當新訊息到來時通過DataDispatcher::Dispatch()函式把它放到相應的訊息緩沖區中,后者用于管理所有的Notifier,它用于在訊息派發完后喚醒相應的協程進行處理,這些物件的大體結構圖如下:
在這里插入圖片描述

當channel多于一個時(組合訊息),DataVisitor中還有一個DataFusion物件用于將多路channel的資料合并,DataFusion的實作類為AllLatest,聽名字就知道它會取所有channel中的最新值,除了per-channel的ChannelBuffer物件外,它還有一個特殊的ChannelBuffer物件用于存放多channel訊息的組合訊息(即各個channel的訊息型別的tuple),當填入主channel的訊息時,會呼叫由SetFusionCallback()函式注冊的回呼,該回呼判斷是否所有channel都有訊息,如果都有訊息的話就將這些訊息作為組合訊息填入該組合訊息的ChannelBuffer中, 在協程處理函式中會呼叫DataVisitor::TryFetch()函式從該ChannelBuffer中拿組合訊息,值得注意的是這件事只在主channel有訊息來時才會被觸發,因此主channel的選取是有講究的,

Reader初始化時創建的另一個關鍵物件為Receiver,它有4個繼承類,默認為混合模式的HybridReceiverHybridReceiver::InitReceivers中分別創建相應的IntraReceiverShmReceiverRtpsReceiver,放在成員receivers_陣列中,它會來根據寫端的情況來enable和disable相應的ReceiverReceiverManager用于管理這些Receiver物件,它以channel為key進行管理,因此同一行程內訂閱同一個channel的會共用同一個Receiver物件,ReceiverManager::GetReceiver()函式用于按鍵值取出Receiver,如沒有,則通過Transport::CreateReceiver()函式新建一個Receiver, 這些個ReceiverEnable()函式中會通過AddListener()函式向對應的Dispatcher注冊其回呼函式XXXReceiver::OnNewMessage()Dispatcher類中的成員msg_listeners_是channel id到ListenerHandler物件的查找表,ListenerHandler通過signal/slot機制保存了所有這些回呼,注意不同傳輸后端的AddListener()實作略有不同,比如RtpsDispatcher::AddListener()函式中會將輸入的訊息先通過ParseFromString()函式進行決議,然后呼叫傳入的回呼,ShmDispatcher::AddListener()函式也是類似,它會先通過ParseFromArray()函式決議訊息,而對于IntraDispatcher::AddListener(),由于是同個行程內,是以訊息本身的型別傳的,就沒必要決議了,

這些相關結構關系示意圖如下:
在這里插入圖片描述
看了一些關鍵相關資料結構,接下來看下讀端的處理流程,首先,如之前介紹的,各Dispatcher的繼承類各顯神通使自己的OnMessage()回呼函式被呼叫,以RtpsDispatcher為例:

void RtpsDispatcher::OnMessage(uint64_t channel_id,
                               const std::shared_ptr<std::string>& msg_str,
                               const MessageInfo& msg_info) {
  if (is_shutdown_.load()) {
    return;
  }
    
  ListenerHandlerBasePtr* handler_base = nullptr;
  if (msg_listeners_.Get(channel_id, &handler_base)) {
    auto handler =
        std::dynamic_pointer_cast<ListenerHandler<std::string>>(*handler_base);
    handler->Run(msg_str, msg_info);
  }
}

這里ListenerHandler::Run()會根據訊息的發送者資訊找到對應的回呼,即Receiver::OnNewMessage()

template <typename M>
void Receiver<M>::OnNewMessage(const MessagePtr& msg,
                               const MessageInfo& msg_info) {
  if (msg_listener_ != nullptr) {
    msg_listener_(msg, msg_info, attr_);
  }
}

這里的回呼函式msg_listener_是在Receiver創建的時候傳入的,其實主要是呼叫了DataDispatcher::Dispatch()函式來訊息的派發:

transport::Transport::Instance()->CreateReceiver<MessageT>(      
    role_attr, [](const std::shared_ptr<MessageT>& msg,          
                  const transport::MessageInfo& msg_info,        
                  const proto::RoleAttributes& reader_attr) {    
      (void)msg_info;                                            
      (void)reader_attr;                                         
      PerfEventCache::Instance()->AddTransportEvent(             
          TransPerf::DISPATCH, reader_attr.channel_id(),         
          msg_info.seq_num());                                   
      data::DataDispatcher<MessageT>::Instance()->Dispatch(      
          reader_attr.channel_id(), msg);                        
      PerfEventCache::Instance()->AddTransportEvent(             
          TransPerf::NOTIFY, reader_attr.channel_id(),           
          msg_info.seq_num());                                   
    });                                                                                                                       

DataDisaptcher是模板類單例,即對于一種特定型別的訊息可以共用一個DataDispatcher,之前在DataVisitor初始化時會通過AddBuffer()函式將ChannelBuffer加入到DataDispatcher的成員buffers_map_中,它是一個以channel id為key的map,其value為所有等待該channel上訊息的CacheBuffer的陣列,也就是說,訊息分發時,只需要根據channel id找到這些buffer,然后將新來的訊息填入其中即可,這就是Dispatcher::Dispatch()函式主要做的事:

template <typename T>                                                 
bool DataDispatcher<T>::Dispatch(const uint64_t channel_id,           
                                 const std::shared_ptr<T>& msg) {     
  BufferVector* buffers = nullptr;                                    
  if (apollo::cyber::IsShutdown()) {                                  
    return false;                                                     
  }                                                                   
  if (buffers_map_.Get(channel_id, &buffers)) {                       
    for (auto& buffer_wptr : *buffers) {                              
      if (auto buffer = buffer_wptr.lock()) {                         
        std::lock_guard<std::mutex> lock(buffer->Mutex());            
        buffer->Fill(msg);                                            
      }                                                               
    }                                                                 
  } else {                                                            
    return false;                                                     
  }                                                                   
  return notifier_->Notify(channel_id);                               
}                                                                     

最后呼叫DataNotifier::Notify()函式來通知新訊息的到來,它會觸發該channel上所有對應Notifier中的回呼,

inline bool DataNotifier::Notify(const uint64_t channel_id) {  
  NotifyVector* notifies = nullptr;                            
  if (notifies_map_.Get(channel_id, &notifies)) {              
    for (auto& notifier : *notifies) {                         
      if (notifier && notifier->callback) {                    
        notifier->callback();                                  
      }                                                        
    }                                                          
    return true;                                               
  }                                                            
  return false;                                                
}                                                              

這個Notifier中的回呼是在創建協程時通過RegisterNotifyCallback()函式注冊進去的,目的是為了喚醒相應的協程來處理該新訊息,

visitor->RegisterNotifyCallback([this, task_id]() { 
  if (cyber_unlikely(stop_.load())) {               
    return;                                         
  }                                                 
  this->NotifyProcessor(task_id);                   
});                                                 

NotifyProcessor()函式會修改對應協程的狀態使之能被調度執行,前面提到,對于n個channel輸入的component,會有n+1個協程,它們都是以DataVisitor和訊息回呼函式一起作為引數創建的,這個協程主體中會呼叫DataVisitor::TryFetch()函式拿訊息,然后呼叫注冊的訊息處理函式:

  factory.create_routine = [=]() {                                           
    return [=]() {                                                           
      std::shared_ptr<M0> msg;                                               
      for (;;) {                                                             
        CRoutine::GetCurrentRoutine()->set_state(RoutineState::DATA_WAIT);   
        if (dv->TryFetch(msg)) {                                             
          f(msg);                                                            
          CRoutine::Yield(RoutineState::READY);                              
        } else {                                                             
          CRoutine::Yield();                                                 
        }                                                                    
      }                                                                      
    };                                                                       
  };                                                                         

對于那n個訊息讀取協程來說,其訊息處理函式為:

func = [this](const std::shared_ptr<MessageT>& msg) {    
  this->Enqueue(msg);                                    
  this->reader_func_(msg);                               
};                                                       

這個回呼函式中會呼叫Reader::Enqueue()函式,在該函式中,主要呼叫Blocker::Publish()函式,它繼而呼叫Blocker::Enqueue()Blocker::Notify()函式,Blocker類是一個存盤訊息的結構,BlockerManager類用于管理Blocker,其中維護了以channel為鍵值的Blocker的map,Reader::Enqueue()函式將訊息放到Blocker的成員published_msg_queue_佇列中,之后,可以通過Blocker::Observe()函式將成員published_msg_queue_佇列的訊息放到成員observed_msg_queue_佇列,然后通過Blocker::GetLatestObserved()函式得到最新的訊息,比如ControlComponent中的:

chassis_reader_->Observe();
const auto &chassis_msg = chassis_reader_->GetLatestObserved();

而對于剩下那一個協程,它是由主channel來觸發的,因它處理的是多channel的組合訊息,在協程主體中的TryFetch()函式會呼叫AllLatest::Fusion()函式同時拿多個channel上的最新訊息,至于這個組合訊息是怎么填入的前面有提,簡單來說,對于它來說,主channel來訊息時,同時也會將其它channel的訊息寫入組合訊息,然后調度協程,拿出組合訊息進行處理,其訊息處理函式為:

 auto func = [self](const std::shared_ptr<M0>& msg0,         
                    const std::shared_ptr<M1>& msg1) {       
   auto ptr = self.lock();                                   
   if (ptr) {                                                
     ptr->Process(msg0, msg1);                               
   } else {                                                  
     AERROR << "Component object has been destroyed.";       
   }                                                         
 };                                                          

該實作中主要以收到的訊息為引數呼叫Component中的處理函式Process(),從而執行組件的自定義處理邏輯,

小結

文中提了不少細枝末節,最后非常high-level地概括下從寫者到讀者的流程,寫者Writer寫訊息時,會通過HybridTransmitter繼而使用合適后端的Transmitter發送訊息,根據讀與寫者間的位置關系,經過網路、共享記憶體或直接呼叫的方式,對應后端的Dispatcher收到訊息,收到后轉成指定訊息型別,交給Receiver,然后通過DataDispatcher派發訊息,派發訊息就是將訊息放到對應的buffer中,然后通知相應的協程來作進一步處理,上層模塊要取用這些訊息,主要兩種方式:一種是通過ComponentProc()介面,它被呼叫時引數就是最新的訊息,另一種是通過ReaderObserve()函式直接拿,
在這里插入圖片描述

我們知道,Apollo在版本3.5前是基于ROS的,同時也對ROS做了幾個重要改進,這些改進不少是關于通信系統的,如共享記憶體、去中心化和資料兼容性,到Cyber RT的演進也自然延續了這幾個優點,總得來說,Cyber RT基于自動發現機制與Publish-Subscribe模式實作了通信網路的拓撲管理,同時它對資料傳輸層做了抽象,下面實作多個后端分別適用于不同場景,并提供了HYBRID模式可以根據讀者和寫者間的關系自動使用合適的傳輸層后端,這樣,通信系統的復雜性就被很好地屏蔽,框架就能提供給應用層便利的開發介面,

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/34710.html

標籤:其他

上一篇:高煥堂《嵌入式UML設計》讀書筆記_第一章

下一篇:到了2020年,技術水平到底需要達到怎樣的程度才能成為頂級的阿里P8架構師

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 網閘典型架構簡述

    網閘架構一般分為兩種:三主機的三系統架構網閘和雙主機的2+1架構網閘。 三主機架構分別為內端機、外端機和仲裁機。三機無論從軟體和硬體上均各自獨立。首先從硬體上來看,三機都用各自獨立的主板、記憶體及存盤設備。從軟體上來看,三機有各自獨立的作業系統。這樣能達到完全的三機獨立。對于“2+1”系統,“2”分為 ......

    uj5u.com 2020-09-10 02:00:44 more
  • 如何從xshell上傳檔案到centos linux虛擬機里

    如何從xshell上傳檔案到centos linux虛擬機里及:虛擬機CentOs下執行 yum -y install lrzsz命令,出現錯誤:鏡像無法找到軟體包 前言 一、安裝lrzsz步驟 二、上傳檔案 三、遇到的問題及解決方案 總結 前言 提示:其實很簡單,往虛擬機上安裝一個上傳檔案的工具 ......

    uj5u.com 2020-09-10 02:00:47 more
  • 一、SQLMAP入門

    一、SQLMAP入門 1、判斷是否存在注入 sqlmap.py -u 網址/id=1 id=1不可缺少。當注入點后面的引數大于兩個時。需要加雙引號, sqlmap.py -u "網址/id=1&uid=1" 2、判斷文本中的請求是否存在注入 從文本中加載http請求,SQLMAP可以從一個文本檔案中 ......

    uj5u.com 2020-09-10 02:00:50 more
  • Metasploit 簡單使用教程

    metasploit 簡單使用教程 浩先生, 2020-08-28 16:18:25 分類專欄: kail 網路安全 linux 文章標簽: linux資訊安全 編輯 著作權 metasploit 使用教程 前言 一、Metasploit是什么? 二、準備作業 三、具體步驟 前言 Msfconsole ......

    uj5u.com 2020-09-10 02:00:53 more
  • 游戲逆向之驅動層與用戶層通訊

    驅動層代碼: #pragma once #include <ntifs.h> #define add_code CTL_CODE(FILE_DEVICE_UNKNOWN,0x800,METHOD_BUFFERED,FILE_ANY_ACCESS) /* 更多游戲逆向視頻www.yxfzedu.com ......

    uj5u.com 2020-09-10 02:00:56 more
  • 北斗電力時鐘(北斗授時服務器)讓網路資料更精準

    北斗電力時鐘(北斗授時服務器)讓網路資料更精準 北斗電力時鐘(北斗授時服務器)讓網路資料更精準 京準電子科技官微——ahjzsz 近幾年,資訊技術的得了快速發展,互聯網在逐漸普及,其在人們生活和生產中都得到了廣泛應用,并且取得了不錯的應用效果。計算機網路資訊在電力系統中的應用,一方面使電力系統的運行 ......

    uj5u.com 2020-09-10 02:01:03 more
  • 【CTF】CTFHub 技能樹 彩蛋 writeup

    ?碎碎念 CTFHub:https://www.ctfhub.com/ 筆者入門CTF時時剛開始刷的是bugku的舊平臺,后來才有了CTFHub。 感覺不論是網頁UI設計,還是題目質量,賽事跟蹤,工具軟體都做得很不錯。 而且因為獨到的金幣制度的確讓人有一種想去刷題賺金幣的感覺。 個人還是非常喜歡這個 ......

    uj5u.com 2020-09-10 02:04:05 more
  • 02windows基礎操作

    我學到了一下幾點 Windows系統目錄結構與滲透的作用 常見Windows的服務詳解 Windows埠詳解 常用的Windows注冊表詳解 hacker DOS命令詳解(net user / type /md /rd/ dir /cd /net use copy、批處理 等) 利用dos命令制作 ......

    uj5u.com 2020-09-10 02:04:18 more
  • 03.Linux基礎操作

    我學到了以下幾點 01Linux系統介紹02系統安裝,密碼啊破解03Linux常用命令04LAMP 01LINUX windows: win03 8 12 16 19 配置不繁瑣 Linux:redhat,centos(紅帽社區版),Ubuntu server,suse unix:金融機構,證券,銀 ......

    uj5u.com 2020-09-10 02:04:30 more
  • 05HTML

    01HTML介紹 02頭部標簽講解03基礎標簽講解04表單標簽講解 HTML前段語言 js1.了解代碼2.根據代碼 懂得挖掘漏洞 (POST注入/XSS漏洞上傳)3.黑帽seo 白帽seo 客戶網站被黑帽植入劫持代碼如何處理4.熟悉html表單 <html><head><title>TDK標題,描述 ......

    uj5u.com 2020-09-10 02:04:36 more
最新发布
  • 2023年最新微信小程式抓包教程

    01 開門見山 隔一個月發一篇文章,不過分。 首先回顧一下《微信系結手機號資料庫被脫庫事件》,我也是第一時間得知了這個訊息,然后跟蹤了整件事情的經過。下面是這起事件的相關截圖以及近日流出的一萬條資料樣本: 個人認為這件事也沒什么,還不如關注一下之前45億快遞資料查詢渠道疑似在近日復活的訊息。 訊息是 ......

    uj5u.com 2023-04-20 08:48:24 more
  • web3 產品介紹:metamask 錢包 使用最多的瀏覽器插件錢包

    Metamask錢包是一種基于區塊鏈技術的數字貨幣錢包,它允許用戶在安全、便捷的環境下管理自己的加密資產。Metamask錢包是以太坊生態系統中最流行的錢包之一,它具有易于使用、安全性高和功能強大等優點。 本文將詳細介紹Metamask錢包的功能和使用方法。 一、 Metamask錢包的功能 數字資 ......

    uj5u.com 2023-04-20 08:47:46 more
  • vulnhub_Earth

    前言 靶機地址->>>vulnhub_Earth 攻擊機ip:192.168.20.121 靶機ip:192.168.20.122 參考文章 https://www.cnblogs.com/Jing-X/archive/2022/04/03/16097695.html https://www.cnb ......

    uj5u.com 2023-04-20 07:46:20 more
  • 從4k到42k,軟體測驗工程師的漲薪史,給我看哭了

    清明節一過,盲猜大家已經無心上班,在數著日子準備過五一,但一想到銀行卡里的余額……瞬間心情就不美麗了。最近,2023年高校畢業生就業調查顯示,本科畢業月平均起薪為5825元。調查一出,便有很多同學表示自己又被平均了。看著這一資料,不免讓人想到前不久中國青年報的一項調查:近六成大學生認為畢業10年內會 ......

    uj5u.com 2023-04-20 07:44:00 more
  • 最新版本 Stable Diffusion 開源 AI 繪畫工具之中文自動提詞篇

    🎈 標簽生成器 由于輸入正向提示詞 prompt 和反向提示詞 negative prompt 都是使用英文,所以對學習母語的我們非常不友好 使用網址:https://tinygeeker.github.io/p/ai-prompt-generator 這個網址是為了讓大家在使用 AI 繪畫的時候 ......

    uj5u.com 2023-04-20 07:43:36 more
  • 漫談前端自動化測驗演進之路及測驗工具分析

    隨著前端技術的不斷發展和應用程式的日益復雜,前端自動化測驗也在不斷演進。隨著 Web 應用程式變得越來越復雜,自動化測驗的需求也越來越高。如今,自動化測驗已經成為 Web 應用程式開發程序中不可或缺的一部分,它們可以幫助開發人員更快地發現和修復錯誤,提高應用程式的性能和可靠性。 ......

    uj5u.com 2023-04-20 07:43:16 more
  • CANN開發實踐:4個DVPP記憶體問題的典型案例解讀

    摘要:由于DVPP媒體資料處理功能對存放輸入、輸出資料的記憶體有更高的要求(例如,記憶體首地址128位元組對齊),因此需呼叫專用的記憶體申請介面,那么本期就分享幾個關于DVPP記憶體問題的典型案例,并給出原因分析及解決方法。 本文分享自華為云社區《FAQ_DVPP記憶體問題案例》,作者:昇騰CANN。 DVPP ......

    uj5u.com 2023-04-20 07:43:03 more
  • msf學習

    msf學習 以kali自帶的msf為例 一、msf核心模塊與功能 msf模塊都放在/usr/share/metasploit-framework/modules目錄下 1、auxiliary 輔助模塊,輔助滲透(埠掃描、登錄密碼爆破、漏洞驗證等) 2、encoders 編碼器模塊,主要包含各種編碼 ......

    uj5u.com 2023-04-20 07:42:59 more
  • Halcon軟體安裝與界面簡介

    1. 下載Halcon17版本到到本地 2. 雙擊安裝包后 3. 步驟如下 1.2 Halcon軟體安裝 界面分為四大塊 1. Halcon的五個助手 1) 影像采集助手:與相機連接,設定相機引數,采集影像 2) 標定助手:九點標定或是其它的標定,生成標定檔案及內參外參,可以將像素單位轉換為長度單位 ......

    uj5u.com 2023-04-20 07:42:17 more
  • 在MacOS下使用Unity3D開發游戲

    第一次發博客,先發一下我的游戲開發環境吧。 去年2月份買了一臺MacBookPro2021 M1pro(以下簡稱mbp),這一年來一直在用mbp開發游戲。我大致分享一下我的開發工具以及使用體驗。 1、Unity 官網鏈接: https://unity.cn/releases 我一般使用的Apple ......

    uj5u.com 2023-04-20 07:40:19 more