欢迎访问宙启技术站
智能推送

Python中actionlibSimpleActionServer()的简介

发布时间:2023-12-16 06:04:14

actionlib.SimpleActionServer是一个用于创建并提供一个简单的Action服务器的Python类。它允许用户实现自定义的Action服务器,以响应Action客户端的请求。

Action是一种在异步通信中执行长时间运行任务的机制。Action服务器接收Action客户端的请求,然后根据请求的内容执行相应的任务。任务可以是在后台执行的任何操作,例如移动机器人、执行传感器数据处理、规划路径等。

下面是一个使用actionlib.SimpleActionServer的简单示例:

import rospy
import actionlib
from my_action_package.msg import MyAction  # 导入自定义的Action消息类型
from my_action_package.msg import MyResult
from my_action_package.msg import MyFeedback

# Action执行函数
def execute(goal):
    # 创建一个结果对象
    result = MyResult()

    # 发布一个反馈消息
    feedback = MyFeedback()
    feedback.progress = 0

    # 执行任务
    for i in range(1, goal.target_number + 1):
        if server.is_preempt_requested():  # 检查是否有取消请求
            # 如果有取消请求,执行相应操作(例如停止机器人的移动)
            server.set_preempted()
            return

        feedback.progress = int((float(i) / goal.target_number) * 100)  # 计算任务进度

        # 发布反馈消息
        server.publish_feedback(feedback)

        # 执行任务

        # 模拟延迟
        rospy.sleep(1)

    # 当任务完成时,设置结果状态以及返回结果对象
    result.success = True
    server.set_succeeded(result)

# 创建一个节点
rospy.init_node('my_action_server')

# 创建一个Action服务器
server = actionlib.SimpleActionServer('my_action', MyAction, execute, False)

# 开启服务器
server.start()

# 进入循环,保持服务器的运行
rospy.spin()

在上面的例子中,首先导入了必要的模块和消息类型。然后定义了一个execute函数来执行Action的任务。在这个例子中,任务是循环计数到目标数字,并在每次循环中发布一个反馈消息。当任务完成时,设置一个结果状态,并调用服务器的set_succeeded()函数。

然后创建了一个节点,并创建了一个Action服务器实例,指定了服务器的名称、Action消息类型、执行函数以及是否自动开启服务器。然后通过调用start()函数来启动服务器。

最后调用rospy.spin()函数来进入一个无限循环,使得服务器保持运行。

运行以上代码后,服务器将等待来自Action客户端的请求,并执行相应的任务。在任务的执行过程中,服务器会定期发布反馈消息,以便客户端可以了解到任务的进展情况。当任务完成后,将返回一个结果状态给客户端。

这只是一个简单的示例,实际上可以根据具体的需求自定义Action服务器的行为。可以根据实际情况修改执行函数和消息类型,以实现复杂的任务。