OpenDDS开发指南(OPenDDS程序)

一、介绍

OpenDDS是一个开源的、C++语言编写的分布式数据交换中间件框架,使用可扩展的、面向对象的模型实现了数据通信。它支持多种传输协议(包括TCP/IP、UDP、Shared Memory等),可以在不同的操作系统和开发语言之间进行分布式通信。

OpenDDS采用了Data Distribution Service (DDS)规范,由Object Management Group (OMG)维护,是一种面向数据的、高性能、可靠的消息传递中间件。

OpenDDS的重点是处理高吞吐量、低延迟的数据传输。它提供了灵活的QoS机制(Quality of Service,服务质量),使得开发者可以根据需求调整数据传输的优先级、可靠性和延迟等参数。

二、安装

OpenDDS的安装分为以下几个步骤:

1. 下载

首先需要从OpenDDS官方网站上下载源代码:http://opendds.org/downloads.html

2. 安装依赖

在安装OpenDDS之前,需要确保系统中安装了以下依赖库:


sudo apt-get install libboost-all-dev libssl-dev libace-dev

3. 编译安装

接下来进入源代码的根目录,执行以下命令进行编译和安装:


./configure --prefix=/usr/local/opendds
make -j4
sudo make install

三、基本概念

在使用OpenDDS开发分布式应用程序之前,需要了解以下基本概念:

1. Topic

Topic是指一种数据类型的集合,可以理解为一个“主题”,可以被发布者发布和订阅者订阅。每个Topic都有一个唯一的名称和类型。

2. Publisher和Subscriber

Publisher是数据发布者,用于将消息发送给订阅者,而Subscriber则是数据订阅者,用于接收发布者发送的消息。一个Publisher或Subscriber可以订阅或发布多个Topic。

3. DataWriter和DataReader

DataWriter是发布者中的数据写入器,用于将消息发送到Topic中。DataReader是订阅者中的数据读取器,用于从Topic中读取消息。一个DataWriter或DataReader只能发布或订阅一个Topic。

4. Domain

Domain是指OpenDDS中的一个消息域,可以理解为消息传输的一个独立区域。每个Domain都有一个唯一的标识符(Domain ID),用于标识消息传输中的不同域。一个域中可以包含多个Publisher和Subscriber,它们可以通过共享同一个Topic进行通信。

四、使用指南

以下是一个简单的OpenDDS程序:


#include <dds/DCPS/Service_Participant.h>
#include <dds/DCPS/PublisherImpl.h>
#include <dds/DCPS/SubscriberImpl.h>
#include <dds/DCPS/WaitSet.h>
#include <dds/DCPS/transport/framework/TheTransportFactory.h>
#include <dds/DCPS/transport/framework/TransportDefs.h>
#include <dds/DCPS/transport/tcp/TcpInst.h>

#include "ExampleTypeSupportImpl.h"

int ACE_TMAIN(int argc, ACE_TCHAR* argv[]) {
    // 初始化DDS
    DDS::DomainParticipantFactory_var dpf =
            TheParticipantFactoryWithArgs(argc, argv);

    // 创建一个Domain
    DDS::DomainParticipant_var participant = dpf->create_participant(
            42,
            PARTICIPANT_QOS_DEFAULT,
            DDS::DomainParticipantListener::_nil(),
            ::OpenDDS::DCPS::DEFAULT_STATUS_MASK);
    if (CORBA::is_nil(participant.in())) {
        ACE_ERROR_RETURN((LM_ERROR, "ERROR: Failed to create domain participantn"), -1);
    }

    // 注册数据类型
    Example::ExampleTypeSupport_var ts = new Example::ExampleTypeSupportImpl();
    if (ts->register_type(participant.in(), "") != DDS::RETCODE_OK) {
        ACE_ERROR_RETURN((LM_ERROR, "ERROR: Failed to register the Example typen"), -1);
    }

    // 创建Publisher和Subscriber
    DDS::Publisher_var pub = participant->create_publisher(PUBLISHER_QOS_DEFAULT,
                                                           DDS::PublisherListener::_nil(),
                                                           ::OpenDDS::DCPS::DEFAULT_STATUS_MASK);
    if (CORBA::is_nil(pub.in())) {
        ACE_ERROR_RETURN((LM_ERROR, "ERROR: Failed to create publishern"), -1);
    }

    DDS::Subscriber_var sub = participant->create_subscriber(SUBSCRIBER_QOS_DEFAULT,
                                                             DDS::SubscriberListener::_nil(),
                                                             ::OpenDDS::DCPS::DEFAULT_STATUS_MASK);
    if (CORBA::is_nil(sub.in())) {
        ACE_ERROR_RETURN((LM_ERROR, "ERROR: Failed to create subscribern"), -1);
    }

    // 创建Topic
    DDS::Topic_var topic = participant->create_topic("ExampleTopic",
                                                      ts->get_type_name(),
                                                      TOPIC_QOS_DEFAULT,
                                                      DDS::TopicListener::_nil(),
                                                      ::OpenDDS::DCPS::DEFAULT_STATUS_MASK);
    if (CORBA::is_nil(topic.in())) {
        ACE_ERROR_RETURN((LM_ERROR, "ERROR: Failed to create topicn"), -1);
    }

    // 创建DataWriter和DataReader
    DDS::DataWriter_var dw = pub->create_datawriter(topic.in(),
                                                    DATAWRITER_QOS_DEFAULT,
                                                    DDS::DataWriterListener::_nil(),
                                                    ::OpenDDS::DCPS::DEFAULT_STATUS_MASK);
    if (CORBA::is_nil(dw.in())) {
        ACE_ERROR_RETURN((LM_ERROR, "ERROR: Failed to create data writern"), -1);
    }

    DDS::DataReader_var dr = sub->create_datareader(topic.in(),
                                                    DATAREADER_QOS_DEFAULT,
                                                    DDS::DataReaderListener::_nil(),
                                                    ::OpenDDS::DCPS::DEFAULT_STATUS_MASK);
    if (CORBA::is_nil(dr.in())) {
        ACE_ERROR_RETURN((LM_ERROR, "ERROR: Failed to create data readern"), -1);
    }

    Example::ExampleDataWriter_var example_dw = Example::ExampleDataWriter::_narrow(dw.in());
    if (CORBA::is_nil(example_dw.in())) {
        ACE_ERROR_RETURN((LM_ERROR, "ERROR: Failed to narrow data writern"), -1);
    }

    Example::ExampleDataReader_var example_dr = Example::ExampleDataReader::_narrow(dr.in());
    if (CORBA::is_nil(example_dr.in())) {
        ACE_ERROR_RETURN((LM_ERROR, "ERROR: Failed to narrow data readern"), -1);
    }

    // 发布数据
    Example::Example data;
    data.key = 1;
    data.value = "Hello, world!";
    example_dw->write(data, DDS::HANDLE_NIL);

    // 等待接收数据
    DDS::DataReaderSeq readers;
    DDS::SampleInfoSeq infos;
    DDS::ReturnCode_t result;
    do {
        // 等待可读事件
        DDS::WaitSet_var ws(new DDS::WaitSet);
        ws->attach_condition(dr->create_readcondition(DDS::NOT_READ_SAMPLE_STATE,
                                                       DDS::NEW_VIEW_STATE,
                                                       DDS::ANY_INSTANCE_STATE));
        result = sub->get_readers(readers, infos, DDS::LENGTH_UNLIMITED,
                                  DDS::ANY_SAMPLE_STATE, DDS::ANY_VIEW_STATE, DDS::ANY_INSTANCE_STATE);
        if (result != DDS::RETCODE_OK && result != DDS::RETCODE_NO_DATA) {
            ACE_ERROR_RETURN((LM_ERROR, "ERROR: Failed to get readersn"), -1);
        }
        ACE_DEBUG((LM_INFO, "readers.length() = %dn", readers.length()));

        // 接收数据
        int count = 0;
        for (unsigned int i = 0; i < readers.length(); ++i) {
            Example::ExampleDataReader_var dr = Example::ExampleDataReader::_narrow(readers[i]);
            if (CORBA::is_nil(dr.in())) {
                ACE_ERROR_RETURN((LM_ERROR, "ERROR: Failed to narrow data readern"), -1);
            }
            for (unsigned int j = 0; j take_next_sample(data, infos[j]);
                    if (result == DDS::RETCODE_OK) {
                        ACE_DEBUG((LM_INFO, "Received data: key = %d, value = %Cn",
                                   data.key, data.value.in()));
                        ++count;
                    } else if (result == DDS::RETCODE_NO_DATA) {
                        continue;
                    } else {
                        ACE_ERROR_RETURN((LM_ERROR, "ERROR: Failed to take next samplen"), -1);
                    }
                }
            }
        }
        DDS::ConditionSeq active_conditions;
        if (ws->wait(active_conditions, DDS::DURATION_INFINITE) != DDS::RETCODE_OK) {
            ACE_ERROR_RETURN((LM_ERROR, "ERROR: Wait failedn"), -1);
        }
        ws->detach_condition(active_conditions[0]);
    } while (result == DDS::RETCODE_NO_DATA);

    // 销毁资源
    dpf->delete_participant(participant.in());
    TheTransportFactory->release();
    ::DDS::StringSeq_var strings = dpf->get_log_lines();
    for (CORBA::ULong i = 0; i length(); ++i) {
        ACE_DEBUG((LM_DEBUG, "%Cn", strings[i].in()));
    }

    return 0;
}

该程序创建了一个Domain,并注册了一个名为“Example”的数据类型。接下来创建了一个Publisher和一个Subscriber,并创建了一个名为“ExampleTopic”的Topic。最后,程序向Topic中发布了一条数据,再从Topic中接收数据并输出。

五、总结

OpenDDS是一个开源的、高性能的消息传递中间件框架,支持多种传输协议和QoS机制。使用OpenDDS可以方便地实现分布式应用程序中的数据交换和通讯,具有广泛的应用前景。

Published by

风君子

独自遨游何稽首 揭天掀地慰生平