欢迎访问宙启技术站
智能推送

编写Python脚本使用actionlibSimpleActionServer()实现自定义动作服务器

发布时间:2023-12-31 17:42:54

Actionlib是ROS中用于实现客户端/服务器模式的库。Actionlib提供了一个SimpleActionServer类,用于实现自定义动作服务器。

下面是一个通过使用SimpleActionServer类实现一个自定义动作服务器的Python脚本的示例:

#!/usr/bin/env python

import rospy
import actionlib
from std_msgs.msg import Empty
from my_custom_msgs.msg import CustomAction, CustomActionGoal, CustomActionResult, CustomActionFeedback

class CustomActionServer:
    def __init__(self):
        # 创建动作服务器
        self.server = actionlib.SimpleActionServer('custom_action', CustomAction, execute_cb=self.execute_cb, auto_start=False)
        # 注册动作结果发布者和反馈发布者
        self.result_pub = self.server.get_publisher_for_result()
        self.feedback_pub = self.server.get_publisher_for_feedback()

        # 订阅命令消息
        self.command_sub = rospy.Subscriber('custom_action_command', Empty, self.command_callback)

        self.server.start()

    def command_callback(self, msg):
        # 创建反馈消息
        feedback_msg = CustomActionFeedback()
        feedback_msg.status = "Processing"
        # 在执行动作之前向客户端发送反馈
        self.feedback_pub.publish(feedback_msg)

        # 执行动作
        result = self.execute_action()

        # 创建结果消息
        result_msg = CustomActionResult()
        if result:
            result_msg.status = "Success"
        else:
            result_msg.status = "Failure"
        # 向客户端发送结果
        self.result_pub.publish(result_msg)

    def execute_cb(self, goal):
        # 创建反馈消息
        feedback_msg = CustomActionFeedback()
        feedback_msg.status = "Processing"
        # 在执行动作之前向客户端发送反馈
        self.feedback_pub.publish(feedback_msg)

        # 执行动作
        result = self.execute_action()

        # 创建结果消息
        result_msg = CustomActionResult()
        if result:
            result_msg.status = "Success"
        else:
            result_msg.status = "Failure"
        # 向客户端发送结果
        self.result_pub.publish(result_msg)

    def execute_action(self):
        # 在这里编写执行动作的逻辑
        # 这里仅作为示例,休眠5秒钟并返回成功
        rospy.sleep(5)
        return True

if __name__ == '__main__':
    rospy.init_node('custom_action_server')
    server = CustomActionServer()
    rospy.spin()

在这个示例中,我们创建了一个名为custom_action的自定义动作服务器,使用CustomAction消息类型。

我们还创建了一个名为custom_action_command的Empty消息类型的订阅器,用于接收来自客户端的命令。

在启动动作服务器之后,当接收到来自客户端的命令时,会调用命令回调(command_callback)函数。

在command_callback函数中,我们首先向客户端发送一个反馈消息,并执行自定义的动作(execute_action函数)。

然后,根据动作执行的结果,创建相应的结果消息,并向客户端发送。

注意,在这个示例中,我们使用execute_cb函数作为默认的执行回调函数,在command_callback函数中调用。

这样可以在代码中的任何地方调用execute_action函数执行动作,而不仅仅是在回调函数中。

这是一个简单的示例,演示了如何使用actionlib的SimpleActionServer类来实现自定义动作服务器。

有了这个示例,你可以根据自己的需求扩展和调整代码。