欢迎访问宙启技术站
智能推送

使用Python编写自定义的SimpleActionServer()实例

发布时间:2023-12-16 06:06:00

SimpleActionServer是一个ROS封装库,用于创建和管理一个简单的Action服务器。一个Action服务器是一个ROS节点,它接收来自Action客户端发送的目标,并根据目标执行一些任务,然后向客户端发送结果。

下面是一个使用Python编写的自定义SimpleActionServer实例的示例代码:

import rospy
import actionlib
from my_custom_msgs.msg import MyAction, MyActionFeedback, MyActionResult

class MyActionServer(object):
    # 初始化Action服务器
    def __init__(self):
        # 创建一个Action服务器对象,指定消息类型为MyAction
        self.server = actionlib.SimpleActionServer('my_action', MyAction, self.execute, False)
        self.server.start()

    # 执行任务的回调函数
    def execute(self, goal):
        # 创建一个反馈对象
        feedback = MyActionFeedback()
        # 创建一个结果对象
        result = MyActionResult()

        # 执行任务逻辑,这里以延时2秒为例
        rospy.sleep(2)
        
        if self.server.is_preempt_requested():
            # 如果接收到了取消请求,停止当前任务并发送一个结果
            self.server.set_preempted(result)
        else:
            # 如果任务顺利完成,发送一个结果
            self.server.set_succeeded(result)

if __name__ == '__main__':
    # 初始化ROS节点
    rospy.init_node('my_action_server_node')

    # 创建自定义Action服务器对象
    action_server = MyActionServer()

    # 进入循环,等待回调函数执行
    rospy.spin()

在这个例子中,我们首先导入了必要的ROS和actionlib库,然后导入了自定义的消息类型MyAction,MyActionFeedback和MyActionResult。

接下来,我们创建了一个名为MyActionServer的类,它继承自object类,并包含了初始化函数__init__()和任务执行回调函数execute()。

在初始化函数__init__()中,我们创建了一个Action服务器对象self.server,并指定消息类型为MyAction。然后,我们调用self.server.start()来启动Action服务器。

在任务执行回调函数execute()中,我们首先创建了一个反馈对象feedback和一个结果对象result。

然后,我们执行了一个任务逻辑,这里只是使用rospy.sleep()来模拟一个延时2秒的任务。

接下来,我们使用self.server.is_preempt_requested()来检查是否接收到了取消请求。如果接收到了取消请求,我们调用self.server.set_preempted()来停止当前任务并发送一个结果;如果没有接收到取消请求,我们调用self.server.set_succeeded()方法来发送一个成功的结果。

在主函数中,我们初始化了ROS节点,创建了一个名为my_action_server_node的节点,并实例化了MyActionServer对象。然后,我们进入一个循环,等待回调函数的执行。

这是一个简单的自定义SimpleActionServer实例的示例。你可以根据你的具体需求修改任务逻辑和自定义消息类型,并使用rospy.spin()来等待回调函数的执行。