一直想搞一搞OpenDDS,最近終于能抽出空來研究一下了。可能是年齡大了,若不做點記錄,過段時間腦子就清零了,所以趁熱打鐵:本文記錄了Windows 10環境下OpenDDS的環境搭建、編譯、IDL自定義、代碼生成、對等發現以及代碼撰寫的全過程。
1. 環境搭建與編譯
環境搭建的帖子非常多,這里不做贅述,我就貼一下我用到的鏈接:
https://blog.csdn.net/saint_ek/article/details/107869083?utm_medium=distribute.pc_relevant.none-task-blog-baidujs_title-2&spm=1001.2101.3001.4242
編譯好后生成了兩個檔案夾“OpenDDS”,“ACE_wrappers”這兩個檔案夾就是本地的運行庫了,如果要給別的計算機部署,只需要把他們拷貝過去,再按照上面帖子設定好環境變數就可以直接使用了,
2. 生成Publisher和Subscriber相關代碼
2.1 代碼生成有兩種方式,可以通過IDL生成,也可以通過MPC生成,相關的文章很多,但我更傾向于用MPC來直接生成,這種方式比前一種更方便快捷,用MPC檔案生成代碼的前提是寫好IDL檔案,以下是自定義的一個Demo.idl檔案
// Module that acts as the namespace of the demo's generated types
// (the C++ sources below pull it in with `using namespace DemoIdlModule;`).
module DemoIdlModule {
// @topic marks the struct as a DDS topic type; the C++ sources use the
// DemoTopic1TypeSupportImpl / DemoTopic1DataWriter / DemoTopic1DataReader
// classes generated from it.
@topic
struct DemoTopic1 {
// @key marks `id` as a key field of the topic type.
@key long id;
// Counter carried by each sample (the publisher increments it per write).
long counter;
// Text payload carried by each sample.
string text;
};
};
上面的代碼中DemoIdlModule就類似于命名空間,里面可以有多個主題。每個主題用一個結構體來描述其資料結構,并冠以@topic。結構體里可以用@key標注一個或多個類似資料庫表主鍵的欄位,DDS依據鍵值區分同一主題下的不同實例(instance);@key并非必須,若不標注任何@key,該主題就只有一個實例。@key欄位名只需在所屬結構體內唯一,不同結構體可以使用同名的鍵欄位。
2.2 撰寫Demo.mpc檔案,將其與Demo.idl放置在一個檔案夾,Demo.mpc檔案內容如下:
// IDL project: processes Demo.idl so the generated type-support sources
// exist before the executables are built.
project(*idl): dcps {
TypeSupport_Files {
Demo.idl
}
// NOTE(review): custom_only = 1 presumably restricts this project to the
// custom (code generation) build steps only - confirm against the MPC docs.
custom_only = 1
}
// Publisher executable, built from Publisher.cpp plus the Demo.idl type support.
project(*publisher) : dcpsexe_with_tcp {
exename = publisher
// Build after the *idl project.
after += *idl
TypeSupport_Files {
Demo.idl
}
Source_Files {
Publisher.cpp
}
}
// Subscriber executable, built from Subscriber.cpp and DataReaderListener.cpp
// plus the Demo.idl type support.
project(*subscriber) : dcpsexe_with_tcp {
exename = subscriber
// Build after the *publisher project.
// NOTE(review): presumably this serializes the two builds because both
// projects list Demo.idl under TypeSupport_Files - confirm.
after += *publisher
TypeSupport_Files {
Demo.idl
}
Source_Files {
Subscriber.cpp
DataReaderListener.cpp
}
}
從以上MPC檔案來看,將來要生成一個publisher和subscriber工程,并且他們都基于資料結構定義檔案Demo.idl ,在生成代碼以前需要自己建立Publisher.cpp,Subscriber.cpp及DataReaderListener.h/DataReaderListener.cpp,否則后面生成代碼后是編譯不過的,
使用vs開發人員命令列工具,切到Demo.idl及Demo.mpc檔案所在目錄,根據ACE_wrappers所在路徑(此處以xxxx代替)和自己的vc版本(此處以vc14為例)輸入以下命令:
perl xxxx\ACE_wrappers\MPC\mwc.pl -type vc14
代碼生成完畢后,vs解決方案以及檔案夾生成的檔案如下:


右鍵單擊解決方案,“重新生成解決方案”發現編譯通過了,接下來就是如何去撰寫Publisher.cpp,Subscriber.cpp及DataReaderListener.h/DataReaderListener.cpp,另外要為工程撰寫一個組態檔,用于對OpenDDS的協議,功能進行配置,在這里我為publisher和subscriber分別建立了一個組態檔,便于各自部署,config_Pub.ini和config_Sub.ini內容暫時保持一致,如下:
# OpenDDS run-time configuration shared by config_Pub.ini and config_Sub.ini.
[common]
# NOTE(review): "$file" presumably selects the transport instance(s) defined
# in this same file as the global transport configuration - confirm against
# the OpenDDS configuration reference.
DCPSGlobalTransportConfig=$file
# RTPS discovery: peers find each other directly, so DCPSInfoRepo does not
# have to be started first.
DCPSDefaultDiscovery=DEFAULT_RTPS
# The single transport instance of this configuration, using RTPS over UDP.
[transport/the_rtps_transport]
transport_type=rtps_udp
以上組態檔方式為udp對等發現的方式,即不用先啟動.....\OpenDDS\bin\DCPSInfoRepo.exe
3. 撰寫代碼
3.1 Publisher.cpp代碼如下:
#include <dds/DCPS/Service_Participant.h>
#include <dds/DCPS/Marked_Default_Qos.h>
#include <dds/DCPS/PublisherImpl.h>
#include <dds/DCPS/transport/tcp/TcpInst.h>
#include "dds/DCPS/StaticIncludes.h"
#include <ace/streams.h>
#include "DemoTypeSupportImpl.h"
using namespace DemoIdlModule;
int ACE_TMAIN(int argc, ACE_TCHAR* argv[]) {
try {
// 初始化參與者
argv[1] = "-DCPSConfigFile";
argv[2] = "config_Pub.ini";
argc = 3;
// 1. 初始化參與者
DDS::DomainParticipantFactory_var dpf =
TheParticipantFactoryWithArgs(argc, argv);
DDS::DomainParticipant_var participant =
dpf->create_participant(111,
PARTICIPANT_QOS_DEFAULT,
DDS::DomainParticipantListener::_nil(),
::OpenDDS::DCPS::DEFAULT_STATUS_MASK);
if (CORBA::is_nil(participant.in())) {
cerr << "create_participant failed." << endl;
return 1;
}
// 2. 注冊資料型別
//這里是Topic而不是Topics,意義不同,體現在idl檔案里,
DemoTopic1TypeSupportImpl* servant = new DemoTopic1TypeSupportImpl();//這句是要根據XXXXTypeSupportImpl中的前綴與idl檔案中的Topic key名對應,在這里即"DemoTopic1"
OpenDDS::DCPS::LocalObject_var safe_servant = servant;
if (DDS::RETCODE_OK != servant->register_type(participant.in(), "")) {
cerr << "register_type failed." << endl;
exit(1);
}
// 3. 創建主題,這部分基本不用改
CORBA::String_var type_name = servant->get_type_name();
DDS::TopicQos topic_qos;
participant->get_default_topic_qos(topic_qos);
DDS::Topic_var topic =
participant->create_topic("Movie Discussion List",
type_name.in(),
topic_qos,
DDS::TopicListener::_nil(),
::OpenDDS::DCPS::DEFAULT_STATUS_MASK);
if (CORBA::is_nil(topic.in())) {
cerr << "create_topic failed." << endl;
exit(1);
}
// 4. 創建公布者,這部分基本不用改
DDS::Publisher_var pub =
participant->create_publisher(PUBLISHER_QOS_DEFAULT,
DDS::PublisherListener::_nil(),
::OpenDDS::DCPS::DEFAULT_STATUS_MASK);
if (CORBA::is_nil(pub.in())) {
cerr << "create_publisher failed." << endl;
exit(1);
}
// 5. 創建資料寫者
DDS::DataWriterQos dw_qos;
pub->get_default_datawriter_qos(dw_qos);
DDS::DataWriter_var dw =
pub->create_datawriter(topic.in(),
dw_qos,
DDS::DataWriterListener::_nil(),
::OpenDDS::DCPS::DEFAULT_STATUS_MASK);
if (CORBA::is_nil(dw.in())) {
cerr << "create_datawriter failed." << endl;
exit(1);
}
DemoTopic1DataWriter_var message_dw //這句是要根據XXXXDataWriter_var,XXXXDataWriter中的前綴與idl檔案中的Topic key名對應,在這里即"DemoTopic1"
= DemoTopic1DataWriter::_narrow(dw.in());
//
// Get default Publisher QoS from a DomainParticipant:
DDS::PublisherQos pub_qos;
DDS::ReturnCode_t ret;
ret = participant->get_default_publisher_qos(pub_qos);
if (DDS::RETCODE_OK != ret) {
std::cerr << "Could not get default publisher QoS" << std::endl;
}
// Get default Subscriber QoS from a DomainParticipant:
DDS::SubscriberQos sub_qos;
ret = participant->get_default_subscriber_qos(sub_qos);
if (DDS::RETCODE_OK != ret) {
std::cerr << "Could not get default subscriber QoS" << std::endl;
}
// Get default Topic QoS from a DomainParticipant:
DDS::TopicQos topic_qos2;
ret = participant->get_default_topic_qos(topic_qos2);
if (DDS::RETCODE_OK != ret) {
std::cerr << "Could not get default topic QoS" << std::endl;
}
// Get default DomainParticipant QoS from a DomainParticipantFactory:
DDS::DomainParticipantQos dp_qos;
ret = dpf->get_default_participant_qos(dp_qos);
if (DDS::RETCODE_OK != ret) {
std::cerr << "Could not get default participant QoS" << std::endl;
}
// Get default DataWriter QoS from a Publisher:
DDS::DataWriterQos dw_qos2;
ret = pub->get_default_datawriter_qos(dw_qos2);
if (DDS::RETCODE_OK != ret) {
std::cerr << "Could not get default data writer QoS" << std::endl;
}
// 6. 公布資料
DemoTopic1 message;//這句是要根據idl檔案中的Topic key名對應,在這里即"DemoTopic1"
message.id = 99;
::DDS::InstanceHandle_t handle = message_dw->register_instance(message);
message.counter = 0;
char tMsg[50] = { 0 };
while (1)
{
message.counter++;
memset(tMsg, 0, 50);
sprintf(tMsg, "Msg Counter : %d", message.counter);
message.text = ::TAO::String_Manager(tMsg);
message_dw->write(message, handle);
ACE_OS::sleep(1);
cout << "..." << endl;
}
// 7. 物體清理
participant->delete_contained_entities();
dpf->delete_participant(participant);
TheServiceParticipant->shutdown();
}
catch (CORBA::Exception& e)
{
cerr << "PUB: Exception caught in main.cpp:" << endl
<< e << endl;
exit(1);
}
return 0;
}
3.2 Subscriber.cpp代碼如下:
#include "DemoTypeSupportImpl.h"
#include <dds/DCPS/Service_Participant.h>
#include <dds/DCPS/Marked_Default_Qos.h>
#include <dds/DCPS/PublisherImpl.h>
#include <dds/DCPS/transport/tcp/TcpInst.h>
#include "dds/DCPS/StaticIncludes.h"
#include <ace/streams.h>
#include "DataReaderListener.h"
using namespace DemoIdlModule;
int ACE_TMAIN(int argc, ACE_TCHAR* argv[]) {
try
{
// 初始化參與者
argv[1] = "-DCPSConfigFile";
argv[2] = "config_Sub.ini";
argc = 3;
// 1. 初始化參與者
DDS::DomainParticipantFactory_var dpf =
TheParticipantFactoryWithArgs(argc, argv);
DDS::DomainParticipant_var participant =
dpf->create_participant(111,
PARTICIPANT_QOS_DEFAULT,
DDS::DomainParticipantListener::_nil(),
::OpenDDS::DCPS::DEFAULT_STATUS_MASK);
if (CORBA::is_nil(participant.in())) {
cerr << "create_participant failed." << endl;
return 1;
}
// 2. 注冊資料型別并創建主題
DemoTopic1TypeSupportImpl* servant = new DemoTopic1TypeSupportImpl();
OpenDDS::DCPS::LocalObject_var safe_servant = servant;
if (DDS::RETCODE_OK != servant->register_type(participant.in(), "")) {
cerr << "register_type failed." << endl;
exit(1);
}
CORBA::String_var type_name = servant->get_type_name();
DDS::TopicQos topic_qos;
participant->get_default_topic_qos(topic_qos);
DDS::Topic_var topic =
participant->create_topic("Movie Discussion List",
type_name.in(),
topic_qos,
DDS::TopicListener::_nil(),
::OpenDDS::DCPS::DEFAULT_STATUS_MASK);
if (CORBA::is_nil(topic.in())) {
cerr << "create_topic failed." << endl;
exit(1);
}
// 3. 創建訂閱者
DDS::Subscriber_var sub =
participant->create_subscriber(SUBSCRIBER_QOS_DEFAULT,
DDS::SubscriberListener::_nil(),
::OpenDDS::DCPS::DEFAULT_STATUS_MASK);
if (CORBA::is_nil(sub.in())) {
cerr << "Failed to create_subscriber." << endl;
exit(1);
}
// 4. 創建監聽者
DDS::DataReaderListener_var listener(new DataReaderListener);
DataReaderListener* listener_servant =
dynamic_cast<DataReaderListener*>(listener.in());
if (CORBA::is_nil(listener.in())) {
cerr << "listener is nil." << endl;
exit(1);
}
if (!listener_servant) {
ACE_ERROR_RETURN((LM_ERROR,
ACE_TEXT("%N:%l main()")
ACE_TEXT(" ERROR: listener_servant is nil (dynamic_cast failed)!\n")), -1);
}
// 5. 創建資料讀者
DDS::DataReaderQos dr_qos;
sub->get_default_datareader_qos(dr_qos);
DDS::DataReader_var dr = sub->create_datareader(topic.in(),
dr_qos,
listener.in(),
::OpenDDS::DCPS::DEFAULT_STATUS_MASK);
if (CORBA::is_nil(dr.in())) {
cerr << "create_datareader failed." << endl;
exit(1);
}
//
// Get default Publisher QoS from a DomainParticipant:
DDS::PublisherQos pub_qos;
DDS::ReturnCode_t ret;
ret = participant->get_default_publisher_qos(pub_qos);
if (DDS::RETCODE_OK != ret) {
std::cerr << "Could not get default publisher QoS" << std::endl;
}
// Get default Subscriber QoS from a DomainParticipant:
DDS::SubscriberQos sub_qos;
ret = participant->get_default_subscriber_qos(sub_qos);
if (DDS::RETCODE_OK != ret) {
std::cerr << "Could not get default subscriber QoS" << std::endl;
}
// Get default Topic QoS from a DomainParticipant:
DDS::TopicQos topic_qos2;
ret = participant->get_default_topic_qos(topic_qos2);
if (DDS::RETCODE_OK != ret) {
std::cerr << "Could not get default topic QoS" << std::endl;
}
// Get default DomainParticipant QoS from a DomainParticipantFactory:
DDS::DomainParticipantQos dp_qos;
ret = dpf->get_default_participant_qos(dp_qos);
if (DDS::RETCODE_OK != ret) {
std::cerr << "Could not get default participant QoS" << std::endl;
}
// Get default DataReader QoS from a Subscriber:
DDS::DataReaderQos dr_qos2;
ret = sub->get_default_datareader_qos(dr_qos2);
if (DDS::RETCODE_OK != ret) {
std::cerr << "Could not get default data reader QoS" << std::endl;
}
while (1) {
ACE_OS::sleep(1);
}
// 6. 清理與OpenDDS相關聯的資源
participant->delete_contained_entities();
dpf->delete_participant(participant);
TheServiceParticipant->shutdown();
}
catch (CORBA::Exception& e)
{
cerr << "PUB: Exception caught in main.cpp:" << endl
<< e << endl;
exit(1);
}
return 0;
}
3.3 DataReaderListener.h 代碼如下:
// -*- C++ -*-
// 資料讀者監聽者實作
#ifndef DATAREADER_LISTENER_IMPL
#define DATAREADER_LISTENER_IMPL
#include <dds/DdsDcpsSubscriptionExtC.h>
#include <dds/DCPS/LocalObject.h>
#if !defined (ACE_LACKS_PRAGMA_ONCE)
#pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */
// Data reader listener of the demo subscriber.
//
// on_data_available() takes and prints one DemoTopic1 sample per invocation;
// every other callback only writes its own name to stderr (see
// DataReaderListener.cpp).
class DataReaderListener
  : public virtual OpenDDS::DCPS::LocalObject<OpenDDS::DCPS::DataReaderListener>
{
public:
  DataReaderListener();
  virtual ~DataReaderListener();

  // Value of the counter that on_data_available() increments on every call.
  long num_reads() const { return num_reads_; }

  // --- Sample delivery -----------------------------------------------------
  virtual void on_data_available(
    DDS::DataReader_ptr reader);

  // --- DDS status callbacks ------------------------------------------------
  virtual void on_requested_deadline_missed(
    DDS::DataReader_ptr reader,
    const DDS::RequestedDeadlineMissedStatus& status);

  virtual void on_requested_incompatible_qos(
    DDS::DataReader_ptr reader,
    const DDS::RequestedIncompatibleQosStatus& status);

  virtual void on_liveliness_changed(
    DDS::DataReader_ptr reader,
    const DDS::LivelinessChangedStatus& status);

  virtual void on_subscription_matched(
    DDS::DataReader_ptr reader,
    const DDS::SubscriptionMatchedStatus& status);

  virtual void on_sample_rejected(
    DDS::DataReader_ptr reader,
    const DDS::SampleRejectedStatus& status);

  virtual void on_sample_lost(
    DDS::DataReader_ptr reader,
    const DDS::SampleLostStatus& status);

  // --- Callbacks taking OpenDDS::DCPS status types -------------------------
  virtual void on_subscription_disconnected(
    DDS::DataReader_ptr reader,
    const ::OpenDDS::DCPS::SubscriptionDisconnectedStatus& status);

  virtual void on_subscription_reconnected(
    DDS::DataReader_ptr reader,
    const ::OpenDDS::DCPS::SubscriptionReconnectedStatus& status);

  virtual void on_subscription_lost(
    DDS::DataReader_ptr reader,
    const ::OpenDDS::DCPS::SubscriptionLostStatus& status);

  virtual void on_budget_exceeded(
    DDS::DataReader_ptr reader,
    const ::OpenDDS::DCPS::BudgetExceededStatus& status);

private:
  // NOTE(review): never assigned or read in DataReaderListener.cpp.
  DDS::DataReader_var reader_;
  // Incremented once per on_data_available() call.
  long num_reads_;
};
#endif /* DATAREADER_LISTENER_IMPL */
3.4 DataReaderListener.cpp 代碼如下:
// -*- C++ -*-
//
#include "DataReaderListener.h"
#include "DemoTypeSupportC.h"
#include "DemoTypeSupportImpl.h"
#include <dds/DCPS/Service_Participant.h>
#include <ace/streams.h>
using namespace DemoIdlModule;
// Creates a listener whose read counter starts at zero.
DataReaderListener::DataReaderListener()
  : num_reads_(0)
{}
// Nothing to release explicitly; the members clean up after themselves.
DataReaderListener::~DataReaderListener() {}
// Takes the next DemoTopic1 sample from `reader` and prints it to stdout.
//
// The call counter is incremented on every invocation, whether or not a
// sample could be taken. Exactly one sample is taken per invocation.
//
// Parameters:
//   reader - the data reader that has data available; it is narrowed to
//            DemoTopic1DataReader.
//
// The process is terminated with exit code 1 when the narrow fails or a
// CORBA exception is caught.
void DataReaderListener::on_data_available(DDS::DataReader_ptr reader)
{
  ++num_reads_;
  try {
    DemoTopic1DataReader_var message_dr = DemoTopic1DataReader::_narrow(reader);
    if (CORBA::is_nil(message_dr.in())) {
      cerr << "read: _narrow failed." << endl;
      exit(1);
    }
    DemoTopic1 message;
    DDS::SampleInfo si;
    const DDS::ReturnCode_t status = message_dr->take_next_sample(message, si);
    if (status == DDS::RETCODE_OK) {
      // The message fields are only meaningful when the sample carries data.
      // A sample without valid data is a notification (e.g. an instance
      // state change) and its payload must not be interpreted.
      if (si.valid_data) {
        cout << "Message: id = " << message.id << endl
             << " DemoTopic1_Counter = " << message.counter << endl
             << " DemoTopic1_Text = " << message.text << endl;
      }
      else {
        cout << "Message: no valid data, instance_state = "
             << si.instance_state << endl;
      }
      cout << "SampleInfo.sample_rank = " << si.sample_rank << endl;
    }
    else if (status == DDS::RETCODE_NO_DATA) {
      cerr << "ERROR: reader received DDS::RETCODE_NO_DATA!" << endl;
    }
    else {
      cerr << "ERROR: read Message: Error: " << status << endl;
    }
  }
  catch (const CORBA::Exception& e) {
    cerr << "Exception caught in read:" << endl << e << endl;
    exit(1);
  }
}
// Requested-deadline-missed callback: both arguments are ignored and only
// the callback name is written to stderr.
void DataReaderListener::on_requested_deadline_missed(
  DDS::DataReader_ptr /* reader */,
  const DDS::RequestedDeadlineMissedStatus& /* status */)
{
  const char* const callback_name = "DataReaderListener::on_requested_deadline_missed";
  cerr << callback_name << endl;
}
// Requested-incompatible-QoS callback: both arguments are ignored and only
// the callback name is written to stderr.
void DataReaderListener::on_requested_incompatible_qos(
  DDS::DataReader_ptr /* reader */,
  const DDS::RequestedIncompatibleQosStatus& /* status */)
{
  const char* const callback_name = "DataReaderListener::on_requested_incompatible_qos";
  cerr << callback_name << endl;
}
// Liveliness-changed callback: both arguments are ignored and only the
// callback name is written to stderr.
void DataReaderListener::on_liveliness_changed(
  DDS::DataReader_ptr /* reader */,
  const DDS::LivelinessChangedStatus& /* status */)
{
  const char* const callback_name = "DataReaderListener::on_liveliness_changed";
  cerr << callback_name << endl;
}
// Subscription-matched callback: both arguments are ignored and only the
// callback name is written to stderr.
void DataReaderListener::on_subscription_matched(
  DDS::DataReader_ptr /* reader */,
  const DDS::SubscriptionMatchedStatus& /* status */)
{
  const char* const callback_name = "DataReaderListener::on_subscription_matched";
  cerr << callback_name << endl;
}
// Sample-rejected callback: both arguments are ignored and only the callback
// name is written to stderr.
void DataReaderListener::on_sample_rejected(
  DDS::DataReader_ptr /* reader */,
  const DDS::SampleRejectedStatus& /* status */)
{
  const char* const callback_name = "DataReaderListener::on_sample_rejected";
  cerr << callback_name << endl;
}
// Sample-lost callback: both arguments are ignored and only the callback
// name is written to stderr.
void DataReaderListener::on_sample_lost(
  DDS::DataReader_ptr /* reader */,
  const DDS::SampleLostStatus& /* status */)
{
  const char* const callback_name = "DataReaderListener::on_sample_lost";
  cerr << callback_name << endl;
}
// Subscription-disconnected callback: both arguments are ignored and only
// the callback name is written to stderr.
void DataReaderListener::on_subscription_disconnected(
  DDS::DataReader_ptr /* reader */,
  const ::OpenDDS::DCPS::SubscriptionDisconnectedStatus& /* status */)
{
  const char* const callback_name = "DataReaderListener::on_subscription_disconnected";
  cerr << callback_name << endl;
}
// Subscription-reconnected callback: both arguments are ignored and only the
// callback name is written to stderr.
void DataReaderListener::on_subscription_reconnected(
  DDS::DataReader_ptr /* reader */,
  const ::OpenDDS::DCPS::SubscriptionReconnectedStatus& /* status */)
{
  const char* const callback_name = "DataReaderListener::on_subscription_reconnected";
  cerr << callback_name << endl;
}
// Subscription-lost callback: both arguments are ignored and only the
// callback name is written to stderr.
void DataReaderListener::on_subscription_lost(
  DDS::DataReader_ptr /* reader */,
  const ::OpenDDS::DCPS::SubscriptionLostStatus& /* status */)
{
  const char* const callback_name = "DataReaderListener::on_subscription_lost";
  cerr << callback_name << endl;
}
// Budget-exceeded callback: both arguments are ignored and only the callback
// name is written to stderr.
void DataReaderListener::on_budget_exceeded(
  DDS::DataReader_ptr /* reader */,
  const ::OpenDDS::DCPS::BudgetExceededStatus& /* status */)
{
  const char* const callback_name = "DataReaderListener::on_budget_exceeded";
  cerr << callback_name << endl;
}
3.5 編譯運行
編譯完成后,由于我們采用的是對等發現的方式,因此可以不分先后地運行Publisher和Subscriber,會看到如下的運行結果。至此一個簡單的Demo就設計完了。可以把編譯好的“OpenDDS”,“ACE_wrappers”檔案夾拷貝到局域網里的其他電腦上,并按照“1. 環境搭建與編譯”中所述給它設定環境變數,就可以實作局域網對等發現的Demo了。

轉載請註明出處,本文鏈接:https://www.uj5u.com/qukuanlian/257158.html
標籤:區塊鏈
上一篇:位元幣合約怎么對沖風險?
