欢迎访问宙启技术站
智能推送

Python中使用SimpleActionServer()实现一个简单的动作服务器

发布时间:2024-01-18 05:41:02

在Python中,我们可以使用SimpleActionServer类来实现一个简单的动作服务器。动作服务器用于接收和处理来自客户端的动作请求,并提供反馈给客户端。

首先,我们需要导入必要的模块和类:

import rospy
import actionlib
from my_pkg.msg import MyAction

在上面的代码中,rospyactionlib是Python库中的ROS模块,MyAction是自定义消息类型。

下一步,我们需要实例化一个动作服务器,并为其指定动作类型、回调函数和服务器名称:

def action_callback(goal):
    # 在这里实现动作的具体逻辑
    result = MyAction.Result()
    server.set_succeeded(result)

rospy.init_node('action_server')
server = actionlib.SimpleActionServer('my_action', MyAction, action_callback, False)
server.start()

在上面的代码中,我们定义了一个名为action_callback的回调函数,用于处理来自客户端的动作请求。在此回调函数中,我们可以实现动作的具体逻辑。当动作处理完成后,我们需要调用server.set_succeeded(result)来向客户端发送动作执行完成的反馈。

然后,我们使用rospy.init_node('action_server')初始化ROS节点,并使用actionlib.SimpleActionServer类创建一个名为my_action的动作服务器。我们传递了动作类型MyAction、回调函数action_callback和参数False给构造函数。参数False表示我们将手动调用server.start()来启动服务器。

最后,我们需要让动作服务器保持活动状态:

rospy.spin()

使用rospy.spin()函数将会让程序保持运行状态,直到节点被关闭。

下面是一个完整的示例,展示了如何实现一个简单的动作服务器和客户端。

#!/usr/bin/env python

import rospy
import actionlib
from my_pkg.msg import MyAction

def action_callback(goal):
    for i in range(goal.count):
        rospy.loginfo("Executing action step %d/%d", i+1, goal.count)
        rospy.sleep(1.0)
        if server.is_preempt_requested():
            rospy.loginfo("Action preempted")
            server.set_preempted()
            return

    result = MyAction.Result()
    server.set_succeeded(result)
    rospy.loginfo("Action completed successfully")

rospy.init_node('action_server')
server = actionlib.SimpleActionServer('my_action', MyAction, action_callback, False)
server.start()

rospy.spin()

在上面的示例中,我们实现了一个名为my_action的动作服务器。每秒执行一次动作步骤,共执行goal.count个动作步骤。如果在执行过程中接收到了预empt请求,则中止动作执行。

需要注意的是,在回调函数中执行的任务应该是非阻塞的。当我们调用rospy.sleep()函数时,ROS会继续处理其他事件,包括来自客户端的请求。这样,动作服务器可以及时响应并处理预empt请求。

接下来,让我们编写一个简单的动作客户端来测试我们的动作服务器。创建一个名为action_client.py的文件,写入以下代码:

#!/usr/bin/env python

import rospy
import actionlib
from my_pkg.msg import MyAction

rospy.init_node('action_client')
client = actionlib.SimpleActionClient('my_action', MyAction)
client.wait_for_server()

goal = MyAction.Goal()
goal.count = 5

client.send_goal(goal)
client.wait_for_result()

result = client.get_result()
rospy.loginfo("Action result: %s", result)

在上面的代码中,我们首先初始化ROS节点并创建一个动作客户端。然后,我们调用wait_for_server()函数等待动作服务器可用。

接下来,我们创建一个MyAction.Goal对象,并设置要执行的动作步骤数量为5。

然后,我们使用send_goal()函数发送动作目标,然后等待动作执行完成。我们使用get_result()函数获取动作执行的结果,并通过日志打印出来。

现在,我们可以在终端上打开两个不同的终端窗口,分别运行动作服务器和动作客户端的脚本。在动作服务器的终端窗口中,我们可以看到动作的执行过程信息。在动作客户端的终端窗口中,我们可以看到动作的执行结果。

这就是使用SimpleActionServer类实现一个简单的动作服务器的方法。通过动作服务器,我们可以方便地处理来自客户端的动作请求,并提供相应的反馈。