欢迎访问宙启技术站
智能推送

使用SimpleActionServer()在Python中实现简单的动作服务器

发布时间:2023-12-19 05:15:57

在ROS中,动作服务器(Action Server)是一种用于执行长时间任务的机制。它允许客户端发送目标(Goal)给服务器,并以异步的方式来获得执行结果(Result)。动作服务器通常用于执行耗时的任务,例如机器人的导航或物体的抓取。

在Python中,可以使用SimpleActionServer类来实现简单的动作服务器。SimpleActionServer是ROS提供的一个封装了底层动作服务器机制的实现。

下面是一个使用SimpleActionServer的简单示例:

import rospy
import actionlib
from actionlib_msgs.msg import *
from std_msgs.msg import Empty
from my_package.msg import MyAction, MyActionFeedback, MyActionResult

class SimpleActionServerExample:
    def __init__(self):
        self.server = actionlib.SimpleActionServer('my_action', MyAction, self.execute, False)
        self.server.start()

        self.rate = rospy.Rate(1)
        self.counter = 0

    def execute(self, goal):
        self.counter = 0

        # 发送反馈到客户端
        feedback = MyActionFeedback()
        feedback.current_progress = self.counter
        self.server.publish_feedback(feedback)

        # 执行任务
        while self.counter < goal.target:
            self.counter += 1

            # 检查动作服务器是否被取消
            if self.server.is_preempt_requested():
                rospy.loginfo('Action preempted')
                self.server.set_preempted()
                return

            # 更新反馈
            feedback.current_progress = self.counter
            self.server.publish_feedback(feedback)

            self.rate.sleep()

        # 发送结果到客户端
        result = MyActionResult()
        result.final_result = self.counter
        self.server.set_succeeded(result)

rospy.init_node('simple_action_server_example')
server = SimpleActionServerExample()
rospy.spin()

以上代码中,我们定义了一个SimpleActionServerExample类,它初始化了一个名为my_action的动作服务器,并指定了使用MyAction消息类型。在execute方法中,我们实现了具体的任务逻辑。在任务执行过程中,我们发送了反馈给客户端,并利用ros::Rate来控制循环频率。当任务完成时,我们发送了结果给客户端。

要运行这个例子,你需要首先定义一个名为MyAction的消息类型(在my_package.msg包中),包含一个int32类型的字段target。你还需要在MyActionFeedbackMyActionResult消息类型中定义相应的反馈和结果字段。

接下来,你可以启动这个动作服务器,并通过SimpleActionClient来发送目标给它。以下是一个示例代码:

import rospy
import actionlib
from my_package.msg import MyAction, MyGoal, MyFeedback, MyResult

rospy.init_node('simple_action_client_example')
client = actionlib.SimpleActionClient('my_action', MyAction)
client.wait_for_server()

goal = MyGoal()
goal.target = 10

client.send_goal(goal)

while not client.wait_for_result(rospy.Duration(1.0)):
    feedback = client.get_feedback()
    rospy.loginfo('Current progress: %d' % feedback.current_progress)

result = client.get_result()
rospy.loginfo('Final result: %d' % result.final_result)

在上述代码中,我们首先初始化了一个名为my_action的动作客户端,并指定了使用MyAction消息类型。通过client.send_goal()方法,我们发送了一个目标给服务器。然后,在一个循环中,我们通过client.wait_for_result()等待任务执行完成,并在每次循环中获取反馈信息并打印出来。任务完成后,我们通过client.get_result()获取结果并打印出来。

此处的MyAction消息类型需要与动作服务器中的相应消息类型定义保持一致。

以上是使用SimpleActionServer在Python中实现简单的动作服务器的示例。通过这个例子,你可以了解动作服务器的基本用法,以及如何与动作客户端进行交互。