使用Python编写自定义的SimpleActionServer()实例
SimpleActionServer是一个ROS封装库,用于创建和管理一个简单的Action服务器。一个Action服务器是一个ROS节点,它接收来自Action客户端发送的目标,并根据目标执行一些任务,然后向客户端发送结果。
下面是一个使用Python编写的自定义SimpleActionServer实例的示例代码:
import rospy
import actionlib
from my_custom_msgs.msg import MyAction, MyActionFeedback, MyActionResult
class MyActionServer(object):
# 初始化Action服务器
def __init__(self):
# 创建一个Action服务器对象,指定消息类型为MyAction
self.server = actionlib.SimpleActionServer('my_action', MyAction, self.execute, False)
self.server.start()
# 执行任务的回调函数
def execute(self, goal):
# 创建一个反馈对象
feedback = MyActionFeedback()
# 创建一个结果对象
result = MyActionResult()
# 执行任务逻辑,这里以延时2秒为例
rospy.sleep(2)
if self.server.is_preempt_requested():
# 如果接收到了取消请求,停止当前任务并发送一个结果
self.server.set_preempted(result)
else:
# 如果任务顺利完成,发送一个结果
self.server.set_succeeded(result)
if __name__ == '__main__':
# 初始化ROS节点
rospy.init_node('my_action_server_node')
# 创建自定义Action服务器对象
action_server = MyActionServer()
# 进入循环,等待回调函数执行
rospy.spin()
在这个例子中,我们首先导入了必要的ROS和actionlib库,然后导入了自定义的消息类型MyAction,MyActionFeedback和MyActionResult。
接下来,我们创建了一个名为MyActionServer的类,它继承自object类,并包含了初始化函数__init__()和任务执行回调函数execute()。
在初始化函数__init__()中,我们创建了一个Action服务器对象self.server,并指定消息类型为MyAction。然后,我们调用self.server.start()来启动Action服务器。
在任务执行回调函数execute()中,我们首先创建了一个反馈对象feedback和一个结果对象result。
然后,我们执行了一个任务逻辑,这里只是使用rospy.sleep()来模拟一个延时2秒的任务。
接下来,我们使用self.server.is_preempt_requested()来检查是否接收到了取消请求。如果接收到了取消请求,我们调用self.server.set_preempted()来停止当前任务并发送一个结果;如果没有接收到取消请求,我们调用self.server.set_succeeded()方法来发送一个成功的结果。
在主函数中,我们初始化了ROS节点,创建了一个名为my_action_server_node的节点,并实例化了MyActionServer对象。然后,我们进入一个循环,等待回调函数的执行。
这是一个简单的自定义SimpleActionServer实例的示例。你可以根据你的具体需求修改任务逻辑和自定义消息类型,并使用rospy.spin()来等待回调函数的执行。
