0%

DDS 协议介绍

  • DDS是一种为实时系统设计的发布-订阅通信协议,其特点是去中心化、支持多种QoS机制和实时通信,和SOME/IP类似但不完全相同。
  • RTI Connext DDS和Fast DDS(以前称为Fast RTPS)都是实现了DDS规范的中间件,本文是对DDS相关概念的介绍,以及对RTI Connext DDS和Fast DDS的一些基本对比。
Data Distribution Service

DDS介绍

DDS(Data Distribution Service)是由OMG(Object Management Group)发布的分布式通信规范。

OMG成立于1989年,是一个国际性、开放性、非营利性的技术标准联盟,由供应商、终端用户、学术机构和政府机构推动。OMG工作组致力于制定企业集成标准和开发可为数千个垂直行业提供现实价值的技术标准,其中包括统一建模语言SYSML、UML,以及中间件标准CORBA、DDS等。

DDS最早应用于美国海军系统,用于解决在军舰系统复杂网络环境中进行大量软件升级时的兼容性问题。随着DDS被ROS2和AUTOSAR引入,目前它已经广泛应用于航空、航天、船舶、国防、金融、通信、汽车等领域。

目前,DDS已被多个车载中间件平台引入。AUTOSAR AP已完整地集成了DDS标准的网络绑定。另外,虽然AUTOSAR CP的标准规范本身不支持DDS,但通过一些变通方法也可以在CP上集成DDS。ROS2和CyberRT的底层都使用了开源的DDS作为最重要的通信机制。

DDS特点

数据中心(Data Centricity)

DDS最重要的特性是以数据为中心,这与其他许多通信中间件不同。DDS的数据共享以Topic为单元,应用程序能够通过Topic判断包含的数据类型,而不必依赖其他上下文信息。同时,DDS能够按照用户定义的方式自动地存储、发布或订阅数据,使应用程序能够像访问本地数据一样进行数据的写入或读取。

全局数据空间(Global Data space)

DDS实现的数据共享可以被理解为一个抽象的全局数据空间,无论应用程序是用哪种开发语言编写,或者在哪种操作系统上运行,都可以以相同的方式访问这个全局数据空间,就像访问本地存储空间一样。当然,全局数据空间只是一个抽象概念,在实际实现中,数据仍然被分别存储在每个应用程序的本地空间中。在系统运行时,数据是按需传输或存储的,数据的发布者只发送订阅者需要的数据,而订阅者只接收并存储本地应用程序当前所需的数据。

服务质量(Quality of service)

DDS还提供了高度灵活的QoS(Quality of Service)策略,以满足用户对数据共享方式的不同需求,例如可靠性和故障处理等。对于对数据安全性要求较高的系统,DDS还提供了精细的数据安全控制,包括应用程序身份认证、权限控制和数据加密等。

动态发现(Dynamic Discovery)

类似于SOME/IP-SD,DDS提供了数据发布者和订阅者的动态发现机制,这意味着用户无需手动配置通信节点的地址或其他属性信息,因为它们在运行过程中会自动发现对方并自动完成相关配置,实现了即插即用的功能。

可扩展架构(Scalable Architecture)

DDS可应用于边缘计算、云计算领域。在边缘计算中,DDS可以实现高速实时的设备间通信。在中间系统中,DDS提供健壮可靠的QoS和内容感知的信息流。DDS提供可扩展的信息访问和数据分发手段,用于集成信息系统,将各系统接入云端。
OMG DDS的适用范围广泛,涵盖了从小型设备到云计算系统等超大型系统。DDS能够以超高速传输数据并同时管理数千个数据对象,提供极高的可用性和安全性,非常适用于物联网。通过提供一个标准的通信层,DDS屏蔽了底层复杂性,简化了分布式系统的开发。

安全(Security)

DDS为关键任务的工业物联网环境提供了全面的安全保护机制,跨系统、跨供应商,覆盖从边缘设备到云端的安全性需求。
DDS提供了身份验证、访问控制、数据加密和数据完整性等安全机制,以确保数据分发的安全性。这些安全机制是在点对点对等架构上实现的,不会影响实时通信的性能。

DDS与SOME/IP区别

SOME/IP和DDS是目前在域控最常用的两类通信中间件,它们都是面向服务的通信协议,并采用以数据为中心的发布/订阅模式。
然而,SOME/IP和DDS在许多方面也存在差异,主要区别如下:

主要应用领域不同

SOME/IP专为汽车领域开发,针对汽车领域的需求定义了一套通信标准,并在汽车领域深耕已久;而DDS是一个工业级别的强实时通信标准,适应性较强,但在应用于汽车领域时需要进行专门的裁剪。

灵活性和可伸缩性不同

相比SOME/IP,DDS引入了许多标准内置特性,如基于内容和时间的过滤、与传输无关的可靠性、持久性、存活性、延迟/截止时间监视、可扩展类型等。当AUTOSAR AP与DDS一起构建通信框架时,该框架不仅与现有API和应用程序兼容,还在可靠性、性能、灵活性和可伸缩性等方面提供重要的好处。

订阅方和发布方的耦合程度不同 ⭐

在SOME/IP中,订阅方在正常数据传输之前需要与发布方建立网络连接并询问发布方是否提供所需服务,即Service Discovery,从这个角度看,节点之间仍然存在一定的耦合性。而在DDS标准下每个订阅方或发布方只需要在自己的程序中订阅或发布数据,无需关心任何连接。因此,在DDS中,服务的订阅方和发布方更加彻底地解耦。

服务策略不同

良好的服务质量(QoS)是DDS标准相对于SOME/IP最重要的特征。SOME/IP只有一个QoS,而RTI DDS和开源DDS分别提供了50多个和20多个QoS,这些QoS基本上涵盖了大多数可预见的业务场景。

应用场景不同

从应用场景的角度来看,SOME/IP更适用于车载网络,并且只能在基于IP类型的网络环境中使用;而DDS在传输方式上没有特别的限制,可以支持基于非IP类型的网络,例如共享内存、跨核通信、PCIe等。此外,DDS还提供了完备的车联网解决方案,其独有的DDS Security和DDS Web功能为用户提供一站式的“车-云-移动端”解决方案。

方案对比

概述

商业方案

  1. RTI - Connext DDS
    RTI作为OMG组织董事会的成员,领导了DDS标准的制定,并开发了名为Connext的DDS品牌,因此也被称为Connext DDS,是业界领先的DDS供应商。

  2. Twin Oaks Computing, Inc. - CoreDX DDS
    Twin Oaks Computing, Inc. 是一家致力于开发和提供优质软件解决方案的公司,其开发的CoreDX DDS是领先的DDS中间件,主要特点是体积小,速度快,适用于小型和嵌入式系统。

  3. Remedy IT
    Remedy IT是一家致力于开放标准的技术服务/咨询/培训公司,为航空航天、国防、金融、电信和能源等不同领域的各种组织提供DDS软件解决方案和支持。

开源方案

开源DDS相对于商用的RTI Connext DDS等来说,也是根据OMG官方标准开发的,但源代码是开放的,主要包括Fast DDS和Open DDS等。

  1. eProsima Fast DDS
    在自动驾驶领域,由RTI原核心团队成员在欧洲创办的eProsima公司推出了影响力较大的开源DDS,名为Fast DDS。在eProsima将Fast DDS的源代码开放后,用户可以直接在GitHub上免费下载使用。

  2. Object Computing OpenDDS
    Open DDS由位于圣路易斯和凤凰城的Object Computing的ACE/TAO团队开发,与Fast DDS有一定的相似性,两者都基于RTPS实现,都是面向数据的通信框架,并遵循同一标准。
    上述两类开源方案都支持去中心化、支持QoS机制和实时通信等DDS标准。

尽管开源DDS对RTI的商用DDS形成一定的竞争,但开源DDS也存在一些不足之处:开源DDS的使用门槛较高,使用起来没有商业方案方便;开源DDS服务策略完整性远不及商业版本,RTI DDS的服务策略有50多个,而开源DDS只有23个;RTI的DDS已通过了ASIL-D认证,而开源DDS尚未达到这一认证水平。

Rti Connext DDS Fast DDS
供应商 Real-Time Innovations eProsima
开源与否 不开源,需要购买商业许可 开源
性能 在数据吞吐量和延迟方面更优 在内存使用和CPU使用方面更优
特性 提供了一些高级特性,如数据查询和持久化 支持更多的平台和操作系统
平台支持 RTI Connext DDS 支持平台 Fast DDS 支持平台
社区支持 庞大的用户社区和丰富的文档 较弱
安全性 高,通过了ASIL-D认证 较弱
Qos服务策略 多,50+ 较少,20+
使用门槛 使用门槛较低,接口、编译均可自动化处理 使用门槛较高,接口封装、编译等均须手动配置
便捷性 提供丰富的工具和服务,使用较为便捷 内置工具和服务较少,使用便捷性较弱

RTI CONNXT DDS

官方链接

核心接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// Start communicating in a domain, usually one participant per application
dds::domain::DomainParticipant participant(domain_id);

// Create a Topic with a name and a datatype
dds::topic::Topic< ::OTACtrl_struct> topic(participant, "Example OTACtrl_struct");

// Create a Publisher
dds::pub::Publisher publisher(participant);

// Create a DataWriter with default QoS
dds::pub::DataWriter< ::OTACtrl_struct> writer(publisher, topic);

// Set the values
::OTACtrl_struct data;
data.ota_hmi_req(OTA_HMI_Req::SetAppointmentTime);
data.appointment_time(std::to_string(1704962170593));

// write data
writer.write(data);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// Start communicating in a domain, usually one participant per application
dds::domain::DomainParticipant participant(domain_id);

// Create a Topic with a name and a datatype
dds::topic::Topic< ::OTACtrl_struct> topic(participant, "Example OTACtrl_struct");

// Create a Subscriber and DataReader with default Qos
dds::sub::Subscriber subscriber(participant);
dds::sub::DataReader< ::OTACtrl_struct> reader(subscriber, topic);

// Create a ReadCondition for any data received on this reader and set a
// handler to process the data
unsigned int samples_read = 0;
dds::sub::cond::ReadCondition read_condition(
reader,
dds::sub::status::DataState::any(),
[reader, &samples_read]() { samples_read += process_data(reader); });

// WaitSet will be woken when the attached condition is triggered
dds::core::cond::WaitSet waitset;
waitset += read_condition;

while (!application::shutdown_requested && samples_read < sample_count) {
std::cout << "::OTACtrl_struct subscriber sleeping up to 1 sec..." << std::endl;

// Run the handlers of the active conditions. Wait for up to 1 second.
waitset.dispatch(dds::core::Duration(1));
}

内置工具

  • rtiadminconsole: 用于监视分析和配置正在运行的DDS系统的工具。
  • rticlouddiscoveryservice: 用于在云环境中发现DDS服务的工具。
  • rticonverter: 用于转换DDS数据的工具。
  • rtiddsgen: 用于从IDL文件生成DDS数据类型的工具。
  • rtiddsgen_server: rtiddsgen的服务器版本。
  • rtiddsping: 用于测试DDS网络连接的工具。
  • rtiddsspy: 用于监听DDS网络上的数据的工具。
  • rtilauncher: 一个图形界面工具,用于启动其他RTI工具。
  • rtimonitor: 用于监视DDS系统的工具。
  • rtipersistenceservice: 用于在DDS系统中提供数据持久化的服务。
  • rtipkginstall: 用于安装RTI软件包的工具。
  • rtipssh: 用于在DDS系统中提供安全的shell服务。
  • rtirecordingindexer: 用于索引DDS数据记录的工具。
  • rtirecordingservice: 用于记录DDS数据的服务。
  • rtirecordingservice_list_tags: 用于列出DDS数据记录标签的工具。
  • rtireplayservice: 用于回放DDS数据记录的服务。
  • rtiroutingservice: 用于在DDS系统中提供数据路由的服务。
  • rtirssh: 用于在DDS系统中提供安全的远程shell服务。
  • rtishapesdemo: 一个用于演示DDS功能的图形界面工具。
  • rtisystemdesigner: 一个用于设计DDS系统的图形界面工具。
  • rtiwebintegrationservice: 用于将DDS数据集成到Web应用的服务。
  • rtixmlconverter: 用于转换DDS XML配置文件的工具。

支持平台

Platform Support

eProsima Fast DDS

官方链接

核心接口

DomainParticipant:充当所有其他实体对象的容器,并充当发布者、订阅者和主题对象的工厂。
Publisher:是负责创建 DataWriter的对象。
DataWriter:允许应用程序设置要在给定主题下发布的数据的值。
TypeSupport:为participant提供序列化、反序列化和获取特定数据类型的key的函数。
DataWriterListener:允许重新定义 DataWriterListener 的功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 创建participant
DomainParticipantQos participantQos;
participantQos.name("Participant_publisher");
participant_ = DomainParticipantFactory::get_instance()->create_participant(0, participantQos);

// register数据类型并创建topic
type_ = new HelloWorldPubSubType()
type_.register_type(participant_);
topic_ = participant_->create_topic("HelloWorldTopicName", "HelloWorld", TOPIC_QOS_DEFAULT);

// 创建publisher
publisher_ = participant_->create_publisher(PUBLISHER_QOS_DEFAULT, nullptr);

// 创建writer
writer_ = publisher_->create_datawriter(topic_, DATAWRITER_QOS_DEFAULT, &listener_);

// 使用Publisher对象发送数据
bool publish()
{
hello_.index(hello_.index() + 1);
if (writer_->write(&hello_))
return true;
return false;
}

DomainParticipant:充当所有其他实体对象的容器,并充当发布者、订阅者和主题对象的工厂。
Subscriber:是负责创建 DataReader 的对象。
DataReader:它是负责实际接收数据的对象。
TypeSupport:为participant提供序列化、反序列化和获取特定数据类型的key的函数。
DataReaderListener:这是分配给数据读取器的侦听器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 创建participant
DomainParticipantQos participantQos;
participantQos.name("Participant_subscriber");
participant_ = DomainParticipantFactory::get_instance()->create_participant(0, participantQos);

// register数据类型并创建topic
type_ = new HelloWorldPubSubType()
type_.register_type(participant_);
topic_ = participant_->create_topic("HelloWorldTopicName", "HelloWorld", TOPIC_QOS_DEFAULT);

// 创建subscriber
subscriber_ = participant_->create_subscriber(SUBSCRIBER_QOS_DEFAULT, nullptr);

// 创建reader
reader_ = subscriber_->create_datareader(topic_, DATAREADER_QOS_DEFAULT, &listener_);

// 使用Subscriber对象接收数据
void on_data_available(DataReader* reader) override //当datareader接收到datawriter发送的新数据时,会触发该回调函数。
{
SampleInfo info;
if (reader->take_next_sample(&hello_, &info) == ReturnCode_t::RETCODE_OK){
if (info.valid_data){
samples_++;
std::cout << "Message: " << hello_.message() << " with index: " << hello_.index()<< " RECEIVED." << std::endl;
}
}
}

支持平台

Supported Platform

术语定义

术语 定义
DDS Data Distribution Service,数据分发服务,提供了一种数据中心的通信方式,提供丰富的服务质量策略,允许数据的发布者和订阅者在时间和空间上解耦,从而保障数据进行实时且高效的分发。
ROS2 Robot Operating System 2,是一个用于编写机器人软件的框架,ROS 2选择了DDS作为其底层的消息传递中间件,这意味着ROS 2的节点(即ROS中的进程)均使用DDS来发布或订阅主题,以及进行服务调用。
OMG Object Management Group,对象管理组织,是一个国际性的、开放的、非盈利的标准化组织,致力于建立对程序、系统和业务流程的建模标准,它制定了DDS的中间件协议和API标准。
AUTOSAR Automotive Open System Architecture,是一个全球的汽车工业联盟,它定义了一个开放的、标准的汽车软件架构。AUTOSAR规范自R22-11开始引入了DDS内容,去年发布的R23-11标准对于DDS有更加具体的描述,能够作为模块开发的一个参考。
QoS Quality of Service,是一个广泛用于网络通信中的概念,用于描述网络的性能水平,包括数据传输的错误率、带宽、延迟、抖动等。

参考链接