Nav2 Navigation Stack - rolling  main
ROS 2 Navigation Stack
example_route.py
1 #! /usr/bin/env python3
2 # Copyright 2025 Open Navigation LLC
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 
16 from geometry_msgs.msg import Pose, PoseStamped
17 from nav2_simple_commander.robot_navigator import BasicNavigator, RunningTask, TaskResult
18 from nav2_simple_commander.utils import euler_to_quaternion
19 import rclpy
20 from std_msgs.msg import Header
21 
22 """
23 Basic navigation demo to using the route server.
24 """
25 
26 
27 def toPoseStamped(pt: Pose, header: Header) -> PoseStamped:
28  pose = PoseStamped()
29  pose.pose.position.x = pt.x
30  pose.pose.position.y = pt.y
31  pose.header = header
32  return pose
33 
34 
35 def main() -> None:
36  rclpy.init()
37 
38  node = rclpy.create_node('route_example')
39 
40  node.declare_parameter('start_pose.x', 0.0)
41  node.declare_parameter('start_pose.y', 0.0)
42  node.declare_parameter('start_pose.yaw', 0.0)
43 
44  node.declare_parameter('goal_pose.x', 0.0)
45  node.declare_parameter('goal_pose.y', 0.0)
46  node.declare_parameter('goal_pose.yaw', 0.0)
47 
48  start_x = node.get_parameter('start_pose.x').value
49  start_y = node.get_parameter('start_pose.y').value
50  start_yaw = node.get_parameter('start_pose.yaw').value
51 
52  goal_x = node.get_parameter('goal_pose.x').value
53  goal_y = node.get_parameter('goal_pose.y').value
54  goal_yaw = node.get_parameter('goal_pose.yaw').value
55 
56  node.destroy_node()
57 
58  navigator = BasicNavigator()
59 
60  # Set our demo's initial pose
61  initial_pose = PoseStamped()
62  initial_pose.header.frame_id = 'map'
63  initial_pose.header.stamp = navigator.get_clock().now().to_msg()
64  initial_pose.pose.position.x = start_x
65  initial_pose.pose.position.y = start_y
66  initial_pose.pose.orientation = euler_to_quaternion(0.0, 0.0, start_yaw)
67  navigator.setInitialPose(initial_pose)
68 
69  # Activate navigation, if not autostarted. This should be called after setInitialPose()
70  # or this will initialize at the origin of the map and update the costmap with bogus readings.
71  # If autostart, you should `waitUntilNav2Active()` instead.
72  # navigator.lifecycleStartup()
73 
74  # Wait for navigation to fully activate, since autostarting nav2
75  navigator.waitUntilNav2Active()
76 
77  # If desired, you can change or load the map as well
78  # navigator.changeMap('/path/to/map.yaml')
79 
80  # You may use the navigator to clear or obtain costmaps
81  # navigator.clearAllCostmaps() # also have clearLocalCostmap() and clearGlobalCostmap()
82  # global_costmap = navigator.getGlobalCostmap()
83  # local_costmap = navigator.getLocalCostmap()
84 
85  # Go to our demos first goal pose
86  goal_pose = PoseStamped()
87  goal_pose.header.frame_id = 'map'
88  goal_pose.header.stamp = navigator.get_clock().now().to_msg()
89  goal_pose.pose.position.x = goal_x
90  goal_pose.pose.position.y = goal_y
91  goal_pose.pose.orientation = euler_to_quaternion(0.0, 0.0, goal_yaw)
92 
93  # Sanity check a valid route exists using PoseStamped.
94  # May also use NodeIDs on the graph if they are known by passing them instead as `int`
95  # [path, route] = navigator.getRoute(initial_pose, goal_pose)
96 
97  # May also use NodeIDs on the graph if they are known by passing them instead as `int`
98  route_tracking_task = navigator.getAndTrackRoute(initial_pose, goal_pose)
99 
100  # Note for the route server, we have a special route argument in the API b/c it may be
101  # providing feedback messages simultaneously to others (e.g. controller or WPF as below)
102  task_canceled = False
103  last_feedback = None
104  follow_path_task = RunningTask.NONE
105  while not navigator.isTaskComplete(task=route_tracking_task):
106 
111 
112  # Do something with the feedback, which contains the route / path if tracking
113  feedback = navigator.getFeedback(task=route_tracking_task)
114  while feedback is not None:
115  if not last_feedback or \
116  (feedback.last_node_id != last_feedback.last_node_id or
117  feedback.next_node_id != last_feedback.next_node_id):
118  print('Passed node ' + str(feedback.last_node_id) +
119  ' to next node ' + str(feedback.next_node_id) +
120  ' along edge ' + str(feedback.current_edge_id) + '.')
121 
122  last_feedback = feedback
123 
124  if feedback.rerouted: # or follow_path_task == RunningTask.None
125  # Follow the path from the route server using the controller server
126  print('Passing new route to controller!')
127  follow_path_task = navigator.followPath(feedback.path)
128 
129  # May instead use the waypoint follower
130  # (or nav through poses) and use the route's sparse nodes!
131  # print("Passing route to waypoint follower!")
132  # nodes =
133  # [toPoseStamped(x.position, feedback.route.header) for x in feedback.route.nodes]
134  # navigator.followWaypoints(nodes)
135  # Or navigator.navigateThroughPoses(nodes)
136  # Consider sending only the first few and iterating
137 
138  feedback = navigator.getFeedback(task=route_tracking_task)
139 
140  # Check if followPath or WPF task is done (or failed),
141  # will cancel all current tasks, including route
142  if navigator.isTaskComplete(task=follow_path_task):
143  print('Controller or waypoint follower server completed its task!')
144  navigator.cancelTask()
145  task_canceled = True
146 
147  # Route server will return completed status before the controller / WPF server
148  # so wait for the actual robot task processing server to complete
149  while not navigator.isTaskComplete(task=follow_path_task) and not task_canceled:
150  pass
151 
152  # Do something depending on the return code
153  result = navigator.getResult()
154  if result == TaskResult.SUCCEEDED:
155  print('Goal succeeded!')
156  elif result == TaskResult.CANCELED:
157  print('Goal was canceled!')
158  elif result == TaskResult.FAILED:
159  print('Goal failed!')
160  else:
161  print('Goal has an invalid return status!')
162 
163  navigator.lifecycleShutdown()
164 
165  exit(0)
166 
167 
168 if __name__ == '__main__':
169  main()