欢迎访问宙启技术站
智能推送

使用Python的actionlibSimpleActionServer()处理动作预定和取消预订

发布时间:2023-12-31 17:41:47

使用Python的actionlib SimpleActionServer()处理动作预订和取消预订的示例:

import rospy
import actionlib

from my_action_pkg.msg import MyAction, MyActionFeedback, MyActionResult

class MyActionServer:
    def __init__(self):
        self.server = actionlib.SimpleActionServer('my_action', MyAction, execute_cb=self.execute_callback, auto_start=False)
        self.server.register_preempt_callback(self.preempt_callback)
        self.server.start()

    def execute_callback(self, goal):
        # 处理动作预订
        # 在这里编写处理预订逻辑的代码
        rospy.loginfo('Action goal received: %s', goal)

        feedback = MyActionFeedback()
        feedback.status = 'Processing action...'

        # 发布反馈信息
        self.server.publish_feedback(feedback)

        # 进行动作处理
        for i in range(1, goal.num_steps+1):
            # 检查是否有取消预订请求
            if self.server.is_preempt_requested():
                # 响应取消预订请求
                rospy.loginfo('Action preempted.')
                self.server.set_preempted()
                return

            # 执行动作步骤
            rospy.sleep(1)  # 模拟动作执行时间

            # 更新反馈信息
            feedback.status = 'Executing step {} of {}'.format(i, goal.num_steps)
            self.server.publish_feedback(feedback)

        # 完成动作
        result = MyActionResult()
        result.status = 'Action completed successfully.'
        rospy.loginfo('Action completed.')

        # 发布结果
        self.server.set_succeeded(result)

    def preempt_callback(self):
        # 处理取消预订请求
        rospy.loginfo('Action preempt requested.')

        # 在这里编写处理取消预订请求的代码
        # 可以在这里停止正在进行的动作

        self.server.set_preempted()

if __name__ == '__main__':
    rospy.init_node('my_action_server')
    server = MyActionServer()
    rospy.spin()

上面的示例代码实现了一个名为my_action的动作服务器。服务器在接收到动作预订时,它会执行一系列动作步骤,并且可以处理取消预订请求。

首先,我们需要创建一个MyActionServer类,它初始化一个actionlib.SimpleActionServer对象,并注册一个执行回调函数execute_callback()。该回调函数在接收到动作预订时被调用。

execute_callback()函数中,我们可以编写处理预订逻辑的代码。首先,我们打印接收到的动作预订信息,并创建一个反馈对象feedback。然后,我们调用self.server.publish_feedback(feedback)来发布反馈信息。

接下来,我们使用一个循环来模拟执行动作步骤。在每一步中,我们首先检查是否有取消预订请求。若有,则我们调用self.server.set_preempted()来设置取消预订状态,并返回。否则,我们等待一秒钟(这里使用rospy.sleep(1)模拟动作执行时间),然后更新反馈信息并发布。

当所有的动作步骤都执行完毕后,我们创建一个结果对象result,并设置其状态信息为“动作成功完成”。然后,我们调用self.server.set_succeeded(result)来发布结果。

当接收到取消预订请求时,会调用preempt_callback()函数。在该函数中,我们可以编写处理取消预订请求的代码。可以在这里停止正在进行的动作,并调用self.server.set_preempted()来设置取消预订状态。

最后,在if __name__ == '__main__'中,我们初始化ROS节点和MyActionServer对象,并调用rospy.spin()来保持节点的运行。

请注意,上述示例代码中的my_actionMyActionMyActionFeedbackMyActionResult都是自定义消息类型,你需要根据自己的需求进行相应的更改和定义。