欢迎访问宙启技术站
智能推送

在Python中使用actionlibSimpleActionServer()发送和接收动作反馈

发布时间:2023-12-31 17:41:05

在Python中,我们可以使用actionlib.SimpleActionServer()类来创建一个简单的动作服务器,用于发送和接收动作反馈。

首先,我们需要导入actionlib模块和相关消息类型:

import actionlib
from actionlib.msg import TestAction, TestFeedback, TestResult

然后,我们可以创建一个动作服务器的回调函数,用于处理客户端发送的请求并提供反馈和结果:

def action_callback(goal):
    # 打印客户端发送的目标值
    rospy.loginfo("Received goal: %s", goal.goal)

    # 创建一个动作反馈对象
    feedback = TestFeedback()

    # 设置动作反馈的进度值
    feedback.progress = 0

    # 发布动作反馈
    server.publish_feedback(feedback)

    # 模拟完成动作的过程,逐步增加反馈进度并发布
    for i in range(10):
        rospy.sleep(1)  # 模拟长时间运行的任务
        feedback.progress += 10
        server.publish_feedback(feedback)

    # 创建一个动作结果对象
    result = TestResult()

    # 设置动作结果的状态和消息
    result.success = True
    result.message = "Action completed successfully!"

    # 发布动作结果
    server.set_succeeded(result)

然后,我们可以创建一个动作服务器并为其提供一个名称、类型和回调函数:

rospy.init_node("action_server_node")  # 初始化ROS节点

# 创建动作服务器
server = actionlib.SimpleActionServer("test_action", TestAction, execute_cb=action_callback, auto_start=False)

# 开始动作服务器
server.start()

# 循环等待动作请求
rospy.spin()

最后,我们需要编写一个客户端来发送动作请求和接收动作反馈和结果:

def action_client():
    # 创建一个动作客户端
    client = actionlib.SimpleActionClient("test_action", TestAction)

    # 等待动作服务器启动
    client.wait_for_server()

    # 创建一个动作目标对象
    goal = TestGoal()

    # 设置动作目标的值
    goal.goal = "Hello, Action!"

    # 发送动作目标
    client.send_goal(goal)

    # 等待动作完成
    client.wait_for_result()

    # 打印动作的最终结果状态和消息
    rospy.loginfo("Result: %s, %s", client.get_state(), client.get_result().message)

完整的示例代码如下:

import rospy
import actionlib
from actionlib.msg import TestAction, TestFeedback, TestResult

# 定义动作服务器回调函数
def action_callback(goal):
    # 打印客户端发送的目标值
    rospy.loginfo("Received goal: %s", goal.goal)

    # 创建一个动作反馈对象
    feedback = TestFeedback()

    # 设置动作反馈的进度值
    feedback.progress = 0

    # 发布动作反馈
    server.publish_feedback(feedback)

    # 模拟完成动作的过程,逐步增加反馈进度并发布
    for i in range(10):
        rospy.sleep(1)  # 模拟长时间运行的任务
        feedback.progress += 10
        server.publish_feedback(feedback)

    # 创建一个动作结果对象
    result = TestResult()

    # 设置动作结果的状态和消息
    result.success = True
    result.message = "Action completed successfully!"

    # 发布动作结果
    server.set_succeeded(result)

# 创建动作服务器
rospy.init_node("action_server_node")  # 初始化ROS节点
server = actionlib.SimpleActionServer("test_action", TestAction, execute_cb=action_callback, auto_start=False)

# 开始动作服务器
server.start()

# 循环等待动作请求
rospy.spin()

# 定义动作客户端函数
def action_client():
    # 创建一个动作客户端
    client = actionlib.SimpleActionClient("test_action", TestAction)

    # 等待动作服务器启动
    client.wait_for_server()

    # 创建一个动作目标对象
    goal = TestGoal()

    # 设置动作目标的值
    goal.goal = "Hello, Action!"

    # 发送动作目标
    client.send_goal(goal)

    # 等待动作完成
    client.wait_for_result()

    # 打印动作的最终结果状态和消息
    rospy.loginfo("Result: %s, %s", client.get_state(), client.get_result().message)

# 调用动作客户端函数
action_client()

上述示例代码演示了如何在Python中使用actionlib.SimpleActionServer()类发送和接收动作反馈。你可以根据自己的实际需求修改代码中的消息类型和回调函数,以便与你的项目相匹配。