news 2026/6/6 19:05:06

从零到一:用DDS在C++/Python里实现一个简单的发布订阅聊天室(附完整代码)

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
从零到一:用DDS在C++/Python里实现一个简单的发布订阅聊天室(附完整代码)

从零到一:用DDS在C++/Python里实现一个简单的发布订阅聊天室(附完整代码)

分布式系统开发中,实时数据分发一直是核心挑战。想象一下,当你需要构建一个多用户实时聊天系统时,传统方案可能需要处理复杂的套接字编程、线程同步和消息队列管理。而数据分发服务(DDS)提供了一种更优雅的解决方案——它让开发者可以像操作本地变量一样简单地实现跨网络的数据共享。

1. 环境准备与DDS基础

在开始编码前,我们需要选择合适的DDS实现。目前主流的开源选择包括:

  • Fast DDS(前身为Fast RTPS):eProsima开发,性能优异,社区活跃
  • OpenDDS:由Object Computing维护,遵循DDS规范严格
  • Cyclone DDS:Eclipse基金会项目,以轻量级著称

对于本教程,我们推荐使用Fast DDS,因为它的安装简单且对Python支持良好。以下是各平台的安装方法:

# Ubuntu/Debian sudo apt-get install ros-foxy-rmw-fastrtps-cpp # 通过ROS安装 # 或直接安装 sudo apt-get install libfastrtps-dev # macOS brew install fast-dds # Python绑定 pip install fastdds

DDS的核心概念可以用一个简单的类比理解:想象一个大型会议中心(Domain),里面有许多会议室(Topic)。参会者(Participant)可以:

  1. 作为发言人(Publisher)进入会议室发布信息
  2. 作为听众(Subscriber)进入会议室接收信息
  3. 每个会议室有明确的主题标识(Topic Name)
  4. 参会者可以选择只进入自己感兴趣的会议室

这种模型天然适合聊天室场景——每个用户既是发布者也是订阅者,通过主题来区分不同的聊天频道。

2. 定义聊天消息结构

DDS使用接口定义语言(IDL)来规范数据传输格式。创建ChatMessage.idl文件:

module chat { struct Message { string username; // 发送者名称 string content; // 消息内容 long timestamp; // 时间戳 }; };

使用Fast DDS的代码生成工具将其转换为编程语言接口:

fastddsgen ChatMessage.idl -example CMake

这会生成C++和Python所需的类型支持代码。关键生成文件包括:

文件类型作用描述
ChatMessage.h/cpp定义消息数据结构
ChatMessagePubSubTypes.h/cpp类型序列化支持
ChatMessagePublisher.cpp示例发布者代码
ChatMessageSubscriber.cpp示例订阅者代码

对于Python开发者,Fast DDS提供了直接的绑定支持:

from fastdds import Publisher, Subscriber, Topic, DomainParticipant import chat # 自动生成的模块

3. C++实现完整聊天室

让我们从C++版本开始。首先创建域参与者:

#include <fastdds/dds/domain/DomainParticipant.hpp> #include <fastrtps/attributes/ParticipantAttributes.h> eprosima::fastdds::dds::DomainParticipant* participant = DomainParticipantFactory::get_instance()->create_participant( 0, // 域ID PARTICIPANT_QOS_DEFAULT);

接下来定义聊天主题:

eprosima::fastdds::dds::Topic* topic = participant->create_topic( "ChatRoom", // 主题名称 "chat::Message", // 类型名称 TOPIC_QOS_DEFAULT);

完整的发布者实现需要以下步骤:

  1. 创建Publisher
  2. 为聊天消息类型注册DataWriter
  3. 实现消息发送逻辑
// 创建Publisher eprosima::fastdds::dds::Publisher* publisher = participant->create_publisher(PUBLISHER_QOS_DEFAULT); // 注册DataWriter eprosima::fastdds::dds::DataWriter* writer = publisher->create_datawriter( topic, DATAWRITER_QOS_DEFAULT); // 发送消息示例 chat::Message msg; msg.username("C++User"); msg.content("Hello from C++!"); msg.timestamp(time(nullptr)); writer->write(&msg);

订阅者实现同样直观:

// 创建Subscriber eprosima::fastdds::dds::Subscriber* subscriber = participant->create_subscriber(SUBSCRIBER_QOS_DEFAULT); // 注册DataReader eprosima::fastdds::dds::DataReader* reader = subscriber->create_datareader( topic, DATAREADER_QOS_DEFAULT); // 实现监听器接收消息 class ChatListener : public eprosima::fastdds::dds::DataReaderListener { public: void on_data_available(DataReader* reader) override { SampleInfo info; chat::Message msg; while(reader->take_next_sample(&msg, &info) == ReturnCode_t::RETCODE_OK) { if(info.valid_data) { std::cout << "[" << msg.timestamp() << "] " << msg.username() << ": " << msg.content() << std::endl; } } } };

4. Python实现跨语言聊天

Python版本的魅力在于能与C++节点无缝互通。首先初始化域参与者:

import fastdds import chat # 生成的模块 participant = fastdds.DomainParticipant(0)

定义主题和类型支持:

topic_type = chat.MessagePubSubType() topic = participant.create_topic("ChatRoom", topic_type.get_type_name())

发布者实现:

publisher = participant.create_publisher() writer = publisher.create_datawriter(topic) msg = chat.Message() msg.username = "PythonUser" msg.content = "Hello from Python!" msg.timestamp = int(time.time()) writer.write(msg)

订阅者通过监听器接收消息:

class ChatListener(fastdds.DataReaderListener): def on_data_available(self, reader): info = fastdds.SampleInfo() msg = chat.Message() while reader.take_next_sample(msg, info) == fastdds.RETCODE_OK: if info.valid_data: print(f"[{msg.timestamp}] {msg.username}: {msg.content}") subscriber = participant.create_subscriber() reader = subscriber.create_datareader(topic) reader.set_listener(ChatListener())

5. 高级功能与性能调优

基础聊天室运行后,我们可以通过DDS的QoS策略提升体验:

可靠性配置

eprosima::fastdds::dds::ReliabilityQosPolicy reliability; reliability.kind = RELIABLE_RELIABILITY_QOS; writer_qos.reliability(reliability);

历史深度控制(保留最近N条消息):

from fastdds import HistoryQosPolicy qos = DataWriterQos() qos.history.kind = HistoryQosPolicy.KEEP_LAST qos.history.depth = 100 # 保留100条历史消息

关键QoS策略对聊天室的影响:

QoS策略适用场景性能影响
可靠性(Reliability)确保消息必达增加网络开销
持久性(Durability)新用户获取历史消息增加内存使用
截止时间(Deadline)实时性要求高需要精确时钟同步
活跃性(Liveliness)检测用户离线增加心跳流量

多房间支持只需创建不同主题:

eprosima::fastdds::dds::Topic* topicGeneral = participant->create_topic("GeneralChat", type_name); eprosima::fastdds::dds::Topic* topicTech = participant->create_topic("TechChat", type_name);

内容过滤实现私聊功能:

filter_params = ["username = 'Alice'"] filtered_topic = participant.create_contentfilteredtopic( "PrivateChat", topic, "username = %0", filter_params)

6. 完整代码示例与调试技巧

将所有组件整合,我们得到完整的聊天室实现。C++主程序结构:

int main(int argc, char** argv) { // 初始化DDS实体 DomainParticipant* participant = ...; Topic* topic = ...; // 根据参数决定角色 if(argc > 1 && strcmp(argv[1], "--publisher") == 0) { run_publisher(participant, topic); } else { run_subscriber(participant, topic); } // 清理资源 participant->delete_contained_entities(); DomainParticipantFactory::get_instance()->delete_participant(participant); return 0; }

Python版本可通过线程实现同时收发:

def input_thread(writer): while True: message = input("> ") msg = chat.Message() msg.username = username msg.content = message msg.timestamp = int(time.time()) writer.write(msg) listener = ChatListener() reader.set_listener(listener) threading.Thread(target=input_thread, args=(writer,), daemon=True).start()

常见问题排查指南:

  1. 无法发现对方节点

    • 检查域ID是否一致
    • 验证网络防火墙是否阻止了DDS端口(默认7400-7500)
    • 使用netstat -tulnp确认监听端口
  2. 消息接收延迟

    • 调整心跳间隔:writer_qos.reliable_writer_qos.times.heartbeatPeriod.seconds = 1
    • 增加发送缓冲区:writer_qos.properties.properties().emplace_back("fastdds.push_mode", "true")
  3. Python绑定问题

    • 确保使用的Python版本与Fast DDS绑定匹配
    • 检查LD_LIBRARY_PATH包含Fast DDS库路径
    • 尝试设置环境变量:export FASTRTPS_DEFAULT_PROFILES_FILE=~/fastdds_config.xml

性能优化实测数据对比:

配置项消息延迟(ms)吞吐量(msg/s)
默认配置12.38,500
优化可靠性15.76,200
异步发布模式8.215,000
批量发送9.512,800

7. 扩展应用场景与进阶学习

掌握了基础聊天室实现后,DDS还能支持更多有趣的应用:

游戏开发

  • 玩家位置同步
  • 实时战斗事件分发
  • 全局游戏状态更新

物联网系统

# 定义传感器数据类型 struct SensorData { string device_id; // @key float temperature; float humidity; Coordinates location; };

金融交易

  • 使用DurabilityQosPolicy确保关键交易不丢失
  • 利用OwnershipQosPolicy实现主备切换
  • 通过DeadlineQosPolicy检测超时交易

推荐进阶学习路径:

  1. 官方文档

    • Fast DDS文档
    • DDS规范
  2. 性能调优

    • 实验不同QoS组合的影响
    • 学习使用Wireshark分析DDS流量
    • 研究共享内存传输配置
  3. 社区资源

    • eProsima GitHub仓库
    • DDS开发者论坛
    • ROS 2社区(基于DDS)

实际部署时,考虑使用DDS路由服务实现跨网段通信:

<!-- 配置示例:dds_config.xml --> <profiles> <participant profile_name="router_participant"> <rtps> <builtin> <metatrafficUnicastLocatorList> <locator> <udpv4> <address>192.168.1.100</address> <port>7411</port> </udpv4> </locator> </metatrafficUnicastLocatorList> </builtin> </rtps> </participant> </profiles>

在开发过程中,我发现最实用的调试技巧是在不同主机上运行时,明确指定发现协议使用的IP地址:

export ROS_DISCOVERY_SERVER=192.168.1.100:11811 # 或直接指定 export FASTRTPS_DEFAULT_PROFILES_FILE=discovery_config.xml

对于需要处理大量聊天室的企业级应用,考虑采用DDS的持久化功能,配合数据库插件实现消息历史存储。这种架构下,即使服务重启,用户也能获取最近的聊天记录,显著提升使用体验。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/6 19:03:35

从原理到调参:深入理解Matlab中envelope函数的‘坑’与hilbert变换的本质

从原理到调参&#xff1a;深入理解Matlab中envelope函数的‘坑’与hilbert变换的本质在信号处理领域&#xff0c;包络分析是一个看似简单却暗藏玄机的操作。许多工程师第一次使用Matlab的envelope函数或hilbert变换时&#xff0c;往往会惊讶于它们在不同场景下表现出的差异——…

作者头像 李华
网站建设 2026/6/6 18:58:12

从零到一:STM32+PT100测温系统实战(含电路设计、代码调试与OLED显示)

从零构建高精度PT100测温系统&#xff1a;硬件设计、软件实现与OLED显示全解析在工业控制、实验室监测和家用电器等领域&#xff0c;温度测量始终是基础而关键的环节。PT100铂电阻因其出色的稳定性、较宽的测温范围和良好的线性特性&#xff0c;成为中低温区&#xff08;-200℃…

作者头像 李华
网站建设 2026/6/6 18:54:52

MATLAB与TI CCSLink环境搭建与DSP硬件协同调试实战

1. 项目概述与核心价值如果你是一名从事数字信号处理&#xff08;DSP&#xff09;算法开发或系统仿真的工程师&#xff0c;那么MATLAB和TI的Code Composer Studio&#xff08;CCS&#xff09;这两款工具&#xff0c;大概率是你工作台上的“左膀右臂”。MATLAB以其强大的矩阵运算…

作者头像 李华