news 2026/5/21 16:20:30

ROS2实战:用Action实现一个带进度条的机械臂抓取任务(Python/C++双版本)

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
ROS2实战:用Action实现一个带进度条的机械臂抓取任务(Python/C++双版本)

ROS2实战:用Action实现一个带进度条的机械臂抓取任务(Python/C++双版本)

机械臂抓取是工业自动化中的经典场景,但如何优雅地处理这个可能耗时数秒的多步骤任务?ROS2的Action机制给出了完美解决方案。不同于简单的服务调用,Action允许任务执行过程中持续反馈状态,特别适合需要实时监控的抓取流程——从物体识别到运动规划,再到夹爪控制,每个阶段都能通过进度条直观展示。

本文将带你用ROS2的Action实现一个完整的机械臂抓取任务,包含Python和C++两种实现。我们会重点设计反馈消息结构,让客户端不仅能知道任务是否完成,还能实时获取当前进度百分比和具体执行阶段。以下是最终效果预览:

# 客户端收到的反馈示例 [GRIPPER_ACTION] Current: 45% - Moving to target position [GRIPPER_ACTION] Current: 80% - Gripping object

1. ROS2 Action核心概念解析

在深入代码前,我们需要明确几个关键概念。Action是ROS2中用于长时间运行任务的通信机制,由三个部分组成:

  • Goal:客户端发起请求时发送的目标描述(如抓取目标位置)
  • Feedback:服务端执行过程中定期发送的进度更新
  • Result:任务完成后返回的最终结果

与ROS1相比,ROS2的Action实现有几个显著改进:

特性ROS1ROS2
依赖库依赖boost::bind使用C++标准库(std::bind)
线程模型单线程回调支持多线程执行器
接口定义.action文件.action文件(语法兼容)
取消机制需要手动处理内置更完善的取消支持

对于机械臂抓取任务,典型的执行流程可能包含以下阶段:

  1. 目标检测(20%):识别待抓取物体的位姿
  2. 路径规划(30%):计算机械臂运动轨迹
  3. 运动执行(40%):控制机械臂移动到目标位置
  4. 夹爪操作(10%):执行抓取动作

接下来我们定义Action接口,创建GripperAction.action文件:

# Goal定义 geometry_msgs/Point target_position --- # Result定义 bool success string message --- # Feedback定义 float32 percent_complete string current_stage

2. Python实现:快速原型开发

Python版本适合快速验证算法逻辑,我们先实现Action服务端。关键步骤是继承ActionServer类并实现回调函数:

# gripper_action_server.py import rclpy from rclpy.action import ActionServer from rclpy.node import Node from custom_interfaces.action import GripperAction from geometry_msgs.msg import Point class GripperActionServer(Node): def __init__(self): super().__init__('gripper_action_server') self._action_server = ActionServer( self, GripperAction, 'gripper_action', self.execute_callback) async def execute_callback(self, goal_handle): target = goal_handle.request.target_position self.get_logger().info(f'Received goal: ({target.x}, {target.y}, {target.z})') feedback_msg = GripperAction.Feedback() # 模拟目标检测阶段 feedback_msg.percent_complete = 0.0 feedback_msg.current_stage = "Detecting object" goal_handle.publish_feedback(feedback_msg) await self.simulate_processing(2) # 路径规划阶段(更新进度到30%) feedback_msg.percent_complete = 30.0 feedback_msg.current_stage = "Planning path" goal_handle.publish_feedback(feedback_msg) await self.simulate_processing(3) # 运动执行阶段(更新进度到70%) feedback_msg.percent_complete = 70.0 feedback_msg.current_stage = "Moving arm" goal_handle.publish_feedback(feedback_msg) await self.simulate_processing(2) # 完成抓取 goal_handle.succeed() result = GripperAction.Result() result.success = True result.message = "Grasp completed successfully" return result async def simulate_processing(self, seconds): await asyncio.sleep(seconds) def main(args=None): rclpy.init(args=args) server = GripperActionServer() rclpy.spin(server) rclpy.shutdown()

客户端实现需要处理反馈回调,这里我们展示如何创建带进度条的终端输出:

# gripper_action_client.py import rclpy from rclpy.action import ActionClient from custom_interfaces.action import GripperAction from geometry_msgs.msg import Point def feedback_callback(feedback): percent = int(feedback.feedback.percent_complete) stage = feedback.feedback.current_stage bar = '[' + '#' * (percent//5) + ' ' * (20 - percent//5) + ']' print(f"\r{bar} {percent}% - {stage}", end='') async def main(args=None): rclpy.init(args=args) node = rclpy.create_node('gripper_action_client') client = ActionClient(node, GripperAction, 'gripper_action') if not client.wait_for_server(timeout_sec=5.0): node.get_logger().error('Action server not available') return goal = GripperAction.Goal() goal.target_position = Point(x=0.5, y=0.2, z=0.1) future = client.send_goal_async(goal, feedback_callback=feedback_callback) await future goal_handle = future.result() if not goal_handle.accepted: node.get_logger().info('Goal rejected') return result_future = goal_handle.get_result_async() await result_future result = result_future.result().result print(f"\nResult: {result.message}") if __name__ == '__main__': asyncio.run(main())

运行效果示例:

[######## ] 40% - Moving arm [############## ] 70% - Gripping object Result: Grasp completed successfully

3. C++实现:高性能生产环境方案

对于需要更高性能的场景,C++是更好的选择。以下是服务端实现的关键部分:

// gripper_action_server.cpp #include <rclcpp/rclcpp.hpp> #include <rclcpp_action/rclcpp_action.hpp> #include <custom_interfaces/action/gripper_action.hpp> using GripperAction = custom_interfaces::action::GripperAction; class GripperActionServer : public rclcpp::Node { public: GripperActionServer() : Node("gripper_action_server") { using namespace std::placeholders; action_server_ = rclcpp_action::create_server<GripperAction>( this, "gripper_action", std::bind(&GripperActionServer::handle_goal, this, _1, _2), std::bind(&GripperActionServer::handle_cancel, this, _1), std::bind(&GripperActionServer::handle_accepted, this, _1)); } private: rclcpp_action::Server<GripperAction>::SharedPtr action_server_; rclcpp_action::GoalResponse handle_goal( const rclcpp_action::GoalUUID & uuid, std::shared_ptr<const GripperAction::Goal> goal) { (void)uuid; RCLCPP_INFO(get_logger(), "Received goal position: (%.2f, %.2f, %.2f)", goal->target_position.x, goal->target_position.y, goal->target_position.z); return rclcpp_action::GoalResponse::ACCEPT_AND_EXECUTE; } rclcpp_action::CancelResponse handle_cancel( const std::shared_ptr<rclcpp_action::ServerGoalHandle<GripperAction>> goal_handle) { RCLCPP_INFO(get_logger(), "Received cancel request"); (void)goal_handle; return rclcpp_action::CancelResponse::ACCEPT; } void handle_accepted(const std::shared_ptr<rclcpp_action::ServerGoalHandle<GripperAction>> goal_handle) { std::thread{std::bind(&GripperActionServer::execute, this, _1), goal_handle}.detach(); } void execute(const std::shared_ptr<rclcpp_action::ServerGoalHandle<GripperAction>> goal_handle) { auto feedback = std::make_shared<GripperAction::Feedback>(); auto result = std::make_shared<GripperAction::Result>(); // 第一阶段:目标检测 feedback->percent_complete = 0.0f; feedback->current_stage = "Detecting object"; goal_handle->publish_feedback(feedback); std::this_thread::sleep_for(std::chrono::seconds(2)); // 检查是否被取消 if (goal_handle->is_canceling()) { result->success = false; result->message = "Action canceled during detection"; goal_handle->canceled(result); return; } // 第二阶段:路径规划(更新到30%) feedback->percent_complete = 30.0f; feedback->current_stage = "Planning path"; goal_handle->publish_feedback(feedback); std::this_thread::sleep_for(std::chrono::seconds(3)); // 最终执行 feedback->percent_complete = 100.0f; feedback->current_stage = "Grasping complete"; goal_handle->publish_feedback(feedback); result->success = true; result->message = "Object successfully grasped"; goal_handle->succeed(result); } }; int main(int argc, char ** argv) { rclcpp::init(argc, argv); auto server = std::make_shared<GripperActionServer>(); rclcpp::spin(server); rclcpp::shutdown(); return 0; }

对应的客户端实现需要注意线程安全,以下是带进度显示的关键代码:

// gripper_action_client.cpp #include <rclcpp/rclcpp.hpp> #include <rclcpp_action/rclcpp_action.hpp> #include <custom_interfaces/action/gripper_action.hpp> #include <geometry_msgs/msg/point.hpp> using GripperAction = custom_interfaces::action::GripperAction; using GoalHandle = rclcpp_action::ClientGoalHandle<GripperAction>; class GripperActionClient : public rclcpp::Node { public: GripperActionClient() : Node("gripper_action_client") { client_ = rclcpp_action::create_client<GripperAction>(this, "gripper_action"); } void send_goal() { if (!client_->wait_for_action_server(std::chrono::seconds(5))) { RCLCPP_ERROR(get_logger(), "Action server not available"); return; } auto goal = GripperAction::Goal(); goal.target_position.x = 0.5; goal.target_position.y = 0.2; goal.target_position.z = 0.1; auto send_goal_options = rclcpp_action::Client<GripperAction>::SendGoalOptions(); send_goal_options.feedback_callback = [this](GoalHandle::SharedPtr, const std::shared_ptr<const GripperAction::Feedback> feedback) { print_progress(feedback->percent_complete, feedback->current_stage); }; client_->async_send_goal(goal, send_goal_options); } private: rclcpp_action::Client<GripperAction>::SharedPtr client_; void print_progress(float percent, const std::string & stage) { int bar_width = 50; int pos = static_cast<int>(bar_width * percent / 100.0); std::cout << "\r["; for (int i = 0; i < bar_width; ++i) { if (i < pos) std::cout << "="; else if (i == pos) std::cout << ">"; else std::cout << " "; } std::cout << "] " << static_cast<int>(percent) << "% - " << stage; std::cout.flush(); } }; int main(int argc, char ** argv) { rclcpp::init(argc, argv); auto client = std::make_shared<GripperActionClient>(); client->send_goal(); rclcpp::spin(client); rclcpp::shutdown(); return 0; }

4. 高级技巧与最佳实践

在实际部署中,有几个关键点需要注意:

反馈频率优化

  • 黄金频率:反馈间隔建议在100-500ms之间
  • 事件驱动更新:关键状态变化时立即发送反馈
  • 带宽考虑:反馈消息应保持精简
# 优化后的反馈发布示例 last_update = time.time() while executing: current_time = time.time() if current_time - last_update > 0.2 or state_changed: # 200ms或状态变化时 publish_feedback() last_update = current_time

错误处理与恢复

建议的错误处理流程:

  1. 检测异常(如运动规划失败)
  2. 更新反馈状态(设置错误信息)
  3. 尝试恢复(如重试或调整参数)
  4. 最终决定(继续或中止任务)

多任务协调

当需要协调多个Action时(如移动底盘+机械臂操作),可以使用以下模式:

// 并行执行多个Action的示例 auto future1 = client1->async_send_goal(goal1); auto future2 = client2->async_send_goal(goal2); while (!future1.is_ready() || !future2.is_ready()) { if (future1.is_ready() && future1.get().result->success) { // 处理第一个任务完成 } // 类似处理第二个任务 }

性能对比数据

以下是Python和C++实现的性能对比(测试环境:ROS2 Humble,Ubuntu 22.04):

指标Python实现C++实现
启动时间(ms)12045
反馈延迟(ms)15-302-5
CPU占用(%)8-123-5
内存占用(MB)8532

对于大多数应用场景,Python版本已经足够高效。但在以下情况建议选择C++:

  • 需要处理高频率控制(>100Hz)
  • 运行在资源受限的硬件上
  • 需要与其他C++库深度集成
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/1 19:51:17

Verilog-A学习资料:SAR ADC与模拟/混合信号IC设计的现成器件代码大全

Verilog-A 学习资料 SAR ADC 模数转换器 混合信号IC设计 模拟IC设计 包含现成常用的Verilog-A器件代码&#xff0c;可以直接拿来用 Verilog-A 一种使用 Verilog 的语法来描述模拟电路的行为搞电路设计的兄弟应该都懂&#xff0c;Verilog-A这玩意儿就像模拟工程师的瑞士军刀。特…

作者头像 李华
网站建设 2026/5/21 16:20:10

工程伦理案例分析:从经典失败项目看责任分配与风险预防

工程伦理案例分析&#xff1a;从经典失败项目看责任分配与风险预防 当一座桥梁在通车典礼上轰然倒塌&#xff0c;当一栋新建大楼在台风中支离破碎&#xff0c;这些触目惊心的工程事故背后&#xff0c;往往隐藏着复杂的伦理困境。工程伦理不是简单的对错判断题&#xff0c;而是需…

作者头像 李华
网站建设 2026/5/21 16:20:09

Agent面试题

1 什么是 AI Agent&#xff1f;和普通大模型应用的区别&#xff1f; 最直白定义 AI Agent 能自主思考、自主规划、自主执行、自主纠错的“AI 智能体” 它不是一问一答&#xff0c;而是有目标、有记忆、有步骤、会复盘。普通大模型应用&#xff08;你平时用的&#xff09; 一问…

作者头像 李华
网站建设 2026/4/1 19:50:11

像素时装锻造坊商业应用:电商像素风商品图批量生成实战教程

像素时装锻造坊商业应用&#xff1a;电商像素风商品图批量生成实战教程 1. 项目介绍与核心价值 像素时装锻造坊&#xff08;Pixel Fashion Atelier&#xff09;是一款专为电商场景设计的像素风格图像生成工具&#xff0c;基于Stable Diffusion与Anything-v5模型构建。不同于传…

作者头像 李华
网站建设 2026/4/1 19:47:46

TensorRT-LLM 安装使用笔记

目录 2026.04.01 linux 安装成功&#xff1b; docker版&#xff1b; docker 验证&#xff1a; 2026.04.01 linux 安装成功&#xff1b; pip install tensorrt-llm 上面的安装完&#xff0c;自己安装了torch2.9 匹配版本安装&#xff0c;待测试结果&#xff1a; pip insta…

作者头像 李华
网站建设 2026/4/1 19:47:15

Qwen3.5-9B成本优化:Spot实例+自动休眠+低峰期资源释放策略

Qwen3.5-9B成本优化&#xff1a;Spot实例自动休眠低峰期资源释放策略 1. 项目概述 Qwen3.5-9B是一款拥有90亿参数的开源大语言模型&#xff0c;具备以下核心能力&#xff1a; 强逻辑推理&#xff1a;能够处理复杂的逻辑推理任务代码生成&#xff1a;支持多种编程语言的代码生…

作者头像 李华