欢迎访问宙启技术站
智能推送

Python中简单实现actionlibSimpleActionServer()的步骤

发布时间:2023-12-31 17:35:13

在Python中使用actionlib库实现SimpleActionServer的步骤如下:

1. 导入所需的库和消息类型。

import rospy
import actionlib
from your_package_name.msg import YourActionNameAction, YourActionNameFeedback, YourActionNameResult

2. 创建一个名为SimpleActionServer的类,并继承自actionlib.SimpleActionServer。

class SimpleActionServer(actionlib.SimpleActionServer):
    def __init__(self, name):
        actionlib.SimpleActionServer.__init__(self, name, YourActionNameAction, self.execute, False)
        self.result = YourActionNameResult()
        self.feedback = YourActionNameFeedback()

3. 实现execute方法,用于处理收到的goal。

    def execute(self, goal):
        # 在处理goal之前,先判断服务器是否已经被终止
        if self.is_preempt_requested():
            self.set_preempted()
            return
        
        # 执行一些操作,并发送feedback
        for i in range(1, goal.target_number + 1):
            self.feedback.current_number = i
            self.publish_feedback(self.feedback)
            rospy.sleep(1)
            
        # 执行完操作后,设置结果并发送
        self.result.final_number = goal.target_number
        self.set_succeeded(self.result)

4. 在主函数中实例化SimpleActionServer,并调用spin()函数。

if __name__ == "__main__":
    rospy.init_node("simple_action_server")
    server = SimpleActionServer("your_action_name")
    server.start()
    rospy.spin()

下面是一个完整的例子,演示如何使用actionlib库实现一个简单的计数器功能。

import rospy
import actionlib
from your_package_name.msg import CounterAction, CounterFeedback, CounterResult

class CounterServer(actionlib.SimpleActionServer):
    def __init__(self, name):
        actionlib.SimpleActionServer.__init__(self, name, CounterAction, self.execute, False)
        self.result = CounterResult()
        self.feedback = CounterFeedback()

    def execute(self, goal):
        if self.is_preempt_requested():
            self.set_preempted()
            return
        
        for i in range(1, goal.target_number + 1):
            self.feedback.current_number = i
            self.publish_feedback(self.feedback)
            rospy.sleep(1)
            
        self.result.final_number = goal.target_number
        self.set_succeeded(self.result)

if __name__ == "__main__":
    rospy.init_node("counter_server")
    server = CounterServer("counter")
    server.start()
    rospy.spin()

你需要替换 your_package_nameyour_action_name 为你自己的包名和action名。

保存以上代码到一个名为 counter_server.py 的文件中,在其所在目录下执行以下命令以使脚本可执行:

chmod +x counter_server.py

然后通过以下命令启动action服务器:

rosrun your_package_name counter_server.py

接下来,你可以通过在另一个终端中发布一个goal来测试服务器。在此之前,确保你已经定义了一个适当的action消息和在launch文件中为action服务器创建了一个launch文件。

<launch>
    <node pkg="your_package_name" type="action_client.py" name="counter_client" output="screen"/>
</launch>

保存以上代码到一个名为 action_client.launch 的文件中,并执行以下命令启动launch文件:

roslaunch your_package_name action_client.launch

在另一个终端中,通过以下命令发布一个goal:

rostopic pub -1 /counter counter_msgs/CounterAction "goal: 10"

你应该会在两个终端中看到计数器的输出。