前言
計算框架是自動駕駛系統中的重中之重,也是整個系統得以高效穩定運行的基礎,為了實時地完成感知、決策和執行,系統需要一系列的模塊相互緊密配合,高效地執行任務流,由于各種原因,這些模塊可能位于不同行程,也可能位于不同機器,這就要求計算框架中具有靈活的、高性能的通信機制,我們知道,Apollo在3.5版本中推出了Cyber RT替代了原先的ROS,之前寫過兩篇相關的文章介紹了其中的調度部分:《自動駕駛平臺Apollo 3.5閱讀手記:Cyber RT中的協程(Coroutine)》和《自動駕駛平臺Apollo 5.5閱讀手記:Cyber RT中的任務調度》,今天就來聊一下其中的另一重要部分-通信系統,
和ROS & ROS2中類似,Cyber RT中支持兩種資料交換模式:一種是Publish-Subscribe模式,常用于資料流處理中節點間通信,即發布者(Publisher)在channel(ROS中對應地稱為topic)上發布訊息,訂閱該channel的訂閱者(Subscriber)便會收到訊息資料;另一種就是常見的Service-Client模式,常用于客戶端與服務端的請求與回應,本質上它是可以基于前者實作的,Node是整個資料拓撲網路中的基本單元,一個Node中可以創建多個讀者/寫者,服務端/客戶端,讀者和寫者分別對應Reader和Writer,用于Publish-Subscribe模式,服務端和客戶端分別對應Service和Client,用于Service-Client模式,
實作決議
自動駕駛系統中的各個處理模塊基本都是實作為Component,一個Component中包含一個Node,另外會根據需要創建和管理Writer,Reader,Service和Client,這些用于通信的類下面基于Trasmitter和Receiver類,前者用于資料發送,后者用于資料接收,它們是資料傳輸層的抽象,之下可有多個傳輸層實作用于不同場景下的傳輸,如對于Trasmitter有IntraTransmitter,ShmTransmitter,RtpsTransmitter和HybridTransmitter,對于Receiver也是類似的,其中RTPS后端基于Fast RTPS,Fast RTPS是DDS(Data Distribution Service)標準的一個非常流行的開源實作,DDS標準提供了一個平臺無關的資料模型,主要用于實時分布式系統,不同的實作可以相互通信,整個通信系統的架構層次圖如下,

下面我們就從幾個方面深入地看下它們的實作機制,
服務發現與拓撲管理
首先來看下比較基礎與核心的服務發現與拓撲管理,其實作主要在目錄cyber/service_discovery/下,節點間通過讀和寫端建立資料通路,以channel為邊,這樣可以得到一個資料流圖絡,由于節點可能退出,訂閱情況也可能發生改變,所以這個網路是動態的,因此需要對網路拓撲進行監控,
主要負責這件事的資料結構是TopologyManager,它是個單例,因為每個行程只要有一個來負責監控網路拓撲就可以了,TopologyManager有三個子管理器,并有共同的基類Manager,它們分別為:
? - NodeManager用于管理網路拓撲中的節點,
? - ChannelManager用于管理channel,即網路拓撲中的邊,
? - ServiceManager用于管理Service和Client,
Cyber RT中有兩個層面的拓撲變化的監控:
- 基于Fast RTPS的發現機制
它主要監視網路中是否有參與者加入或退出,TopologyManager::CreateParticipant()函式創建transport::Participant物件時會輸入包含host name與process id的名稱,ParticipantListener用于監聽網路的變化,網路拓撲發生變化時,Fast RTPS傳上來ParticipantDiscoveryInfo,在TopologyManager::Convert()函式中對該資訊轉換成Cyber RT中的資料結構ChangeMsg,然后呼叫回呼函式TopologyManager::OnParticipantChange(),它會呼叫其它幾個子管理器的OnTopoModuleLeave()函式,然后子管理器中便可以將相應維護的資訊進行更新(如NodeManager中將相應的節點洗掉),
這層拓撲監控主要是通過Fast RTPS提供的自動發現機制,如行程意外退出,則要將各管理中相應資訊進行更新,它的優點是如果行程出錯或設備斷開也可以作業,但粒度比較粗,且不是非常及時(比如斷開時),
- 基于主動式的拓撲變更廣播
這一部分主要在TopologyManager::Init()函式中創建和初始化,在初始化時,會呼叫它們的StartDiscovery()函式開始啟動自動發現機制,基于TopologyManager中的RtpsParticipant物件,這幾個子管理會通過CreateSubscriber()和CreatePublisher()函式創建相應的subscriber和publisher,子管理器中channel名稱分別為node_change_broadcast,channel_change_broadcast和service_change_broadcast,Subscriber的回呼函式為Manager::OnRemoteChange(),該回呼函式中會決議拓撲變更訊息并呼叫Dispose()函式進行處理,
這層拓撲監控是主動式的,即需要相應的地方主動呼叫Join()或Leave()來觸發,然后各子管理器中回呼函式進行資訊的更新,如NodeChannelImpl創建時會呼叫NodeManager::Join(),Reader和Writer初始化時會呼叫JoinTheTopolicy()函式,繼而呼叫ChannelManager::Join()函式,相應地,有LeaveTheTopology()函式表示退出拓撲網路,在這兩個函式中,會呼叫Dispose()函式,而這個函式是虛函式,在各子管理器中有各自的實作,另外Manager提供AddChangeListener()函式注冊當拓撲發生變化時的回呼函式,舉例來說,Reader::JoinTheTopology()函式中會通過該函式注冊回呼Reader::OnChannelChange(),
資料傳輸
在一個分布式計算系統中,根據兩個節點間的位置關系需要使用不同的傳輸方式(定義在CommunicationMode中):
? - INTRA:如果是同行程的,因為在同一地址空間,直接傳指標就完了,
? - SHM(Shared memory):如果是同一機器上,但跨行程的,為了高效可以使用共享記憶體,
? - RTPS:如果是跨設備的,那就老老實實通過網路傳吧,
示意圖如下:

很多時候一個計算圖中各種情況都有,所以為了達到最好的性能,需要混合使用,這種混合模式稱為HYBRID模式,框架需要根據節點間關系選擇合適的傳輸后端,
每個Writer有Transmitter,每個Reader有Receiver,它們是負責訊息發送與收取的類,Transmitter與Receiver的基類為Endpoint,代表一個通信的端點,它主要的資訊是身份標識與屬性,其型別為RoleAttributes(定義在role_attributes.proto)的成員attr_包含了host name,process id和一個根據uuid產生的hash值作為id,通過它們就可以判斷節點之間的相對位置關系了,
Reader和Writer會呼叫Transport的方法CreateTransmitter()和CreateReceiver()用于創建發送端的transmitter和接收端的receiver,創建時有四種模式可選,分別是INTRA,SHM和RTPS,和HYBRID,最后一種是前三種的混合模式,也是默認的模式,如Transmitter對應的繼承類為IntraTransmitter,ShmTransmitter,RtpsTransmitter和HybridTransmitter,這幾個繼承類最主要實作了Transmit()函式用于傳輸資料,對于Receiver來說是類似的,它有4個繼承類對應四種傳輸方式,即IntraReceiver,ShmReceiver,RtpsReceiver和HybridReceiver,
結合前面提到的幾種模式對應的場景,transmitter與receiver的對應關系如下:

前面提到,傳輸層實作主要有四個實作后端,對應四種模式:
-
RTPS:RTPS部分基于eProsimar的Fast RTPS,
RtpsTransmitter類中創建和封裝publisher,Transmit()函式將訊息序列化成Fast RTP中的格式UnderlayMessage,然后通過publisher發出去,RtpsReceiver中的dispatcher_成員指向單例RtpsDispatcher,它用于派發RTPS發來的資料,維護了channel id到subscriber的查找表,RtpsDispatcher::AddSubscriber()函式使用eprosima::fastrtps::Domain::createSubscriber()函式創建subscriber,其回呼統一為RtpsDispatcher::OnMessage()函式,該函式會將從RTPS通路來的訊息進行派發, -
SHM:
Segment類表示一塊對應一個channel的共享記憶體,由SegmentFactory::CreateSegment函式創建,它有兩個繼承類PosixSegment和XsiSegment,是平臺相關的實作,在寫端,ShmTransmitter::Transmit()函式用于發送訊息,該函式先通過AcquireBlockToWrite()函式拿一個可寫的block,如果發現該Segment尚未初始化,會呼叫OpenOrCreate()通過OS的介面創建共享記憶體并且map出虛擬地址,這塊共享記憶體區域大體分兩部分,一部分為元資訊,另一部分為訊息資料,后者會被切分為相同大小的block,block的buffer大小默認16K,但遇上訊息超出大小的時候會調整,拿到該block后,將訊息序列化后寫入,并通知讀者來取訊息,通知機制是通過NotifierBase實作的,它有兩個實作類,分別為ConditionNotifier和MulticastNotifier,前者為默認設定,它會單獨開一塊共享共享專門用于通知,其中包含了ReadableInfo等資訊,MulticastNotifier的主要區別是它是通過指定的socket廣播,在讀端,ShmDispatcher::Init()初始化時會創建專門的執行緒,執行緒的執行體為ShmDispatcher::Threadfunc()函式,它在回圈體內會通過Listen()函式等待新訊息,如果有新訊息寫入后發出通知,這兒就會往下走,基于通知中的ReadableInfo資訊,得到channel id,block index等資訊,然后呼叫ReadMessage()函式讀訊息并反序列化,之后呼叫ShmDispatcher::OnMessage()函式進行訊息派發, -
INTRA :用于行程內通信,由于讀者和寫者是在同一行程內,因此可以直接呼叫,在
IntraTransmitter::Transmit()函式中,會直接呼叫讀端的IntraDispatcher::OnMessage(),該函式進行下一步訊息的派發, -
HYBRID:即默認模式,是前三種的結合體,具體功能其實還是交給前面幾個后端完成的,只是它會根據讀者與寫者的關系使用相應的后端,
訊息寫端
寫端的實作相對簡單一些,在模塊組件中,可以通過CreateWriter()函式創建Writer物件,然后就可以通過該物件向指定channel發送訊息了,以CameraComponent為例:
writer_ = node_->CreateWriter<Image>(camera_config_->channel_name());
...
auto pb_image = std::make_shared<Image>();
pb_image->mutable_header()->set_frame_id(camera_config_->frame_id());
pb_image->set_width(raw_image_->width);
pb_image->set_height(raw_image_->height);
pb_image->mutable_data()->reserve(raw_image_->image_size);
...
writer_->Write(pb_image);
這里先創建了Writer物件,然后填好了訊息里的資料(這里發送的訊息型別為Image,定義在modules/drivers/proto/sensor_image.proto檔案),最后呼叫Writer::Write()函式將該訊息發出,
CreateWriter()函式中先創建Writer物件,再呼叫Writer::Init()函式進行初始化,初始化中主要通過CreateTransmitter()函式創建Transmitter物件,因為默認是HYBRID模式,所以這里實際創建的是HybridTransmitter物件,Transmitter繼承自Endpoint類,它其中的屬性資訊以用來判斷讀者與寫者的相對關系,不同的相對關系決定使用何種Transmitter物件,其配置在InitMode()函式中設定:
template <typename M>
void HybridTransmitter<M>::InitMode() {
mode_ = std::make_shared<proto::CommunicationMode>();
mapping_table_[SAME_PROC] = mode_->same_proc();
mapping_table_[DIFF_PROC] = mode_->diff_proc();
mapping_table_[DIFF_HOST] = mode_->diff_host();
}
Writer物件的初始化中還會將呼叫JointTheTopology()函式將之加入到ChannelManager維護的拓撲資訊中,
template <typename MessageT>
void Writer<MessageT>::JoinTheTopology() {
// add listener
change_conn_ = channel_manager_->AddChangeListener(std::bind(
&Writer<MessageT>::OnChannelChange, this, std::placeholders::_1));
// get peer readers
const std::string& channel_name = this->role_attr_.channel_name();
std::vector<proto::RoleAttributes> readers;
channel_manager_->GetReadersOfChannel(channel_name, &readers);
for (auto& reader : readers) {
transmitter_->Enable(reader);
}
channel_manager_->Join(this->role_attr_, proto::RoleType::ROLE_WRITER,
message::HasSerializer<MessageT>::value);
}
這里還會做一件比較重要的事是enable相應的Transmitter,先通過ChannelManager得到該channel相應讀者的資訊,然后對于每個讀者,呼叫HybridTransmitter::Enable()函式,HybridTransmitter是混合模式的Transmitter,它其實包含了RTPS,SHM和INTRA三種Transmitter實體,但這三種Transmitter并不一定都需要用到,比如,如果該訊息對應的讀者全是同行程的,那就沒必要整上SHM和RTPS了,HybridTransmitter::Enable()函式會根據引數來enable合適的Transmitter,
template <typename M>
void HybridTransmitter<M>::Enable(const RoleAttributes& opposite_attr) {
auto relation = GetRelation(opposite_attr);
if (relation == NO_RELATION) {
return;
}
uint64_t id = opposite_attr.id();
std::lock_guard<std::mutex> lock(mutex_);
receivers_[mapping_table_[relation]].insert(id);
transmitters_[mapping_table_[relation]]->Enable();
TransmitHistoryMsg(opposite_attr);
}
相應地,在Disable()函式中決定是否要disable相應的Transmitter,這樣在之后的Transmit()函式中只要把transmitters_中的所有Transmitter拿出來呼叫Transmit()函式即可,
發送資料是通過Writer::Write()函式繼而呼叫Transmitter::Transmit()函式來實作的,因為這里是用的HybridTransmitter,因此實際呼叫的是HybridTransmitter::Transmit()函式:
template <typename M>
bool HybridTransmitter<M>::Transmit(const MessagePtr& msg,
const MessageInfo& msg_info) {
std::lock_guard<std::mutex> lock(mutex_);
history_->Add(msg, msg_info);
for (auto& item : transmitters_) {
item.second->Transmit(msg, msg_info);
}
return true;
}
可以看到這里分別呼叫三大Transmitter的Transmit()函式發送訊息,
訊息讀端
讀端的處理鏈路相比下復雜一些,先回顧一個Component中對訊息的處理,對于一個Component來說,它可能會從多個channel收取訊息,然后基于所有channel的訊息才能處理,第一個channel暫且稱之為主channel,這些channel訊息的組合我們暫且稱為組合訊息,我們就來看下典型的兩個channel情況,其初始化的主要代碼為:
template <typename M0, typename M1>
bool Component<M0, M1, NullType, NullType>::Initialize(
...
ReaderConfig reader_cfg;
reader_cfg.channel_name = config.readers(1).channel();
reader_cfg.qos_profile.CopyFrom(config.readers(1).qos_profile());
reader_cfg.pending_queue_size = config.readers(1).pending_queue_size();
auto reader1 = node_->template CreateReader<M1>(reader_cfg);
reader_cfg.channel_name = config.readers(0).channel();
reader_cfg.qos_profile.CopyFrom(config.readers(0).qos_profile());
reader_cfg.pending_queue_size = config.readers(0).pending_queue_size();
reader0 = node_->template CreateReader<M0>(reader_cfg);
...
readers_.push_back(std::move(reader0));
readers_.push_back(std::move(reader1));
...
std::vector<data::VisitorConfig> config_list;
for (auto& reader : readers_) {
config_list.emplace_back(reader->ChannelId(), reader->PendingQueueSize());
}
auto dv = std::make_shared<data::DataVisitor<M0, M1>>(config_list);
croutine::RoutineFactory factory =
croutine::CreateRoutineFactory<M0, M1>(func, dv);
return sched->CreateTask(factory, node_->Name());
}
其中對兩個channel分別創建Reader物件,該Reader物件是針對單個channel的,然后針對所有channel創建DataVisitor物件,這時就是針對所有channel的組合訊息了,最后創建協程來進行組合資料的處理,后面會看到每個Reader都會有單獨的協程來做資料讀取,因此,對于一個有n個channel的component,框架會為此創建至少n+1個協程,
其中比較重要的結構就是用于讀取訊息的Reader類了,我們先看Reader物件的創建,其初始化函式Init()如下:
template <typename MessageT>
bool Reader<MessageT>::Init() {
if (init_.exchange(true)) {
return true;
}
std::function<void(const std::shared_ptr<MessageT>&)> func;
if (reader_func_ != nullptr) {
func = [this](const std::shared_ptr<MessageT>& msg) {
this->Enqueue(msg);
this->reader_func_(msg);
};
} else {
func = [this](const std::shared_ptr<MessageT>& msg) { this->Enqueue(msg); };
}
auto sched = scheduler::Instance();
croutine_name_ = role_attr_.node_name() + "_" + role_attr_.channel_name();
auto dv = std::make_shared<data::DataVisitor<MessageT>>(
role_attr_.channel_id(), pending_queue_size_);
// Using factory to wrap templates.
croutine::RoutineFactory factory =
croutine::CreateRoutineFactory<MessageT>(std::move(func), dv);
if (!sched->CreateTask(factory, croutine_name_)) {
AERROR << "Create Task Failed!";
init_.store(false);
return false;
}
receiver_ = ReceiverManager<MessageT>::Instance()->GetReceiver(role_attr_);
this->role_attr_.set_id(receiver_->id().HashValue());
channel_manager_ =
service_discovery::TopologyManager::Instance()->channel_manager();
JoinTheTopology();
return true;
}
這里主要創建了相應的DataVisitor類,協程和Receiver類等,其中DataVisitor主要用于訊息資料的訪問,它存放到來的訊息資料,并提供介面供訊息讀取,還是以兩個channel的情況為例:
template <typename M0, typename M1>
class DataVisitor<M0, M1, NullType, NullType> : public DataVisitorBase {
public:
explicit DataVisitor(const std::vector<VisitorConfig>& configs)
: buffer_m0_(configs[0].channel_id,
new BufferType<M0>(configs[0].queue_size)),
buffer_m1_(configs[1].channel_id,
new BufferType<M1>(configs[1].queue_size)) {
DataDispatcher<M0>::Instance()->AddBuffer(buffer_m0_);
DataDispatcher<M1>::Instance()->AddBuffer(buffer_m1_);
data_notifier_->AddNotifier(buffer_m0_.channel_id(), notifier_);
data_fusion_ = new fusion::AllLatest<M0, M1>(buffer_m0_, buffer_m1_);
}
...
bool TryFetch(std::shared_ptr<M0>& m0, std::shared_ptr<M1>& m1) { // NOLINT
if (data_fusion_->Fusion(&next_msg_index_, m0, m1)) {
next_msg_index_++;
return true;
}
return false;
}
private:
fusion::DataFusion<M0, M1>* data_fusion_ = nullptr;
ChannelBuffer<M0> buffer_m0_;
ChannelBuffer<M1> buffer_m1_;
};
它的成員變數中對每一個channel都有一個對應的ChannelBuffer物件,DataDispatcher::AddBuffer()函式在DataVisitor初始化時用來將這些個ChannelBuffer加入到DataDispatcher的管理中,同時,DataNotifier::AddNotifier()函式用來以主channel的id為鍵值加入到DataNotifier的管理中,DataDispatcher與DataNotifier均為單例,前者為模板類,意味著每一個訊息型別會有對應的DataDispatcher物件,且相同訊息型別會共享該物件,顧名思義,它主要用于資料傳輸層有資料來時的分發,即當新訊息到來時通過DataDispatcher::Dispatch()函式把它放到相應的訊息緩沖區中,后者用于管理所有的Notifier,它用于在訊息派發完后喚醒相應的協程進行處理,這些物件的大體結構圖如下:

當channel多于一個時(組合訊息),DataVisitor中還有一個DataFusion物件用于將多路channel的資料合并,DataFusion的實作類為AllLatest,聽名字就知道它會取所有channel中的最新值,除了per-channel的ChannelBuffer物件外,它還有一個特殊的ChannelBuffer物件用于存放多channel訊息的組合訊息(即各個channel的訊息型別的tuple),當填入主channel的訊息時,會呼叫由SetFusionCallback()函式注冊的回呼,該回呼判斷是否所有channel都有訊息,如果都有訊息的話就將這些訊息作為組合訊息填入該組合訊息的ChannelBuffer中, 在協程處理函式中會呼叫DataVisitor::TryFetch()函式從該ChannelBuffer中拿組合訊息,值得注意的是這件事只在主channel有訊息來時才會被觸發,因此主channel的選取是有講究的,
Reader初始化時創建的另一個關鍵物件為Receiver,它有4個繼承類,默認為混合模式的HybridReceiver,HybridReceiver::InitReceivers中分別創建相應的IntraReceiver、ShmReceiver和RtpsReceiver,放在成員receivers_陣列中,它會來根據寫端的情況來enable和disable相應的Receiver,ReceiverManager用于管理這些Receiver物件,它以channel為key進行管理,因此同一行程內訂閱同一個channel的會共用同一個Receiver物件,ReceiverManager::GetReceiver()函式用于按鍵值取出Receiver,如沒有,則通過Transport::CreateReceiver()函式新建一個Receiver, 這些個Receiver在Enable()函式中會通過AddListener()函式向對應的Dispatcher注冊其回呼函式XXXReceiver::OnNewMessage(),Dispatcher類中的成員msg_listeners_是channel id到ListenerHandler物件的查找表,ListenerHandler通過signal/slot機制保存了所有這些回呼,注意不同傳輸后端的AddListener()實作略有不同,比如RtpsDispatcher::AddListener()函式中會將輸入的訊息先通過ParseFromString()函式進行決議,然后呼叫傳入的回呼,ShmDispatcher::AddListener()函式也是類似,它會先通過ParseFromArray()函式決議訊息,而對于IntraDispatcher::AddListener(),由于是同個行程內,是以訊息本身的型別傳的,就沒必要決議了,
這些相關結構關系示意圖如下:

看了一些關鍵相關資料結構,接下來看下讀端的處理流程,首先,如之前介紹的,各Dispatcher的繼承類各顯神通使自己的OnMessage()回呼函式被呼叫,以RtpsDispatcher為例:
void RtpsDispatcher::OnMessage(uint64_t channel_id,
const std::shared_ptr<std::string>& msg_str,
const MessageInfo& msg_info) {
if (is_shutdown_.load()) {
return;
}
ListenerHandlerBasePtr* handler_base = nullptr;
if (msg_listeners_.Get(channel_id, &handler_base)) {
auto handler =
std::dynamic_pointer_cast<ListenerHandler<std::string>>(*handler_base);
handler->Run(msg_str, msg_info);
}
}
這里ListenerHandler::Run()會根據訊息的發送者資訊找到對應的回呼,即Receiver::OnNewMessage(),
template <typename M>
void Receiver<M>::OnNewMessage(const MessagePtr& msg,
const MessageInfo& msg_info) {
if (msg_listener_ != nullptr) {
msg_listener_(msg, msg_info, attr_);
}
}
這里的回呼函式msg_listener_是在Receiver創建的時候傳入的,其實主要是呼叫了DataDispatcher::Dispatch()函式來訊息的派發:
transport::Transport::Instance()->CreateReceiver<MessageT>(
role_attr, [](const std::shared_ptr<MessageT>& msg,
const transport::MessageInfo& msg_info,
const proto::RoleAttributes& reader_attr) {
(void)msg_info;
(void)reader_attr;
PerfEventCache::Instance()->AddTransportEvent(
TransPerf::DISPATCH, reader_attr.channel_id(),
msg_info.seq_num());
data::DataDispatcher<MessageT>::Instance()->Dispatch(
reader_attr.channel_id(), msg);
PerfEventCache::Instance()->AddTransportEvent(
TransPerf::NOTIFY, reader_attr.channel_id(),
msg_info.seq_num());
});
DataDisaptcher是模板類單例,即對于一種特定型別的訊息可以共用一個DataDispatcher,之前在DataVisitor初始化時會通過AddBuffer()函式將ChannelBuffer加入到DataDispatcher的成員buffers_map_中,它是一個以channel id為key的map,其value為所有等待該channel上訊息的CacheBuffer的陣列,也就是說,訊息分發時,只需要根據channel id找到這些buffer,然后將新來的訊息填入其中即可,這就是Dispatcher::Dispatch()函式主要做的事:
template <typename T>
bool DataDispatcher<T>::Dispatch(const uint64_t channel_id,
const std::shared_ptr<T>& msg) {
BufferVector* buffers = nullptr;
if (apollo::cyber::IsShutdown()) {
return false;
}
if (buffers_map_.Get(channel_id, &buffers)) {
for (auto& buffer_wptr : *buffers) {
if (auto buffer = buffer_wptr.lock()) {
std::lock_guard<std::mutex> lock(buffer->Mutex());
buffer->Fill(msg);
}
}
} else {
return false;
}
return notifier_->Notify(channel_id);
}
最后呼叫DataNotifier::Notify()函式來通知新訊息的到來,它會觸發該channel上所有對應Notifier中的回呼,
inline bool DataNotifier::Notify(const uint64_t channel_id) {
NotifyVector* notifies = nullptr;
if (notifies_map_.Get(channel_id, ¬ifies)) {
for (auto& notifier : *notifies) {
if (notifier && notifier->callback) {
notifier->callback();
}
}
return true;
}
return false;
}
這個Notifier中的回呼是在創建協程時通過RegisterNotifyCallback()函式注冊進去的,目的是為了喚醒相應的協程來處理該新訊息,
visitor->RegisterNotifyCallback([this, task_id]() {
if (cyber_unlikely(stop_.load())) {
return;
}
this->NotifyProcessor(task_id);
});
NotifyProcessor()函式會修改對應協程的狀態使之能被調度執行,前面提到,對于n個channel輸入的component,會有n+1個協程,它們都是以DataVisitor和訊息回呼函式一起作為引數創建的,這個協程主體中會呼叫DataVisitor::TryFetch()函式拿訊息,然后呼叫注冊的訊息處理函式:
factory.create_routine = [=]() {
return [=]() {
std::shared_ptr<M0> msg;
for (;;) {
CRoutine::GetCurrentRoutine()->set_state(RoutineState::DATA_WAIT);
if (dv->TryFetch(msg)) {
f(msg);
CRoutine::Yield(RoutineState::READY);
} else {
CRoutine::Yield();
}
}
};
};
對于那n個訊息讀取協程來說,其訊息處理函式為:
func = [this](const std::shared_ptr<MessageT>& msg) {
this->Enqueue(msg);
this->reader_func_(msg);
};
這個回呼函式中會呼叫Reader::Enqueue()函式,在該函式中,主要呼叫Blocker::Publish()函式,它繼而呼叫Blocker::Enqueue()和Blocker::Notify()函式,Blocker類是一個存盤訊息的結構,BlockerManager類用于管理Blocker,其中維護了以channel為鍵值的Blocker的map,Reader::Enqueue()函式將訊息放到Blocker的成員published_msg_queue_佇列中,之后,可以通過Blocker::Observe()函式將成員published_msg_queue_佇列的訊息放到成員observed_msg_queue_佇列,然后通過Blocker::GetLatestObserved()函式得到最新的訊息,比如ControlComponent中的:
chassis_reader_->Observe();
const auto &chassis_msg = chassis_reader_->GetLatestObserved();
而對于剩下那一個協程,它是由主channel來觸發的,因它處理的是多channel的組合訊息,在協程主體中的TryFetch()函式會呼叫AllLatest::Fusion()函式同時拿多個channel上的最新訊息,至于這個組合訊息是怎么填入的前面有提,簡單來說,對于它來說,主channel來訊息時,同時也會將其它channel的訊息寫入組合訊息,然后調度協程,拿出組合訊息進行處理,其訊息處理函式為:
auto func = [self](const std::shared_ptr<M0>& msg0,
const std::shared_ptr<M1>& msg1) {
auto ptr = self.lock();
if (ptr) {
ptr->Process(msg0, msg1);
} else {
AERROR << "Component object has been destroyed.";
}
};
該實作中主要以收到的訊息為引數呼叫Component中的處理函式Process(),從而執行組件的自定義處理邏輯,
小結
文中提了不少細枝末節,最后非常high-level地概括下從寫者到讀者的流程,寫者Writer寫訊息時,會通過HybridTransmitter繼而使用合適后端的Transmitter發送訊息,根據讀與寫者間的位置關系,經過網路、共享記憶體或直接呼叫的方式,對應后端的Dispatcher收到訊息,收到后轉成指定訊息型別,交給Receiver,然后通過DataDispatcher派發訊息,派發訊息就是將訊息放到對應的buffer中,然后通知相應的協程來作進一步處理,上層模塊要取用這些訊息,主要兩種方式:一種是通過Component的Proc()介面,它被呼叫時引數就是最新的訊息,另一種是通過Reader的Observe()函式直接拿,

我們知道,Apollo在版本3.5前是基于ROS的,同時也對ROS做了幾個重要改進,這些改進不少是關于通信系統的,如共享記憶體、去中心化和資料兼容性,到Cyber RT的演進也自然延續了這幾個優點,總得來說,Cyber RT基于自動發現機制與Publish-Subscribe模式實作了通信網路的拓撲管理,同時它對資料傳輸層做了抽象,下面實作多個后端分別適用于不同場景,并提供了HYBRID模式可以根據讀者和寫者間的關系自動使用合適的傳輸層后端,這樣,通信系統的復雜性就被很好地屏蔽,框架就能提供給應用層便利的開發介面,
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/34710.html
標籤:其他
