欢迎访问宙启技术站
智能推送

在Python中使用SimpleActionServer()实现一个简易的动作处理器

发布时间:2024-01-18 05:42:04

在Python中,可以使用SimpleActionServer()类来实现一个简易的动作处理器。SimpleActionServer()是ROS(Robot Operating System)中的一个类,用于在实时控制系统中处理动作请求和提供反馈。

下面是一个使用SimpleActionServer()的示例:

import rospy
import actionlib

from actionlib.msg import TestAction, TestFeedback, TestResult


def action_callback(goal):
    # 在这里编写处理动作的代码
    feedback = TestFeedback()
    result = TestResult()

    # 执行动作并发送反馈
    for i in range(1, goal.target+1):
        feedback.progress = i
        server.publish_feedback(feedback)
        rospy.sleep(1)

    if goal.target < 0:
        server.set_aborted(result)
    else:
        result.final_count = goal.target
        server.set_succeeded(result)


rospy.init_node('action_server')
server = actionlib.SimpleActionServer('test_action', TestAction, action_callback, auto_start=False)

# 启动动作处理器
server.start()
rospy.spin()

在这个例子中,我们首先导入了rospyactionlib模块。然后,我们分别导入了TestActionTestFeedbackTestResult这三个消息类型,它们是我们自定义的消息类型,用于传递动作请求、反馈和结果。

接下来,我们定义了一个action_callback()函数,用于处理动作请求。在该函数中,我们首先创建了TestFeedbackTestResult的实例,这将用于发送动作的反馈和结果。

action_callback()函数中,我们使用一个for循环来执行动作。在每次循环中,我们更新并发送当前的进度信息作为反馈给客户端。在这个例子中,我们使用goal.target作为动作的目标值,循环的次数就是目标值的大小。

最后,我们检查目标值是否小于0,如果是,则调用server.set_aborted(result)将动作标记为“中止”状态,否则,我们将获得的目标值goal.target作为结果的final_count字段,并调用server.set_succeeded(result)将动作标记为“成功”状态。

在主程序中,我们首先初始化ROS节点,并创建一个名为'test_action'的动作服务器。我们使用server.start()来启动动作处理器,并使用rospy.spin()来使程序保持运行状态。

下面是一个使用此动作处理器的客户端的示例:

import rospy
import actionlib

from actionlib.msg import TestAction, TestGoal, TestResult


rospy.init_node('action_client')
client = actionlib.SimpleActionClient('test_action', TestAction)
client.wait_for_server()

goal = TestGoal()
goal.target = 10

client.send_goal(goal)

while not client.wait_for_result(rospy.Duration.from_sec(1.0)):
    rospy.loginfo("Waiting for the result...")

result = client.get_result()

if result is not None:
    rospy.loginfo("Final count: %d", result.final_count)
else:
    rospy.loginfo("Action aborted by the server.")

在这个例子中,我们首先初始化ROS节点,并创建一个名为'test_action'的动作客户端。然后,我们使用client.wait_for_server()来等待动作服务器启动。

接下来,我们创建了一个TestGoal的实例,并将goal.target设置为10,这将作为动作的目标值。

然后,我们使用client.send_goal(goal)将目标值发送给动作服务器。

接着,我们使用一个while循环来检查动作的状态。在每次循环中,我们使用client.wait_for_result()等待动作服务器返回结果,并通过client.get_result()获取动作的结果。

最后,我们根据返回的结果打印相应的信息。

需要注意的是,在运行程序之前,确保已经启动了动作服务器。

以上就是使用SimpleActionServer()实现一个简易的动作处理器的示例,以及一个相应的客户端示例。通过这种方式,我们可以实现在Python中利用ROS进行动作处理和反馈。