使用SimpleActionServer()在Python中实现简单的动作服务器
在ROS中,动作服务器(Action Server)是一种用于执行长时间任务的机制。它允许客户端发送目标(Goal)给服务器,并以异步的方式来获得执行结果(Result)。动作服务器通常用于执行耗时的任务,例如机器人的导航或物体的抓取。
在Python中,可以使用SimpleActionServer类来实现简单的动作服务器。SimpleActionServer是ROS提供的一个封装了底层动作服务器机制的实现。
下面是一个使用SimpleActionServer的简单示例:
import rospy
import actionlib
from actionlib_msgs.msg import *
from std_msgs.msg import Empty
from my_package.msg import MyAction, MyActionFeedback, MyActionResult
class SimpleActionServerExample:
def __init__(self):
self.server = actionlib.SimpleActionServer('my_action', MyAction, self.execute, False)
self.server.start()
self.rate = rospy.Rate(1)
self.counter = 0
def execute(self, goal):
self.counter = 0
# 发送反馈到客户端
feedback = MyActionFeedback()
feedback.current_progress = self.counter
self.server.publish_feedback(feedback)
# 执行任务
while self.counter < goal.target:
self.counter += 1
# 检查动作服务器是否被取消
if self.server.is_preempt_requested():
rospy.loginfo('Action preempted')
self.server.set_preempted()
return
# 更新反馈
feedback.current_progress = self.counter
self.server.publish_feedback(feedback)
self.rate.sleep()
# 发送结果到客户端
result = MyActionResult()
result.final_result = self.counter
self.server.set_succeeded(result)
rospy.init_node('simple_action_server_example')
server = SimpleActionServerExample()
rospy.spin()
以上代码中,我们定义了一个SimpleActionServerExample类,它初始化了一个名为my_action的动作服务器,并指定了使用MyAction消息类型。在execute方法中,我们实现了具体的任务逻辑。在任务执行过程中,我们发送了反馈给客户端,并利用ros::Rate来控制循环频率。当任务完成时,我们发送了结果给客户端。
要运行这个例子,你需要首先定义一个名为MyAction的消息类型(在my_package.msg包中),包含一个int32类型的字段target。你还需要在MyActionFeedback和MyActionResult消息类型中定义相应的反馈和结果字段。
接下来,你可以启动这个动作服务器,并通过SimpleActionClient来发送目标给它。以下是一个示例代码:
import rospy
import actionlib
from my_package.msg import MyAction, MyGoal, MyFeedback, MyResult
rospy.init_node('simple_action_client_example')
client = actionlib.SimpleActionClient('my_action', MyAction)
client.wait_for_server()
goal = MyGoal()
goal.target = 10
client.send_goal(goal)
while not client.wait_for_result(rospy.Duration(1.0)):
feedback = client.get_feedback()
rospy.loginfo('Current progress: %d' % feedback.current_progress)
result = client.get_result()
rospy.loginfo('Final result: %d' % result.final_result)
在上述代码中,我们首先初始化了一个名为my_action的动作客户端,并指定了使用MyAction消息类型。通过client.send_goal()方法,我们发送了一个目标给服务器。然后,在一个循环中,我们通过client.wait_for_result()等待任务执行完成,并在每次循环中获取反馈信息并打印出来。任务完成后,我们通过client.get_result()获取结果并打印出来。
此处的MyAction消息类型需要与动作服务器中的相应消息类型定义保持一致。
以上是使用SimpleActionServer在Python中实现简单的动作服务器的示例。通过这个例子,你可以了解动作服务器的基本用法,以及如何与动作客户端进行交互。
