Nav2 Navigation Stack - rolling  main
ROS 2 Navigation Stack
robot_navigator.py
1 #! /usr/bin/env python3
2 # Copyright 2021 Samsung Research America
3 # Copyright 2025 Open Navigation LLC
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 
17 
18 from enum import Enum
19 import time
20 from typing import Any, Optional, Union
21 
22 from action_msgs.msg import GoalStatus
23 from builtin_interfaces.msg import Duration
24 from geographic_msgs.msg import GeoPose
25 from geometry_msgs.msg import Point, PoseStamped, PoseWithCovarianceStamped
26 from lifecycle_msgs.srv import GetState
27 from nav2_msgs.action import (AssistedTeleop, BackUp, ComputeAndTrackRoute,
28  ComputePathThroughPoses, ComputePathToPose, ComputeRoute, DockRobot,
29  DriveOnHeading, FollowGPSWaypoints, FollowPath, FollowWaypoints,
30  NavigateThroughPoses, NavigateToPose, SmoothPath, Spin, UndockRobot)
31 from nav2_msgs.msg import Route
32 from nav2_msgs.srv import (ClearCostmapAroundPose, ClearCostmapAroundRobot,
33  ClearCostmapExceptRegion, ClearEntireCostmap, GetCostmap, LoadMap,
34  ManageLifecycleNodes)
35 from nav_msgs.msg import Goals, OccupancyGrid, Path
36 import rclpy
37 from rclpy.action import ActionClient
38 from rclpy.action.client import ClientGoalHandle
39 from rclpy.client import Client
40 from rclpy.duration import Duration as rclpyDuration
41 from rclpy.node import Node
42 from rclpy.qos import QoSDurabilityPolicy, QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy
43 from rclpy.task import Future
44 from rclpy.type_support import GetResultServiceResponse
45 
46 
47 # Task Result enum for the result of the task being executed
48 class TaskResult(Enum):
49  UNKNOWN = 0
50  SUCCEEDED = 1
51  CANCELED = 2
52  FAILED = 3
53 
54 
55 # Task enum for the task being executed, if its a long-running task to be able to obtain
56 # necessary contextual information in `isTaskComplete` and `getFeedback` regarding the task
57 # which is running.
58 class RunningTask(Enum):
59  NONE = 0
60  NAVIGATE_TO_POSE = 1
61  NAVIGATE_THROUGH_POSES = 2
62  FOLLOW_PATH = 3
63  FOLLOW_WAYPOINTS = 4
64  FOLLOW_GPS_WAYPOINTS = 5
65  SPIN = 6
66  BACKUP = 7
67  DRIVE_ON_HEADING = 8
68  ASSISTED_TELEOP = 9
69  DOCK_ROBOT = 10
70  UNDOCK_ROBOT = 11
71  COMPUTE_AND_TRACK_ROUTE = 12
72 
73 
74 class BasicNavigator(Node):
75 
76  def __init__(self, node_name: str = 'basic_navigator', namespace: str = '') -> None:
77  super().__init__(node_name=node_name, namespace=namespace)
78  self.initial_poseinitial_pose = PoseStamped()
79  self.initial_poseinitial_pose.header.frame_id = 'map'
80 
81  self.goal_handlegoal_handle: Optional[ClientGoalHandle[Any, Any, Any]] = None
82  self.result_futureresult_future: \
83  Optional[Future[GetResultServiceResponse[Any]]] = None
84  self.feedbackfeedback: Any = None
85  self.statusstatus: Optional[int] = None
86 
87  # Since the route server's compute and track action server is likely
88  # to be running simultaneously with another (e.g. controller, WPF) server,
89  # we must track its futures and feedback separately. Additionally, the
90  # route tracking feedback is uniquely important to be complete and ordered
91  self.route_goal_handleroute_goal_handle: Optional[ClientGoalHandle[Any, Any, Any]] = None
92  self.route_result_futureroute_result_future: \
93  Optional[Future[GetResultServiceResponse[Any]]] = None
94  self.route_feedback: list[Any] = []
95 
96  # Error code and messages from servers
97  self.last_action_error_codelast_action_error_code = 0
98  self.last_action_error_msglast_action_error_msg = ''
99 
100  amcl_pose_qos = QoSProfile(
101  durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
102  reliability=QoSReliabilityPolicy.RELIABLE,
103  history=QoSHistoryPolicy.KEEP_LAST,
104  depth=1,
105  )
106 
107  self.initial_pose_receivedinitial_pose_received = False
108  self.nav_through_poses_client: ActionClient[
109  NavigateThroughPoses.Goal,
110  NavigateThroughPoses.Result,
111  NavigateThroughPoses.Feedback
112  ] = ActionClient(
113  self, NavigateThroughPoses, 'navigate_through_poses')
114  self.nav_to_pose_client: ActionClient[
115  NavigateToPose.Goal,
116  NavigateToPose.Result,
117  NavigateToPose.Feedback
118  ] = ActionClient(self, NavigateToPose, 'navigate_to_pose')
119  self.follow_waypoints_client: ActionClient[
120  FollowWaypoints.Goal,
121  FollowWaypoints.Result,
122  FollowWaypoints.Feedback
123  ] = ActionClient(
124  self, FollowWaypoints, 'follow_waypoints'
125  )
126  self.follow_gps_waypoints_client: ActionClient[
127  FollowGPSWaypoints.Goal,
128  FollowGPSWaypoints.Result,
129  FollowGPSWaypoints.Feedback
130  ] = ActionClient(
131  self, FollowGPSWaypoints, 'follow_gps_waypoints'
132  )
133  self.follow_path_client: ActionClient[
134  FollowPath.Goal,
135  FollowPath.Result,
136  FollowPath.Feedback
137  ] = ActionClient(self, FollowPath, 'follow_path')
138  self.compute_path_to_pose_client: ActionClient[
139  ComputePathToPose.Goal,
140  ComputePathToPose.Result,
141  ComputePathToPose.Feedback
142  ] = ActionClient(
143  self, ComputePathToPose, 'compute_path_to_pose'
144  )
145  self.compute_path_through_poses_client: ActionClient[
146  ComputePathThroughPoses.Goal,
147  ComputePathThroughPoses.Result,
148  ComputePathThroughPoses.Feedback
149  ] = ActionClient(
150  self, ComputePathThroughPoses, 'compute_path_through_poses'
151  )
152  self.smoother_client: ActionClient[
153  SmoothPath.Goal,
154  SmoothPath.Result,
155  SmoothPath.Feedback
156  ] = ActionClient(self, SmoothPath, 'smooth_path')
157  self.compute_route_client: ActionClient[
158  ComputeRoute.Goal,
159  ComputeRoute.Result,
160  ComputeRoute.Feedback
161  ] = ActionClient(self, ComputeRoute, 'compute_route')
162  self.compute_and_track_route_client: ActionClient[
163  ComputeAndTrackRoute.Goal,
164  ComputeAndTrackRoute.Result,
165  ComputeAndTrackRoute.Feedback
166  ] = ActionClient(self, ComputeAndTrackRoute, 'compute_and_track_route')
167  self.spin_client: ActionClient[
168  Spin.Goal,
169  Spin.Result,
170  Spin.Feedback
171  ] = ActionClient(self, Spin, 'spin')
172 
173  self.backup_client: ActionClient[
174  BackUp.Goal,
175  BackUp.Result,
176  BackUp.Feedback
177  ] = ActionClient(self, BackUp, 'backup')
178  self.drive_on_heading_client: ActionClient[
179  DriveOnHeading.Goal,
180  DriveOnHeading.Result,
181  DriveOnHeading.Feedback
182  ] = ActionClient(
183  self, DriveOnHeading, 'drive_on_heading'
184  )
185  self.assisted_teleop_client: ActionClient[
186  AssistedTeleop.Goal,
187  AssistedTeleop.Result,
188  AssistedTeleop.Feedback
189  ] = ActionClient(
190  self, AssistedTeleop, 'assisted_teleop'
191  )
192  self.docking_client: ActionClient[
193  DockRobot.Goal,
194  DockRobot.Result,
195  DockRobot.Feedback
196  ] = ActionClient(self, DockRobot, 'dock_robot')
197  self.undocking_client: ActionClient[
198  UndockRobot.Goal,
199  UndockRobot.Result,
200  UndockRobot.Feedback
201  ] = ActionClient(self, UndockRobot, 'undock_robot')
202 
203  self.localization_pose_sublocalization_pose_sub = self.create_subscription(
204  PoseWithCovarianceStamped,
205  'amcl_pose',
206  self._amclPoseCallback_amclPoseCallback,
207  amcl_pose_qos,
208  )
209  self.initial_pose_pubinitial_pose_pub = self.create_publisher(
210  PoseWithCovarianceStamped, 'initialpose', 10
211  )
212  self.change_maps_srv: Client[LoadMap.Request, LoadMap.Response] = \
213  self.create_client(LoadMap, 'map_server/load_map')
214  self.clear_costmap_global_srv: Client[
215  ClearEntireCostmap.Request, ClearEntireCostmap.Response] = \
216  self.create_client(
217  ClearEntireCostmap, 'global_costmap/clear_entirely_global_costmap'
218  )
219  self.clear_costmap_local_srv: Client[
220  ClearEntireCostmap.Request, ClearEntireCostmap.Response] = \
221  self.create_client(
222  ClearEntireCostmap, 'local_costmap/clear_entirely_local_costmap'
223  )
224  self.clear_costmap_except_region_srv: Client[
225  ClearCostmapExceptRegion.Request, ClearCostmapExceptRegion.Response] = \
226  self.create_client(
227  ClearCostmapExceptRegion, 'local_costmap/clear_costmap_except_region'
228  )
229  self.clear_costmap_around_robot_srv: Client[
230  ClearCostmapAroundRobot.Request, ClearCostmapAroundRobot.Response] = \
231  self.create_client(
232  ClearCostmapAroundRobot, 'local_costmap/clear_costmap_around_robot'
233  )
234  self.clear_local_costmap_around_pose_srv: Client[
235  ClearCostmapAroundPose.Request, ClearCostmapAroundPose.Response] = \
236  self.create_client(
237  ClearCostmapAroundPose, 'local_costmap/clear_costmap_around_pose'
238  )
239  self.clear_global_costmap_around_pose_srv: Client[
240  ClearCostmapAroundPose.Request, ClearCostmapAroundPose.Response] = \
241  self.create_client(
242  ClearCostmapAroundPose, 'global_costmap/clear_costmap_around_pose'
243  )
244  self.get_costmap_global_srv: Client[
245  GetCostmap.Request, GetCostmap.Response] = \
246  self.create_client(
247  GetCostmap, 'global_costmap/get_costmap'
248  )
249  self.get_costmap_local_srv: Client[
250  GetCostmap.Request, GetCostmap.Response] = \
251  self.create_client(
252  GetCostmap, 'local_costmap/get_costmap'
253  )
254 
255  def destroyNode(self) -> None:
256  self.destroy_nodedestroy_node()
257 
258  def destroy_node(self) -> None:
259  self.nav_through_poses_client.destroy()
260  self.nav_to_pose_client.destroy()
261  self.follow_waypoints_client.destroy()
262  self.follow_path_client.destroy()
263  self.compute_path_to_pose_client.destroy()
264  self.compute_path_through_poses_client.destroy()
265  self.compute_and_track_route_client.destroy()
266  self.compute_route_client.destroy()
267  self.smoother_client.destroy()
268  self.spin_client.destroy()
269  self.backup_client.destroy()
270  self.drive_on_heading_client.destroy()
271  self.assisted_teleop_client.destroy()
272  self.follow_gps_waypoints_client.destroy()
273  self.docking_client.destroy()
274  self.undocking_client.destroy()
275  super().destroy_node()
276 
277  def setInitialPose(self, initial_pose: PoseStamped) -> None:
278  """Set the initial pose to the localization system."""
279  self.initial_pose_receivedinitial_pose_received = False
280  self.initial_poseinitial_pose = initial_pose
281  self._setInitialPose_setInitialPose()
282 
283  def goThroughPoses(self, poses: Goals, behavior_tree: str = '') -> Optional[RunningTask]:
284  """Send a `NavThroughPoses` action request."""
285  self.clearTaskErrorclearTaskError()
286  self.debugdebug("Waiting for 'NavigateThroughPoses' action server")
287  while not self.nav_through_poses_client.wait_for_server(timeout_sec=1.0):
288  self.infoinfo("'NavigateThroughPoses' action server not available, waiting...")
289 
290  goal_msg = NavigateThroughPoses.Goal()
291  goal_msg.poses = poses
292  goal_msg.behavior_tree = behavior_tree
293 
294  self.infoinfo(f'Navigating with {len(goal_msg.poses)} goals....')
295  send_goal_future = self.nav_through_poses_client.send_goal_async(
296  goal_msg, self._feedbackCallback_feedbackCallback
297  )
298  rclpy.spin_until_future_complete(self, send_goal_future)
299  self.goal_handlegoal_handle = send_goal_future.result()
300 
301  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
302  msg = f'NavigateThroughPoses request with {len(poses)} was rejected!'
303  self.setTaskErrorsetTaskError(NavigateThroughPoses.UNKNOWN, msg)
304  self.errorerror(msg)
305  return None
306 
307  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
308  return RunningTask.NAVIGATE_THROUGH_POSES
309 
310  def goToPose(self, pose: PoseStamped, behavior_tree: str = '') -> Optional[RunningTask]:
311  """Send a `NavToPose` action request."""
312  self.clearTaskErrorclearTaskError()
313  self.debugdebug("Waiting for 'NavigateToPose' action server")
314  while not self.nav_to_pose_client.wait_for_server(timeout_sec=1.0):
315  self.infoinfo("'NavigateToPose' action server not available, waiting...")
316 
317  goal_msg = NavigateToPose.Goal()
318  goal_msg.pose = pose
319  goal_msg.behavior_tree = behavior_tree
320 
321  self.infoinfo(
322  'Navigating to goal: '
323  + str(pose.pose.position.x)
324  + ' '
325  + str(pose.pose.position.y)
326  + '...'
327  )
328  send_goal_future = self.nav_to_pose_client.send_goal_async(
329  goal_msg, self._feedbackCallback_feedbackCallback
330  )
331  rclpy.spin_until_future_complete(self, send_goal_future)
332  self.goal_handlegoal_handle = send_goal_future.result()
333 
334  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
335  msg = (
336  'NavigateToPose goal to '
337  + str(pose.pose.position.x)
338  + ' '
339  + str(pose.pose.position.y)
340  + ' was rejected!'
341  )
342  self.setTaskErrorsetTaskError(NavigateToPose.UNKNOWN, msg)
343  self.errorerror(msg)
344  return None
345 
346  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
347  return RunningTask.NAVIGATE_TO_POSE
348 
349  def followWaypoints(self, poses: list[PoseStamped]) -> Optional[RunningTask]:
350  """Send a `FollowWaypoints` action request."""
351  self.clearTaskErrorclearTaskError()
352  self.debugdebug("Waiting for 'FollowWaypoints' action server")
353  while not self.follow_waypoints_client.wait_for_server(timeout_sec=1.0):
354  self.infoinfo("'FollowWaypoints' action server not available, waiting...")
355 
356  goal_msg = FollowWaypoints.Goal()
357  goal_msg.poses = poses
358 
359  self.infoinfo(f'Following {len(goal_msg.poses)} goals....')
360  send_goal_future = self.follow_waypoints_client.send_goal_async(
361  goal_msg, self._feedbackCallback_feedbackCallback
362  )
363  rclpy.spin_until_future_complete(self, send_goal_future)
364  self.goal_handlegoal_handle = send_goal_future.result()
365 
366  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
367  msg = f'Following {len(poses)} waypoints request was rejected!'
368  self.setTaskErrorsetTaskError(FollowWaypoints.UNKNOWN, msg)
369  self.errorerror(msg)
370  return None
371 
372  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
373  return RunningTask.FOLLOW_WAYPOINTS
374 
375  def followGpsWaypoints(self, gps_poses: list[GeoPose]) -> Optional[RunningTask]:
376  """Send a `FollowGPSWaypoints` action request."""
377  self.clearTaskErrorclearTaskError()
378  self.debugdebug("Waiting for 'FollowWaypoints' action server")
379  while not self.follow_gps_waypoints_client.wait_for_server(timeout_sec=1.0):
380  self.infoinfo("'FollowWaypoints' action server not available, waiting...")
381 
382  goal_msg = FollowGPSWaypoints.Goal()
383  goal_msg.gps_poses = gps_poses
384 
385  self.infoinfo(f'Following {len(goal_msg.gps_poses)} gps goals....')
386  send_goal_future = self.follow_gps_waypoints_client.send_goal_async(
387  goal_msg, self._feedbackCallback_feedbackCallback
388  )
389  rclpy.spin_until_future_complete(self, send_goal_future)
390  self.goal_handlegoal_handle = send_goal_future.result()
391 
392  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
393  msg = f'Following {len(gps_poses)} gps waypoints request was rejected!'
394  self.setTaskErrorsetTaskError(FollowGPSWaypoints.UNKNOWN, msg)
395  self.errorerror(msg)
396  return None
397 
398  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
399  return RunningTask.FOLLOW_GPS_WAYPOINTS
400 
401  def spin(
402  self, spin_dist: float = 1.57, time_allowance: int = 10,
403  disable_collision_checks: bool = False) -> Optional[RunningTask]:
404  self.clearTaskErrorclearTaskError()
405  self.debugdebug("Waiting for 'Spin' action server")
406  while not self.spin_client.wait_for_server(timeout_sec=1.0):
407  self.infoinfo("'Spin' action server not available, waiting...")
408  goal_msg = Spin.Goal()
409  goal_msg.target_yaw = spin_dist
410  goal_msg.time_allowance = Duration(sec=time_allowance)
411  goal_msg.disable_collision_checks = disable_collision_checks
412 
413  self.infoinfo(f'Spinning to angle {goal_msg.target_yaw}....')
414  send_goal_future = self.spin_client.send_goal_async(
415  goal_msg, self._feedbackCallback_feedbackCallback
416  )
417  rclpy.spin_until_future_complete(self, send_goal_future)
418  self.goal_handlegoal_handle = send_goal_future.result()
419 
420  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
421  msg = 'Spin request was rejected!'
422  self.setTaskErrorsetTaskError(Spin.UNKNOWN, msg)
423  self.errorerror(msg)
424  return None
425 
426  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
427  return RunningTask.SPIN
428 
429  def backup(
430  self, backup_dist: float = 0.15, backup_speed: float = 0.025,
431  time_allowance: int = 10,
432  disable_collision_checks: bool = False) -> Optional[RunningTask]:
433  self.clearTaskErrorclearTaskError()
434  self.debugdebug("Waiting for 'Backup' action server")
435  while not self.backup_client.wait_for_server(timeout_sec=1.0):
436  self.infoinfo("'Backup' action server not available, waiting...")
437  goal_msg = BackUp.Goal()
438  goal_msg.target = Point(x=float(backup_dist))
439  goal_msg.speed = backup_speed
440  goal_msg.time_allowance = Duration(sec=time_allowance)
441  goal_msg.disable_collision_checks = disable_collision_checks
442 
443  self.infoinfo(f'Backing up {goal_msg.target.x} m at {goal_msg.speed} m/s....')
444  send_goal_future = self.backup_client.send_goal_async(
445  goal_msg, self._feedbackCallback_feedbackCallback
446  )
447  rclpy.spin_until_future_complete(self, send_goal_future)
448  self.goal_handlegoal_handle = send_goal_future.result()
449 
450  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
451  msg = 'Backup request was rejected!'
452  self.setTaskErrorsetTaskError(BackUp.UNKNOWN, msg)
453  self.errorerror(msg)
454  return None
455 
456  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
457  return RunningTask.BACKUP
458 
459  def driveOnHeading(
460  self, dist: float = 0.15, speed: float = 0.025,
461  time_allowance: int = 10,
462  disable_collision_checks: bool = False) -> Optional[RunningTask]:
463  self.clearTaskErrorclearTaskError()
464  self.debugdebug("Waiting for 'DriveOnHeading' action server")
465  while not self.backup_client.wait_for_server(timeout_sec=1.0):
466  self.infoinfo("'DriveOnHeading' action server not available, waiting...")
467  goal_msg = DriveOnHeading.Goal()
468  goal_msg.target = Point(x=float(dist))
469  goal_msg.speed = speed
470  goal_msg.time_allowance = Duration(sec=time_allowance)
471  goal_msg.disable_collision_checks = disable_collision_checks
472 
473  self.infoinfo(f'Drive {goal_msg.target.x} m on heading at {goal_msg.speed} m/s....')
474  send_goal_future = self.drive_on_heading_client.send_goal_async(
475  goal_msg, self._feedbackCallback_feedbackCallback
476  )
477  rclpy.spin_until_future_complete(self, send_goal_future)
478  self.goal_handlegoal_handle = send_goal_future.result()
479 
480  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
481  msg = 'Drive On Heading request was rejected!'
482  self.setTaskErrorsetTaskError(DriveOnHeading.UNKNOWN, msg)
483  self.errorerror(msg)
484  return None
485 
486  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
487  return RunningTask.DRIVE_ON_HEADING
488 
489  def assistedTeleop(self, time_allowance: int = 30) -> Optional[RunningTask]:
490 
491  self.clearTaskErrorclearTaskError()
492  self.debugdebug("Wanting for 'assisted_teleop' action server")
493 
494  while not self.assisted_teleop_client.wait_for_server(timeout_sec=1.0):
495  self.infoinfo("'assisted_teleop' action server not available, waiting...")
496  goal_msg = AssistedTeleop.Goal()
497  goal_msg.time_allowance = Duration(sec=time_allowance)
498 
499  self.infoinfo("Running 'assisted_teleop'....")
500  send_goal_future = self.assisted_teleop_client.send_goal_async(
501  goal_msg, self._feedbackCallback_feedbackCallback
502  )
503  rclpy.spin_until_future_complete(self, send_goal_future)
504  self.goal_handlegoal_handle = send_goal_future.result()
505 
506  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
507  msg = 'Assisted Teleop request was rejected!'
508  self.setTaskErrorsetTaskError(AssistedTeleop.UNKNOWN, msg)
509  self.errorerror(msg)
510  return None
511 
512  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
513  return RunningTask.ASSISTED_TELEOP
514 
515  def followPath(self, path: Path, controller_id: str = '',
516  goal_checker_id: str = '') -> Optional[RunningTask]:
517  self.clearTaskErrorclearTaskError()
518  """Send a `FollowPath` action request."""
519  self.debugdebug("Waiting for 'FollowPath' action server")
520  while not self.follow_path_client.wait_for_server(timeout_sec=1.0):
521  self.infoinfo("'FollowPath' action server not available, waiting...")
522 
523  goal_msg = FollowPath.Goal()
524  goal_msg.path = path
525  goal_msg.controller_id = controller_id
526  goal_msg.goal_checker_id = goal_checker_id
527 
528  self.infoinfo('Executing path...')
529  send_goal_future = self.follow_path_client.send_goal_async(
530  goal_msg, self._feedbackCallback_feedbackCallback
531  )
532  rclpy.spin_until_future_complete(self, send_goal_future)
533  self.goal_handlegoal_handle = send_goal_future.result()
534 
535  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
536  msg = 'FollowPath goal was rejected!'
537  self.setTaskErrorsetTaskError(FollowPath.UNKNOWN, msg)
538  self.errorerror(msg)
539  return None
540 
541  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
542  return RunningTask.FOLLOW_PATH
543 
544  def dockRobotByPose(self, dock_pose: PoseStamped,
545  dock_type: str = '', nav_to_dock: bool = True) -> Optional[RunningTask]:
546  self.clearTaskErrorclearTaskError()
547  """Send a `DockRobot` action request."""
548  self.infoinfo("Waiting for 'DockRobot' action server")
549  while not self.docking_client.wait_for_server(timeout_sec=1.0):
550  self.infoinfo('"DockRobot" action server not available, waiting...')
551 
552  goal_msg = DockRobot.Goal()
553  goal_msg.use_dock_id = False
554  goal_msg.dock_pose = dock_pose
555  goal_msg.dock_type = dock_type
556  goal_msg.navigate_to_staging_pose = nav_to_dock # if want to navigate before staging
557 
558  self.infoinfo('Docking at pose: ' + str(dock_pose) + '...')
559  send_goal_future = self.docking_client.send_goal_async(goal_msg,
560  self._feedbackCallback_feedbackCallback)
561  rclpy.spin_until_future_complete(self, send_goal_future)
562  self.goal_handlegoal_handle = send_goal_future.result()
563 
564  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
565  msg = 'DockRobot request was rejected!'
566  self.setTaskErrorsetTaskError(DockRobot.UNKNOWN, msg)
567  self.errorerror(msg)
568  return None
569 
570  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
571  return RunningTask.DOCK_ROBOT
572 
573  def dockRobotByID(self, dock_id: str, nav_to_dock: bool = True) -> Optional[RunningTask]:
574  """Send a `DockRobot` action request."""
575  self.clearTaskErrorclearTaskError()
576  self.infoinfo("Waiting for 'DockRobot' action server")
577  while not self.docking_client.wait_for_server(timeout_sec=1.0):
578  self.infoinfo('"DockRobot" action server not available, waiting...')
579 
580  goal_msg = DockRobot.Goal()
581  goal_msg.use_dock_id = True
582  goal_msg.dock_id = dock_id
583  goal_msg.navigate_to_staging_pose = nav_to_dock # if want to navigate before staging
584 
585  self.infoinfo('Docking at dock ID: ' + str(dock_id) + '...')
586  send_goal_future = self.docking_client.send_goal_async(goal_msg,
587  self._feedbackCallback_feedbackCallback)
588  rclpy.spin_until_future_complete(self, send_goal_future)
589  self.goal_handlegoal_handle = send_goal_future.result()
590 
591  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
592  msg = 'DockRobot request was rejected!'
593  self.setTaskErrorsetTaskError(DockRobot.UNKNOWN, msg)
594  self.errorerror(msg)
595  return None
596 
597  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
598  return RunningTask.DOCK_ROBOT
599 
600  def undockRobot(self, dock_type: str = '') -> Optional[RunningTask]:
601  """Send a `UndockRobot` action request."""
602  self.clearTaskErrorclearTaskError()
603  self.infoinfo("Waiting for 'UndockRobot' action server")
604  while not self.undocking_client.wait_for_server(timeout_sec=1.0):
605  self.infoinfo('"UndockRobot" action server not available, waiting...')
606 
607  goal_msg = UndockRobot.Goal()
608  goal_msg.dock_type = dock_type
609 
610  self.infoinfo('Undocking from dock of type: ' + str(dock_type) + '...')
611  send_goal_future = self.undocking_client.send_goal_async(goal_msg,
612  self._feedbackCallback_feedbackCallback)
613  rclpy.spin_until_future_complete(self, send_goal_future)
614  self.goal_handlegoal_handle = send_goal_future.result()
615 
616  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
617  msg = 'UndockRobot request was rejected!'
618  self.setTaskErrorsetTaskError(UndockRobot.UNKNOWN, msg)
619  self.errorerror(msg)
620  return None
621 
622  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
623  return RunningTask.UNDOCK_ROBOT
624 
625  def cancelTask(self) -> None:
626  """Cancel pending task request of any type."""
627  self.infoinfo('Canceling current task.')
628  if self.result_futureresult_future:
629  if self.goal_handlegoal_handle is not None:
630  future = self.goal_handlegoal_handle.cancel_goal_async()
631  rclpy.spin_until_future_complete(self, future)
632  else:
633  self.errorerror('Cancel task failed, goal handle is None')
634  self.setTaskErrorsetTaskError(0, 'Cancel task failed, goal handle is None')
635  return
636  if self.route_result_futureroute_result_future:
637  if self.route_goal_handleroute_goal_handle is not None:
638  future = self.route_goal_handleroute_goal_handle.cancel_goal_async()
639  rclpy.spin_until_future_complete(self, future)
640  else:
641  self.errorerror('Cancel route task failed, goal handle is None')
642  self.setTaskErrorsetTaskError(0, 'Cancel route task failed, goal handle is None')
643  return
644  self.clearTaskErrorclearTaskError()
645  return
646 
647  def isTaskComplete(self, task: RunningTask = RunningTask.NONE) -> bool:
648  """Check if the task request of any type is complete yet."""
649  # Find the result future to spin
650  if task is None:
651  self.errorerror('Task is None, cannot check for completion')
652  return False
653 
654  result_future = None
655  if task != RunningTask.COMPUTE_AND_TRACK_ROUTE:
656  result_future = self.result_futureresult_future
657  else:
658  result_future = self.route_result_futureroute_result_future
659  if not result_future:
660  # task was cancelled or completed
661  return True
662 
663  # Get the result of the future, if complete
664  rclpy.spin_until_future_complete(self, result_future, timeout_sec=0.10)
665  result_response = result_future.result()
666 
667  if result_response:
668  self.statusstatus = result_response.status
669  if self.statusstatus != GoalStatus.STATUS_SUCCEEDED:
670  result = result_response.result
671  if result is not None:
672  self.setTaskErrorsetTaskError(result.error_code, result.error_msg)
673  self.debugdebug(
674  'Task with failed with'
675  f' status code:{self.status}'
676  f' error code:{result.error_code}'
677  f' error msg:{result.error_msg}')
678  return True
679  else:
680  self.setTaskErrorsetTaskError(0, 'No result received')
681  self.debugdebug('Task failed with no result received')
682  return True
683  else:
684  # Timed out, still processing, not complete yet
685  return False
686 
687  self.debugdebug('Task succeeded!')
688  return True
689 
690  def getFeedback(self, task: RunningTask = RunningTask.NONE) -> Any:
691  """Get the pending action feedback message."""
692  if task != RunningTask.COMPUTE_AND_TRACK_ROUTE:
693  return self.feedbackfeedback
694  if len(self.route_feedback) > 0:
695  return self.route_feedback.pop(0)
696  return None
697 
698  def getResult(self) -> TaskResult:
699  """Get the pending action result message."""
700  if self.statusstatus == GoalStatus.STATUS_SUCCEEDED:
701  return TaskResult.SUCCEEDED
702  elif self.statusstatus == GoalStatus.STATUS_ABORTED:
703  return TaskResult.FAILED
704  elif self.statusstatus == GoalStatus.STATUS_CANCELED:
705  return TaskResult.CANCELED
706  else:
707  return TaskResult.UNKNOWN
708 
709  def clearTaskError(self) -> None:
710  self.last_action_error_codelast_action_error_code = 0
711  self.last_action_error_msglast_action_error_msg = ''
712 
713  def setTaskError(self, error_code: int, error_msg: str) -> None:
714  self.last_action_error_codelast_action_error_code = error_code
715  self.last_action_error_msglast_action_error_msg = error_msg
716 
717  def getTaskError(self) -> tuple[int, str]:
718  return (self.last_action_error_codelast_action_error_code, self.last_action_error_msglast_action_error_msg)
719 
720  def waitUntilNav2Active(self, navigator: str = 'bt_navigator',
721  localizer: str = 'amcl') -> None:
722  """Block until the full navigation system is up and running."""
723  if localizer != 'robot_localization': # non-lifecycle node
724  self._waitForNodeToActivate_waitForNodeToActivate(localizer)
725  if localizer == 'amcl':
726  self._waitForInitialPose_waitForInitialPose()
727  self._waitForNodeToActivate_waitForNodeToActivate(navigator)
728  self.infoinfo('Nav2 is ready for use!')
729  return
730 
731  def _getPathImpl(
732  self, start: PoseStamped, goal: PoseStamped,
733  planner_id: str = '', use_start: bool = False
734  ) -> ComputePathToPose.Result:
735  """
736  Send a `ComputePathToPose` action request.
737 
738  Internal implementation to get the full result, not just the path.
739  """
740  self.debugdebug("Waiting for 'ComputePathToPose' action server")
741  while not self.compute_path_to_pose_client.wait_for_server(timeout_sec=1.0):
742  self.infoinfo("'ComputePathToPose' action server not available, waiting...")
743 
744  goal_msg = ComputePathToPose.Goal()
745  goal_msg.start = start
746  goal_msg.goal = goal
747  goal_msg.planner_id = planner_id
748  goal_msg.use_start = use_start
749 
750  self.infoinfo('Getting path...')
751  send_goal_future = self.compute_path_to_pose_client.send_goal_async(goal_msg)
752  rclpy.spin_until_future_complete(self, send_goal_future)
753  self.goal_handlegoal_handle = send_goal_future.result()
754 
755  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
756  self.errorerror('Get path was rejected!')
757  self.statusstatus = GoalStatus.UNKNOWN
758  result = ComputePathToPose.Result()
759  result.error_code = ComputePathToPose.UNKNOWN
760  result.error_msg = 'Get path was rejected'
761  return result
762 
763  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
764  rclpy.spin_until_future_complete(self, self.result_futureresult_future)
765  self.statusstatus = self.result_futureresult_future.result().status # type: ignore[union-attr]
766 
767  return self.result_futureresult_future.result().result # type: ignore[union-attr]
768 
769  def getPath(
770  self, start: PoseStamped, goal: PoseStamped,
771  planner_id: str = '', use_start: bool = False) -> Path:
772  """Send a `ComputePathToPose` action request."""
773  self.clearTaskErrorclearTaskError()
774  rtn = self._getPathImpl_getPathImpl(start, goal, planner_id, use_start)
775 
776  if self.statusstatus == GoalStatus.STATUS_SUCCEEDED:
777  return rtn.path
778  else:
779  self.setTaskErrorsetTaskError(rtn.error_code, rtn.error_msg)
780  self.warnwarn('Getting path failed with'
781  f' status code:{self.status}'
782  f' error code:{rtn.error_code}'
783  f' error msg:{rtn.error_msg}')
784  return None
785 
786  def _getPathThroughPosesImpl(
787  self, start: PoseStamped, goals: Goals,
788  planner_id: str = '', use_start: bool = False
789  ) -> ComputePathThroughPoses.Result:
790  """
791  Send a `ComputePathThroughPoses` action request.
792 
793  Internal implementation to get the full result, not just the path.
794  """
795  self.debugdebug("Waiting for 'ComputePathThroughPoses' action server")
796  while not self.compute_path_through_poses_client.wait_for_server(
797  timeout_sec=1.0
798  ):
799  self.infoinfo(
800  "'ComputePathThroughPoses' action server not available, waiting..."
801  )
802 
803  goal_msg = ComputePathThroughPoses.Goal()
804  goal_msg.start = start
805  goal_msg.goals.header.frame_id = 'map'
806  goal_msg.goals.header.stamp = self.get_clock().now().to_msg()
807  goal_msg.goals.goals = goals
808  goal_msg.planner_id = planner_id
809  goal_msg.use_start = use_start
810 
811  self.infoinfo('Getting path...')
812  send_goal_future = self.compute_path_through_poses_client.send_goal_async(
813  goal_msg
814  )
815  rclpy.spin_until_future_complete(self, send_goal_future)
816  self.goal_handlegoal_handle = send_goal_future.result()
817 
818  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
819  self.errorerror('Get path was rejected!')
820  result = ComputePathThroughPoses.Result()
821  result.error_code = ComputePathThroughPoses.UNKNOWN
822  result.error_msg = 'Get path was rejected!'
823  return result
824 
825  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
826  rclpy.spin_until_future_complete(self, self.result_futureresult_future)
827  self.statusstatus = self.result_futureresult_future.result().status # type: ignore[union-attr]
828 
829  return self.result_futureresult_future.result().result # type: ignore[union-attr]
830 
832  self, start: PoseStamped, goals: Goals,
833  planner_id: str = '', use_start: bool = False) -> Path:
834  """Send a `ComputePathThroughPoses` action request."""
835  self.clearTaskErrorclearTaskError()
836  rtn = self._getPathThroughPosesImpl_getPathThroughPosesImpl(start, goals, planner_id, use_start)
837 
838  if self.statusstatus == GoalStatus.STATUS_SUCCEEDED:
839  return rtn.path
840  else:
841  self.setTaskErrorsetTaskError(rtn.error_code, rtn.error_msg)
842  self.warnwarn('Getting path failed with'
843  f' status code:{self.status}'
844  f' error code:{rtn.error_code}'
845  f' error msg:{rtn.error_msg}')
846  return None
847 
848  def _getRouteImpl(
849  self, start: Union[int, PoseStamped],
850  goal: Union[int, PoseStamped], use_start: bool = False
851  ) -> ComputeRoute.Result:
852  """
853  Send a `ComputeRoute` action request.
854 
855  Internal implementation to get the full result, not just the sparse route and dense path.
856  """
857  self.debugdebug("Waiting for 'ComputeRoute' action server")
858  while not self.compute_route_client.wait_for_server(timeout_sec=1.0):
859  self.infoinfo("'ComputeRoute' action server not available, waiting...")
860 
861  goal_msg = ComputeRoute.Goal()
862  goal_msg.use_start = use_start
863 
864  # Support both ID based requests and PoseStamped based requests
865  if isinstance(start, int) and isinstance(goal, int):
866  goal_msg.start_id = start
867  goal_msg.goal_id = goal
868  goal_msg.use_poses = False
869  elif isinstance(start, PoseStamped) and isinstance(goal, PoseStamped):
870  goal_msg.start = start
871  goal_msg.goal = goal
872  goal_msg.use_poses = True
873  else:
874  self.errorerror('Invalid start and goal types. Must be PoseStamped for pose or int for ID')
875  result = ComputeRoute.Result()
876  result.error_code = ComputeRoute.UNKNOWN
877  result.error_msg = 'Request type fields were invalid!'
878  return result
879 
880  self.infoinfo('Getting route...')
881  send_goal_future = self.compute_route_client.send_goal_async(goal_msg)
882  rclpy.spin_until_future_complete(self, send_goal_future)
883  self.goal_handlegoal_handle = send_goal_future.result()
884 
885  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
886  self.errorerror('Get route was rejected!')
887  result = ComputeRoute.Result()
888  result.error_code = ComputeRoute.UNKNOWN
889  result.error_msg = 'Get route was rejected!'
890  return result
891 
892  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
893  rclpy.spin_until_future_complete(self, self.result_futureresult_future)
894  self.statusstatus = self.result_futureresult_future.result().status # type: ignore[union-attr]
895 
896  return self.result_futureresult_future.result().result # type: ignore[union-attr]
897 
898  def getRoute(
899  self, start: Union[int, PoseStamped],
900  goal: Union[int, PoseStamped],
901  use_start: bool = False) -> Optional[list[Union[Path, Route]]]:
902  """Send a `ComputeRoute` action request."""
903  self.clearTaskErrorclearTaskError()
904  rtn = self._getRouteImpl_getRouteImpl(start, goal, use_start=False)
905 
906  if self.statusstatus != GoalStatus.STATUS_SUCCEEDED:
907  self.setTaskErrorsetTaskError(rtn.error_code, rtn.error_msg)
908  self.warnwarn(
909  'Getting route failed with'
910  f' status code:{self.status}'
911  f' error code:{rtn.error_code}'
912  f' error msg:{rtn.error_msg}')
913  return None
914 
915  return [rtn.path, rtn.route]
916 
918  self, start: Union[int, PoseStamped],
919  goal: Union[int, PoseStamped], use_start: bool = False
920  ) -> Optional[RunningTask]:
921  """Send a `ComputeAndTrackRoute` action request."""
922  self.clearTaskErrorclearTaskError()
923  self.debugdebug("Waiting for 'ComputeAndTrackRoute' action server")
924  while not self.compute_and_track_route_client.wait_for_server(timeout_sec=1.0):
925  self.infoinfo("'ComputeAndTrackRoute' action server not available, waiting...")
926 
927  goal_msg = ComputeAndTrackRoute.Goal()
928  goal_msg.use_start = use_start
929 
930  # Support both ID based requests and PoseStamped based requests
931  if isinstance(start, int) and isinstance(goal, int):
932  goal_msg.start_id = start
933  goal_msg.goal_id = goal
934  goal_msg.use_poses = False
935  elif isinstance(start, PoseStamped) and isinstance(goal, PoseStamped):
936  goal_msg.start = start
937  goal_msg.goal = goal
938  goal_msg.use_poses = True
939  else:
940  self.setTaskErrorsetTaskError(ComputeAndTrackRoute.UNKNOWN, 'Request type fields were invalid!')
941  self.errorerror('Invalid start and goal types. Must be PoseStamped for pose or int for ID')
942  return None
943 
944  self.infoinfo('Computing and tracking route...')
945  send_goal_future = self.compute_and_track_route_client.send_goal_async(goal_msg,
946  self._routeFeedbackCallback_routeFeedbackCallback) # noqa: E128
947  rclpy.spin_until_future_complete(self, send_goal_future)
948  self.route_goal_handleroute_goal_handle = send_goal_future.result()
949 
950  if not self.route_goal_handleroute_goal_handle or not self.route_goal_handleroute_goal_handle.accepted:
951  msg = 'Compute and track route was rejected!'
952  self.setTaskErrorsetTaskError(ComputeAndTrackRoute.UNKNOWN, msg)
953  self.errorerror(msg)
954  return None
955 
956  self.route_result_futureroute_result_future = self.route_goal_handleroute_goal_handle.get_result_async()
957  return RunningTask.COMPUTE_AND_TRACK_ROUTE
958 
959  def _smoothPathImpl(
960  self, path: Path, smoother_id: str = '',
961  max_duration: float = 2.0, check_for_collision: bool = False
962  ) -> SmoothPath.Result:
963  """
964  Send a `SmoothPath` action request.
965 
966  Internal implementation to get the full result, not just the path.
967  """
968  self.debugdebug("Waiting for 'SmoothPath' action server")
969  while not self.smoother_client.wait_for_server(timeout_sec=1.0):
970  self.infoinfo("'SmoothPath' action server not available, waiting...")
971 
972  goal_msg = SmoothPath.Goal()
973  goal_msg.path = path
974  goal_msg.max_smoothing_duration = rclpyDuration(seconds=max_duration).to_msg()
975  goal_msg.smoother_id = smoother_id
976  goal_msg.check_for_collisions = check_for_collision
977 
978  self.infoinfo('Smoothing path...')
979  send_goal_future = self.smoother_client.send_goal_async(goal_msg)
980  rclpy.spin_until_future_complete(self, send_goal_future)
981  self.goal_handlegoal_handle = send_goal_future.result()
982 
983  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
984  self.errorerror('Smooth path was rejected!')
985  result = SmoothPath.Result()
986  result.error_code = SmoothPath.UNKNOWN
987  result.error_msg = 'Smooth path was rejected'
988  return result
989 
990  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
991  rclpy.spin_until_future_complete(self, self.result_futureresult_future)
992  self.statusstatus = self.result_futureresult_future.result().status # type: ignore[union-attr]
993 
994  return self.result_futureresult_future.result().result # type: ignore[union-attr]
995 
997  self, path: Path, smoother_id: str = '',
998  max_duration: float = 2.0, check_for_collision: bool = False) -> Path:
999  """Send a `SmoothPath` action request."""
1000  self.clearTaskErrorclearTaskError()
1001  rtn = self._smoothPathImpl_smoothPathImpl(path, smoother_id, max_duration, check_for_collision)
1002 
1003  if self.statusstatus == GoalStatus.STATUS_SUCCEEDED:
1004  return rtn.path
1005  else:
1006  self.setTaskErrorsetTaskError(rtn.error_code, rtn.error_msg)
1007  self.warnwarn('Getting path failed with'
1008  f' status code:{self.status}'
1009  f' error code:{rtn.error_code}'
1010  f' error msg:{rtn.error_msg}')
1011  return None
1012 
1013  def changeMap(self, map_filepath: str) -> bool:
1014  """Change the current static map in the map server."""
1015  while not self.change_maps_srv.wait_for_service(timeout_sec=1.0):
1016  self.infoinfo('change map service not available, waiting...')
1017  req = LoadMap.Request()
1018  req.map_url = map_filepath
1019  future = self.change_maps_srv.call_async(req)
1020  rclpy.spin_until_future_complete(self, future)
1021 
1022  future_result = future.result()
1023  if future_result is None:
1024  self.errorerror('Change map request failed!')
1025  return False
1026 
1027  result = future_result.result
1028  if result != LoadMap.Response().RESULT_SUCCESS:
1029  if result == LoadMap.RESULT_MAP_DOES_NOT_EXIST:
1030  reason = 'Map does not exist'
1031  elif result == LoadMap.INVALID_MAP_DATA:
1032  reason = 'Invalid map data'
1033  elif result == LoadMap.INVALID_MAP_METADATA:
1034  reason = 'Invalid map metadata'
1035  elif result == LoadMap.UNDEFINED_FAILURE:
1036  reason = 'Undefined failure'
1037  else:
1038  reason = 'Unknown'
1039  self.setTaskErrorsetTaskError(result, reason)
1040  self.errorerror(f'Change map request failed:{reason}!')
1041  return False
1042  else:
1043  self.infoinfo('Change map request was successful!')
1044  return True
1045 
1046  def clearAllCostmaps(self) -> None:
1047  """Clear all costmaps."""
1048  self.clearLocalCostmapclearLocalCostmap()
1049  self.clearGlobalCostmapclearGlobalCostmap()
1050  return
1051 
1052  def clearLocalCostmap(self) -> None:
1053  """Clear local costmap."""
1054  while not self.clear_costmap_local_srv.wait_for_service(timeout_sec=1.0):
1055  self.infoinfo('Clear local costmaps service not available, waiting...')
1056  req = ClearEntireCostmap.Request()
1057  future = self.clear_costmap_local_srv.call_async(req)
1058  rclpy.spin_until_future_complete(self, future)
1059  return
1060 
1061  def clearGlobalCostmap(self) -> None:
1062  """Clear global costmap."""
1063  while not self.clear_costmap_global_srv.wait_for_service(timeout_sec=1.0):
1064  self.infoinfo('Clear global costmaps service not available, waiting...')
1065  req = ClearEntireCostmap.Request()
1066  future = self.clear_costmap_global_srv.call_async(req)
1067  rclpy.spin_until_future_complete(self, future)
1068  return
1069 
1070  def clearCostmapExceptRegion(self, reset_distance: float) -> None:
1071  """Clear the costmap except for a specified region."""
1072  while not self.clear_costmap_except_region_srv.wait_for_service(timeout_sec=1.0):
1073  self.infoinfo('ClearCostmapExceptRegion service not available, waiting...')
1074  req = ClearCostmapExceptRegion.Request()
1075  req.reset_distance = reset_distance
1076  future = self.clear_costmap_except_region_srv.call_async(req)
1077  rclpy.spin_until_future_complete(self, future)
1078  return
1079 
1080  def clearCostmapAroundRobot(self, reset_distance: float) -> None:
1081  """Clear the costmap around the robot."""
1082  while not self.clear_costmap_around_robot_srv.wait_for_service(timeout_sec=1.0):
1083  self.infoinfo('ClearCostmapAroundRobot service not available, waiting...')
1084  req = ClearCostmapAroundRobot.Request()
1085  req.reset_distance = reset_distance
1086  future = self.clear_costmap_around_robot_srv.call_async(req)
1087  rclpy.spin_until_future_complete(self, future)
1088  return
1089 
1090  def clearLocalCostmapAroundPose(self, pose: PoseStamped, reset_distance: float) -> None:
1091  """Clear the costmap around a given pose."""
1092  while not self.clear_local_costmap_around_pose_srv.wait_for_service(timeout_sec=1.0):
1093  self.infoinfo('ClearLocalCostmapAroundPose service not available, waiting...')
1094  req = ClearCostmapAroundPose.Request()
1095  req.pose = pose
1096  req.reset_distance = reset_distance
1097  future = self.clear_local_costmap_around_pose_srv.call_async(req)
1098  rclpy.spin_until_future_complete(self, future)
1099  return
1100 
1101  def clearGlobalCostmapAroundPose(self, pose: PoseStamped, reset_distance: float) -> None:
1102  """Clear the global costmap around a given pose."""
1103  while not self.clear_global_costmap_around_pose_srv.wait_for_service(timeout_sec=1.0):
1104  self.infoinfo('ClearGlobalCostmapAroundPose service not available, waiting...')
1105  req = ClearCostmapAroundPose.Request()
1106  req.pose = pose
1107  req.reset_distance = reset_distance
1108  future = self.clear_global_costmap_around_pose_srv.call_async(req)
1109  rclpy.spin_until_future_complete(self, future)
1110  return
1111 
1112  def getGlobalCostmap(self) -> OccupancyGrid:
1113  """Get the global costmap."""
1114  while not self.get_costmap_global_srv.wait_for_service(timeout_sec=1.0):
1115  self.infoinfo('Get global costmaps service not available, waiting...')
1116  req = GetCostmap.Request()
1117  future = self.get_costmap_global_srv.call_async(req)
1118  rclpy.spin_until_future_complete(self, future)
1119 
1120  result = future.result()
1121  if result is None:
1122  self.errorerror('Get global costmap request failed!')
1123  return None
1124 
1125  return result.map
1126 
1127  def getLocalCostmap(self) -> OccupancyGrid:
1128  """Get the local costmap."""
1129  while not self.get_costmap_local_srv.wait_for_service(timeout_sec=1.0):
1130  self.infoinfo('Get local costmaps service not available, waiting...')
1131  req = GetCostmap.Request()
1132  future = self.get_costmap_local_srv.call_async(req)
1133  rclpy.spin_until_future_complete(self, future)
1134 
1135  result = future.result()
1136 
1137  if result is None:
1138  self.errorerror('Get local costmap request failed!')
1139  return None
1140 
1141  return result.map
1142 
1143  def lifecycleStartup(self) -> None:
1144  """Startup nav2 lifecycle system."""
1145  self.infoinfo('Starting up lifecycle nodes based on lifecycle_manager.')
1146  for srv_name, srv_type in self.get_service_names_and_types():
1147  if srv_type[0] == 'nav2_msgs/srv/ManageLifecycleNodes':
1148  self.infoinfo(f'Starting up {srv_name}')
1149  mgr_client: Client[ManageLifecycleNodes.Request, ManageLifecycleNodes.Response] = \
1150  self.create_client(ManageLifecycleNodes, srv_name)
1151  while not mgr_client.wait_for_service(timeout_sec=1.0):
1152  self.infoinfo(f'{srv_name} service not available, waiting...')
1153  req = ManageLifecycleNodes.Request()
1154  req.command = ManageLifecycleNodes.Request().STARTUP
1155  future = mgr_client.call_async(req)
1156 
1157  # starting up requires a full map->odom->base_link TF tree
1158  # so if we're not successful, try forwarding the initial pose
1159  while True:
1160  rclpy.spin_until_future_complete(self, future, timeout_sec=0.10)
1161  if not future:
1162  self._waitForInitialPose_waitForInitialPose()
1163  else:
1164  break
1165  self.infoinfo('Nav2 is ready for use!')
1166  return
1167 
1168  def lifecycleShutdown(self) -> None:
1169  """Shutdown nav2 lifecycle system."""
1170  self.infoinfo('Shutting down lifecycle nodes based on lifecycle_manager.')
1171  for srv_name, srv_type in self.get_service_names_and_types():
1172  if srv_type[0] == 'nav2_msgs/srv/ManageLifecycleNodes':
1173  self.infoinfo(f'Shutting down {srv_name}')
1174  mgr_client: Client[ManageLifecycleNodes.Request, ManageLifecycleNodes.Response] = \
1175  self.create_client(ManageLifecycleNodes, srv_name)
1176  while not mgr_client.wait_for_service(timeout_sec=1.0):
1177  self.infoinfo(f'{srv_name} service not available, waiting...')
1178  req = ManageLifecycleNodes.Request()
1179  req.command = ManageLifecycleNodes.Request().SHUTDOWN
1180  future = mgr_client.call_async(req)
1181  rclpy.spin_until_future_complete(self, future)
1182  future.result()
1183  return
1184 
1185  def _waitForNodeToActivate(self, node_name: str) -> None:
1186  # Waits for the node within the tester namespace to become active
1187  self.debugdebug(f'Waiting for {node_name} to become active..')
1188  node_service = f'{node_name}/get_state'
1189  state_client: Client[GetState.Request, GetState.Response] = \
1190  self.create_client(GetState, node_service)
1191  while not state_client.wait_for_service(timeout_sec=1.0):
1192  self.infoinfo(f'{node_service} service not available, waiting...')
1193 
1194  req = GetState.Request()
1195  state = 'unknown'
1196  while state != 'active':
1197  self.debugdebug(f'Getting {node_name} state...')
1198  future = state_client.call_async(req)
1199  rclpy.spin_until_future_complete(self, future)
1200 
1201  result = future.result()
1202  if result is not None:
1203  state = result.current_state.label
1204  self.debugdebug(f'Result of get_state: {state}')
1205  time.sleep(2)
1206  return
1207 
1208  def _waitForInitialPose(self) -> None:
1209  while not self.initial_pose_receivedinitial_pose_received:
1210  self.infoinfo('Setting initial pose')
1211  self._setInitialPose_setInitialPose()
1212  self.infoinfo('Waiting for amcl_pose to be received')
1213  rclpy.spin_once(self, timeout_sec=1.0)
1214  return
1215 
1216  def _amclPoseCallback(self, msg: PoseWithCovarianceStamped) -> None:
1217  self.debugdebug('Received amcl pose')
1218  self.initial_pose_receivedinitial_pose_received = True
1219  return
1220 
1221  def _feedbackCallback(self, msg: NavigateToPose.Feedback) -> None:
1222  self.debugdebug('Received action feedback message')
1223  self.feedbackfeedback = msg.feedback
1224  return
1225 
1226  def _routeFeedbackCallback(self, msg: ComputeAndTrackRoute.Feedback) -> None:
1227  self.debugdebug('Received route action feedback message')
1228  self.route_feedback.append(msg.feedback)
1229  return
1230 
1231  def _setInitialPose(self) -> None:
1232  msg = PoseWithCovarianceStamped()
1233  msg.pose.pose = self.initial_poseinitial_pose.pose
1234  msg.header.frame_id = self.initial_poseinitial_pose.header.frame_id
1235  msg.header.stamp = self.initial_poseinitial_pose.header.stamp
1236  self.infoinfo('Publishing Initial Pose')
1237  self.initial_pose_pubinitial_pose_pub.publish(msg)
1238  return
1239 
1240  def info(self, msg: str) -> None:
1241  self.get_logger().info(msg)
1242  return
1243 
1244  def warn(self, msg: str) -> None:
1245  self.get_logger().warning(msg)
1246  return
1247 
1248  def error(self, msg: str) -> None:
1249  self.get_logger().error(msg)
1250  return
1251 
1252  def debug(self, msg: str) -> None:
1253  self.get_logger().debug(msg)
1254  return
Optional[RunningTask] undockRobot(self, str dock_type='')
ComputePathToPose.Result _getPathImpl(self, PoseStamped start, PoseStamped goal, str planner_id='', bool use_start=False)
Path smoothPath(self, Path path, str smoother_id='', float max_duration=2.0, bool check_for_collision=False)
None clearCostmapExceptRegion(self, float reset_distance)
Optional[RunningTask] getAndTrackRoute(self, Union[int, PoseStamped] start, Union[int, PoseStamped] goal, bool use_start=False)
Optional[RunningTask] followGpsWaypoints(self, list[GeoPose] gps_poses)
Path getPath(self, PoseStamped start, PoseStamped goal, str planner_id='', bool use_start=False)
ComputePathThroughPoses.Result _getPathThroughPosesImpl(self, PoseStamped start, Goals goals, str planner_id='', bool use_start=False)
Optional[RunningTask] dockRobotByID(self, str dock_id, bool nav_to_dock=True)
Optional[RunningTask] goToPose(self, PoseStamped pose, str behavior_tree='')
None _feedbackCallback(self, NavigateToPose.Feedback msg)
None clearCostmapAroundRobot(self, float reset_distance)
Optional[RunningTask] goThroughPoses(self, Goals poses, str behavior_tree='')
None _amclPoseCallback(self, PoseWithCovarianceStamped msg)
Any getFeedback(self, RunningTask task=RunningTask.NONE)
bool isTaskComplete(self, RunningTask task=RunningTask.NONE)
None setInitialPose(self, PoseStamped initial_pose)
Path getPathThroughPoses(self, PoseStamped start, Goals goals, str planner_id='', bool use_start=False)
SmoothPath.Result _smoothPathImpl(self, Path path, str smoother_id='', float max_duration=2.0, bool check_for_collision=False)
Optional[RunningTask] followWaypoints(self, list[PoseStamped] poses)
None clearGlobalCostmapAroundPose(self, PoseStamped pose, float reset_distance)
None clearLocalCostmapAroundPose(self, PoseStamped pose, float reset_distance)
Optional[list[Union[Path, Route]]] getRoute(self, Union[int, PoseStamped] start, Union[int, PoseStamped] goal, bool use_start=False)
None waitUntilNav2Active(self, str navigator='bt_navigator', str localizer='amcl')
None _routeFeedbackCallback(self, ComputeAndTrackRoute.Feedback msg)
ComputeRoute.Result _getRouteImpl(self, Union[int, PoseStamped] start, Union[int, PoseStamped] goal, bool use_start=False)
None setTaskError(self, int error_code, str error_msg)