欢迎访问宙启技术站
智能推送

使用Python的actionlibSimpleActionServer()处理动作响应的简单示例

发布时间:2023-12-31 17:37:03

在使用Python中的actionlibSimpleActionServer()处理动作响应时,首先需要明确的是action是基于状态机的方式来描述任务的。在Python中,actionlib库提供了用于实现简单动作服务器的功能。

在下面的示例中,我们将使用actionlib库来创建一个简单的动作服务器,该服务器被称为"demo_server"。我们将在服务器上实现一个简单的动作,即计算给定数值的平方。

首先,我们需要导入所需的库和消息类型:

#! /usr/bin/env python
import rospy
import actionlib

from demo_action_server.msg import DemoAction, DemoFeedback, DemoResult

接下来,我们需要实现动作服务器的回调函数。回调函数会在开始执行动作时被调用,并接收来自客户端的目标。在我们的示例中,回调函数将计算目标值的平方,并在每次计算完成后发布反馈。

def action_callback(goal):
    # 创建反馈对象
    feedback = DemoFeedback()
    # 获取客户端的目标数值
    target_number = goal.number
    # 计算平方并发布反馈
    for i in range(target_number):
        feedback.progress = i+1
        # 发布反馈
        server.publish_feedback(feedback)
        # 模拟实际计算过程
        rospy.sleep(1.0)
    # 创建结果对象
    result = DemoResult()
    # 设置结果数值为平方值
    result.square = target_number * target_number
    # 完成动作并发布结果
    server.set_succeeded(result)

现在我们需要创建一个ROS节点,在节点中实例化DemoAction服务器,并将回调函数绑定到服务器上。

if __name__ == '__main__':
    rospy.init_node('demo_server')
    # 创建动作服务器
    server = actionlib.SimpleActionServer('demo_action', DemoAction, callback=action_callback, auto_start=False)
    # 开始服务器
    server.start()
    rospy.spin()

在上面的代码中,auto_start=False表示我们希望手动启动服务器。然后我们调用server.start()来启动服务器。

最后,我们还需要创建一个msg文件夹,并在其中创建一个名为"Demo.action"的动作文件。动作文件定义了动作的目标和结果,它应该包含以下内容:

float64 number           # 目标数值
---
float64 square           # 结果数值

将上述代码保存为demo_server.py,并将msg文件夹中的"Demo.action"文件放在正确的位置。

最后,我们可以通过以下步骤运行服务器:

1. 首先,在终端上运行ROS主节点:roscore

2. 在另一个终端上,使用ROS节点运行动作服务器:rosrun demo_action_server demo_server.py

3. 在另一个终端上,使用ROS节点运行动作客户端:rosrun demo_action_server demo_client.py

动作客户端可使用以下Python代码编写:

#! /usr/bin/env python
import rospy
import actionlib

from demo_action_server.msg import DemoAction, DemoGoal

if __name__ == '__main__':
    rospy.init_node('demo_client')
    # 创建动作客户端
    client = actionlib.SimpleActionClient('demo_action', DemoAction)
    # 等待服务器启动
    client.wait_for_server()
    # 创建目标对象
    goal = DemoGoal()
    # 设置目标数值为5
    goal.number = 5
    # 发送目标并等待完成
    client.send_goal(goal)
    client.wait_for_result()
    # 输出结果
    print("Square of 5 is:", client.get_result().square)

在上面的代码中,我们首先创建一个动作客户端,然后使用wait_for_server()方法等待服务器启动。接下来,我们创建了一个目标对象,设置目标数值为5,并使用send_goal()方法发送目标。最后,我们使用wait_for_result()方法等待结果,并使用get_result()方法获取结果。

通过上述步骤,我们可以在动作服务器和动作客户端之间建立起通信,并实现所需的动作功能。