目标:用 Python 实现一个动作服务器和客户端。
教程级别:中级
时间:15 分钟
目录
背景
先决条件
任务
1. 编写动作服务器
2. 编写动作客户端
摘要
相关内容
背景
动作是 ROS 2 中异步通信的一种形式。动作客户端向动作服务器发送目标请求。动作服务器向动作客户端发送目标反馈和结果。
先决条件
您将需要 custom_action_interfaces
包和在上一教程中定义的 Fibonacci.action
接口,创建一个操作。
任务
1. 编写动作服务器
让我们专注于编写一个动作服务器,使用我们在创建动作教程中创建的动作来计算斐波那契序列。
直到现在,您已经创建了包并使用 ros2 run
来运行您的节点。然而,为了在本教程中保持简单,我们将把动作服务器限定在一个文件中。如果您想看看完整的动作教程包是什么样子,请查看 action_tutorials https://github.com/ros2/demos/tree/jazzy/action_tutorials 。
在您的home目录中打开一个新文件,我们称它为 fibonacci_action_server.py
,并添加以下代码:
import rclpy # 导入ROS2的Python库 from rclpy.action import ActionServer # 从rclpy.action模块导入ActionServer类 from rclpy.node import Node # 从rclpy.node模块导入Node类 from custom_action_interfaces.action import Fibonacci # 导入自定义的Fibonacci动作接口 class FibonacciActionServer(Node): # 定义一个名为FibonacciActionServer的类,该类继承自Node类 def __init__(self): # 类的初始化函数 super().__init__('fibonacci_action_server') # 调用父类的初始化函数,创建一个名为'fibonacci_action_server'的节点 self._action_server = ActionServer( # 创建一个动作服务器 self, # 传入当前节点实例 Fibonacci, # 指定动作接口为Fibonacci 'fibonacci', # 动作的名字为'fibonacci' self.execute_callback) # 当收到动作目标时,调用的回调函数为execute_callback def execute_callback(self, goal_handle): # 定义动作执行的回调函数 self.get_logger().info('Executing goal...') # 打印日志信息 result = Fibonacci.Result() # 创建一个Fibonacci动作的结果实例 return result # 返回结果实例 def main(args=None): # 定义主函数 rclpy.init(args=args) # 初始化ROS2的Python库 fibonacci_action_server = FibonacciActionServer() # 创建一个FibonacciActionServer的实例 rclpy.spin(fibonacci_action_server) # 使fibonacci_action_server节点保持活动状态,直到节点被显式关闭或按下Ctrl+C if __name__ == '__main__': # 如果当前脚本被直接运行,而不是被导入为模块运行 main() # 调用主函数
第 8 行定义了一个类 FibonacciActionServer
,它是 Node
的子类。该类通过调用 Node
构造函数来初始化,将我们的节点命名为 fibonacci_action_server
:
super().__init__('fibonacci_action_server')
在构造函数中,我们还实例化了一个新的动作服务器:
self._action_server = ActionServer( self, Fibonacci, 'fibonacci', self.execute_callback)
一个动作服务器需要四个参数:
一个 ROS 2 节点用于添加动作客户端到:
self
。动作类型:
Fibonacci
(在第 5 行导入)。动作名称:
'fibonacci'
。用于执行已接受目标的回调函数:
self.execute_callback
。此回调函数必须为动作类型返回一个结果消息。
我们还在我们的类中定义了一个 execute_callback
方法:
def execute_callback(self, goal_handle): self.get_logger().info('Executing goal...') result = Fibonacci.Result() return result
这是一旦目标被接受后将被调用来执行的方法。
让我们尝试运行我们的动作服务器:
python3 fibonacci_action_server.py
在另一个终端,我们可以使用命令行界面来发送一个目标:
ros2 action send_goal fibonacci custom_action_interfaces/action/Fibonacci "{order: 5}"
在运行动作服务器的终端中,您应该会看到一条记录消息“正在执行目标...”,随后是一个警告,提示目标状态未设置。默认情况下,如果在执行回调中未设置目标句柄状态,它会假定为中止状态。
我们可以在目标句柄上调用 succeed()
来表示目标已成功实现:
def execute_callback(self, goal_handle): self.get_logger().info('Executing goal...') goal_handle.succeed() result = Fibonacci.Result() return result
如果您现在重启动作服务器并发送另一个目标,您应该会看到目标以状态 SUCCEEDED
完成。
现在,让我们的目标执行实际计算并返回请求的斐波那契序列:
def execute_callback(self, goal_handle): self.get_logger().info('Executing goal...') sequence = [0, 1] for i in range(1, goal_handle.request.order): sequence.append(sequence[i] + sequence[i-1]) goal_handle.succeed() result = Fibonacci.Result() result.sequence = sequence return result
计算序列后,我们在返回之前将其分配给结果消息字段。
再次重启动作服务器并发送另一个目标。你应该会看到目标以正确的结果序列结束。
1.2 发布反馈
动作的一个好处是在目标执行期间能够向动作客户端提供反馈。我们可以通过调用目标句柄的 publish_feedback()
方法,使我们的动作服务器为动作客户端发布反馈。
我们将替换 sequence
变量,并使用反馈消息来存储序列。在 for 循环中每次更新反馈消息后,我们发布反馈消息并暂停以产生戏剧性效果:
import time # 导入Python的时间库 import rclpy # 导入ROS2的Python库 from rclpy.action import ActionServer # 从rclpy.action模块导入ActionServer类 from rclpy.node import Node # 从rclpy.node模块导入Node类 from custom_action_interfaces.action import Fibonacci # 导入自定义的Fibonacci动作接口 class FibonacciActionServer(Node): # 定义一个名为FibonacciActionServer的类,该类继承自Node类 def __init__(self): # 类的初始化函数 super().__init__('fibonacci_action_server') # 调用父类的初始化函数,创建一个名为'fibonacci_action_server'的节点 self._action_server = ActionServer( # 创建一个动作服务器 self, # 传入当前节点实例 Fibonacci, # 指定动作接口为Fibonacci 'fibonacci', # 动作的名字为'fibonacci' self.execute_callback) # 当收到动作目标时,调用的回调函数为execute_callback def execute_callback(self, goal_handle): # 定义动作执行的回调函数 self.get_logger().info('Executing goal...') # 打印日志信息 feedback_msg = Fibonacci.Feedback() # 创建一个Fibonacci动作的反馈信息实例 feedback_msg.partial_sequence = [0, 1] # 初始化反馈信息的部分序列为[0, 1] for i in range(1, goal_handle.request.order): # 对于目标请求的序列长度进行循环 feedback_msg.partial_sequence.append( # 在部分序列后面添加新的元素 feedback_msg.partial_sequence[i] + feedback_msg.partial_sequence[i-1]) # 新的元素为部分序列的最后两个元素之和 self.get_logger().info('Feedback: {0}'.format(feedback_msg.partial_sequence)) # 打印反馈信息的部分序列 goal_handle.publish_feedback(feedback_msg) # 发布反馈信息 time.sleep(1) # 暂停1秒 goal_handle.succeed() # 动作目标执行成功 result = Fibonacci.Result() # 创建一个Fibonacci动作的结果实例 result.sequence = feedback_msg.partial_sequence # 将反馈信息的部分序列赋值给结果的序列 return result # 返回结果实例 def main(args=None): # 定义主函数 rclpy.init(args=args) # 初始化ROS2的Python库 fibonacci_action_server = FibonacciActionServer() # 创建一个FibonacciActionServer的实例 rclpy.spin(fibonacci_action_server) # 使fibonacci_action_server节点保持活动状态,直到节点被显式关闭或按下Ctrl+C if __name__ == '__main__': # 如果当前脚本被直接运行,而不是被导入为模块运行 main() # 调用主函数
重启动作服务器后,我们可以通过使用带有 --feedback
选项的命令行工具来确认现在已发布反馈:
ros2 action send_goal --feedback fibonacci custom_action_interfaces/action/Fibonacci "{order: 5}"
2. 编写动作客户端
我们还将将动作客户端限定在单个文件中。打开一个新文件,我们称之为 fibonacci_action_client.py
,并添加以下样板代码:
import rclpy # 导入ROS2的Python库 from rclpy.action import ActionClient # 从rclpy.action模块导入ActionClient类 from rclpy.node import Node # 从rclpy.node模块导入Node类 from custom_action_interfaces.action import Fibonacci # 导入自定义的Fibonacci动作接口 class FibonacciActionClient(Node): # 定义一个名为FibonacciActionClient的类,该类继承自Node类 def __init__(self): # 类的初始化函数 super().__init__('fibonacci_action_client') # 调用父类的初始化函数,创建一个名为'fibonacci_action_client'的节点 self._action_client = ActionClient(self, Fibonacci, 'fibonacci') # 创建一个动作客户端,动作接口为Fibonacci,动作的名字为'fibonacci' def send_goal(self, order): # 定义发送动作目标的函数 goal_msg = Fibonacci.Goal() # 创建一个Fibonacci动作的目标实例 goal_msg.order = order # 设置目标的序列长度为函数的参数 self._action_client.wait_for_server() # 等待动作服务器 return self._action_client.send_goal_async(goal_msg) # 异步发送动作目标,并返回结果的Future对象 def main(args=None): # 定义主函数 rclpy.init(args=args) # 初始化ROS2的Python库 action_client = FibonacciActionClient() # 创建一个FibonacciActionClient的实例 future = action_client.send_goal(10) # 发送动作目标,序列长度为10,返回结果的Future对象 rclpy.spin_until_future_complete(action_client, future) # 使action_client节点保持活动状态,直到Future对象完成 if __name__ == '__main__': # 如果当前脚本被直接运行,而不是被导入为模块运行 main() # 调用主函数
我们定义了一个类 FibonacciActionClient
,它是 Node
的子类。该类通过调用 Node
构造函数来初始化,将我们的节点命名为 fibonacci_action_client
:
super().__init__('fibonacci_action_client')
在类构造函数中,我们还使用上一教程中的自定义动作定义创建了一个动作客户端:
self._action_client = ActionClient(self, Fibonacci, 'fibonacci')
我们通过传递三个参数来创建一个 ActionClient
一个 ROS 2 节点用于添加动作客户端到:
self
动作的类型:
Fibonacci
动作名称:
'fibonacci'
我们的动作客户端将能够与具有相同动作名称和类型的动作服务器进行通信。
我们还在 FibonacciActionClient
类中定义了一个方法 send_goal
:
def send_goal(self, order): goal_msg = Fibonacci.Goal() goal_msg.order = order self._action_client.wait_for_server() return self._action_client.send_goal_async(goal_msg)
此方法等待动作服务器可用,然后向服务器发送一个目标。它返回一个我们稍后可以等待的未来。
在类定义之后,我们定义了一个函数 main()
,它初始化 ROS 2 并创建我们的 FibonacciActionClient
节点的实例。然后它发送一个目标,并等待直到该目标完成。
最后,我们在 Python 程序的入口点调用 main()
。
让我们通过首先运行之前构建的动作服务器来测试我们的动作客户端:
python3 fibonacci_action_server.py
在另一个终端中,运行动作客户端:
python3 fibonacci_action_client.py
您应该会看到动作服务器在成功执行目标时打印的消息:
[INFO] [fibonacci_action_server]: Executing goal... [INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1]) [INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1, 2]) [INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1, 2, 3]) [INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1, 2, 3, 5]) # etc.
动作客户端应该启动,然后迅速完成。在这一点上,我们有一个功能正常的动作客户端,但我们看不到任何结果,也没有得到任何反馈。
2.1 获得结果
所以我们可以发送一个目标,但我们如何知道它何时完成?我们可以通过几个步骤获取结果信息。首先,我们需要为我们发送的目标获取一个目标句柄。然后,我们可以使用目标句柄来请求结果。
这是此示例的完整代码:
# 导入 ROS 客户端库 import rclpy # 导入 ROS 动作客户端 from rclpy.action import ActionClient # 导入 ROS 节点 from rclpy.node import Node # 导入自定义的 Fibonacci 动作 from custom_action_interfaces.action import Fibonacci # 定义 Fibonacci 动作客户端类,继承自 Node class FibonacciActionClient(Node): # 初始化函数 def __init__(self): # 调用父类初始化函数,设置节点名为 'fibonacci_action_client' super().__init__('fibonacci_action_client') # 创建动作客户端,动作类型为 Fibonacci,动作名为 'fibonacci' self._action_client = ActionClient(self, Fibonacci, 'fibonacci') # 发送目标函数 def send_goal(self, order): # 创建目标消息 goal_msg = Fibonacci.Goal() # 设置目标消息的 order 字段 goal_msg.order = order # 等待动作服务器 self._action_client.wait_for_server() # 异步发送目标,并获取 future 对象 self._send_goal_future = self._action_client.send_goal_async(goal_msg) # 为 future 对象添加完成回调 self._send_goal_future.add_done_callback(self.goal_response_callback) # 目标响应回调函数 def goal_response_callback(self, future): # 获取目标句柄 goal_handle = future.result() # 如果目标未被接受 if not goal_handle.accepted: # 记录信息:目标被拒绝 self.get_logger().info('目标被拒绝 :(') return # 记录信息:目标被接受 self.get_logger().info('目标被接受 :)') # 异步获取结果,并获取 future 对象 self._get_result_future = goal_handle.get_result_async() # 为 future 对象添加完成回调 self._get_result_future.add_done_callback(self.get_result_callback) # 获取结果回调函数 def get_result_callback(self, future): # 获取结果 result = future.result().result # 记录结果信息 self.get_logger().info('结果: {0}'.format(result.sequence)) # 关闭 ROS rclpy.shutdown() # 主函数 def main(args=None): # 初始化 ROS rclpy.init(args=args) # 创建 Fibonacci 动作客户端对象 action_client = FibonacciActionClient() # 发送目标,目标值为 10 action_client.send_goal(10) # 保持 ROS 运行,处理回调 rclpy.spin(action_client) # 如果当前脚本为主程序,则运行主函数 if __name__ == '__main__': main()
` ActionClient.send_goal_async()
`方法返回一个对目标句柄的未来。首先我们注册一个回调,以便在未来完成时使用:
self._send_goal_future.add_done_callback(self.goal_response_callback)
请注意,当动作服务器接受或拒绝目标请求时,未来就完成了。让我们更详细地看看 goal_response_callback
。我们可以检查目标是否被拒绝,并提前返回,因为我们知道不会有结果:
def goal_response_callback(self, future): goal_handle = future.result() if not goal_handle.accepted: self.get_logger().info('Goal rejected :(') return self.get_logger().info('Goal accepted :)')
现在我们有了一个目标句柄,我们可以使用它通过方法 get_result_async()
来请求结果。与发送目标类似,我们将获得一个在结果准备好时完成的未来。让我们注册一个回调,就像我们为目标响应所做的那样:
self._get_result_future = goal_handle.get_result_async() self._get_result_future.add_done_callback(self.get_result_callback)
在回调中,我们记录结果序列并关闭 ROS 2 以进行干净的退出:
def get_result_callback(self, future): result = future.result().result self.get_logger().info('Result: {0}'.format(result.sequence)) rclpy.shutdown()
在一个单独的终端中运行动作服务器后,继续尝试运行我们的斐波那契动作客户端!
python3 fibonacci_action_client.py
您应该会看到目标被接受和最终结果的记录消息。
2.2 获取反馈
我们的动作客户端可以发送目标。太好了!但如果我们能从动作服务器那里得到一些我们发送的目标的反馈就更好了。
这是此示例的完整代码:
# 导入 ROS 客户端库 import rclpy # 导入 ROS 动作客户端 from rclpy.action import ActionClient # 导入 ROS 节点 from rclpy.node import Node # 导入自定义的 Fibonacci 动作 from custom_action_interfaces.action import Fibonacci # 定义 Fibonacci 动作客户端类,继承自 Node class FibonacciActionClient(Node): # 初始化函数 def __init__(self): # 调用父类初始化函数,设置节点名为 'fibonacci_action_client' super().__init__('fibonacci_action_client') # 创建动作客户端,动作类型为 Fibonacci,动作名为 'fibonacci' self._action_client = ActionClient(self, Fibonacci, 'fibonacci') # 发送目标函数 def send_goal(self, order): # 创建目标消息 goal_msg = Fibonacci.Goal() # 设置目标消息的 order 字段 goal_msg.order = order # 等待动作服务器 self._action_client.wait_for_server() # 异步发送目标,并获取 future 对象,同时设置反馈回调函数 self._send_goal_future = self._action_client.send_goal_async(goal_msg, feedback_callback=self.feedback_callback) # 为 future 对象添加完成回调 self._send_goal_future.add_done_callback(self.goal_response_callback) # 目标响应回调函数 def goal_response_callback(self, future): # 获取目标句柄 goal_handle = future.result() # 如果目标未被接受 if not goal_handle.accepted: # 记录信息:目标被拒绝 self.get_logger().info('目标被拒绝 :(') return # 记录信息:目标被接受 self.get_logger().info('目标被接受 :)') # 异步获取结果,并获取 future 对象 self._get_result_future = goal_handle.get_result_async() # 为 future 对象添加完成回调 self._get_result_future.add_done_callback(self.get_result_callback) # 获取结果回调函数 def get_result_callback(self, future): # 获取结果 result = future.result().result # 记录结果信息 self.get_logger().info('结果: {0}'.format(result.sequence)) # 关闭 ROS rclpy.shutdown() # 反馈回调函数 def feedback_callback(self, feedback_msg): # 获取反馈 feedback = feedback_msg.feedback # 记录反馈信息 self.get_logger().info('收到反馈: {0}'.format(feedback.partial_sequence)) # 主函数 def main(args=None): # 初始化 ROS rclpy.init(args=args) # 创建 Fibonacci 动作客户端对象 action_client = FibonacciActionClient() # 发送目标,目标值为 10 action_client.send_goal(10) # 保持 ROS 运行,处理回调 rclpy.spin(action_client) # 如果当前脚本为主程序,则运行主函数 if __name__ == '__main__': main()
这是反馈消息的回调函数:
def feedback_callback(self, feedback_msg): feedback = feedback_msg.feedback self.get_logger().info('Received feedback: {0}'.format(feedback.partial_sequence))
在回调中,我们获取消息的反馈部分并将 partial_sequence
字段打印到屏幕上。
我们需要在动作客户端注册回调。这是通过在发送目标时额外传递回调给动作客户端来实现的:
self._send_goal_future = self._action_client.send_goal_async(goal_msg, feedback_callback=self.feedback_callback)
我们已经准备好了。如果我们运行我们的动作客户端,你应该会看到反馈被打印到屏幕上。
摘要
在本教程中,您将逐行组合一个 Python 动作服务器和动作客户端,并配置它们以交换目标、反馈和结果。
相关内容
在 Python 中编写动作服务器和客户端有几种方法;请查看 ros2/examples 仓库 https://github.com/ros2/examples/tree/jazzy/rclpy/actions 中的
minimal_action_server
和minimal_action_client
包。有关 ROS 操作的更详细信息,请参阅设计文章 https://design.ros2.org/articles/actions.html 。