Nav2 Navigation Stack - rolling  main
ROS 2 Navigation Stack
robot_navigator.py
1 #! /usr/bin/env python3
2 # Copyright 2021 Samsung Research America
3 # Copyright 2025 Open Navigation LLC
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 
17 
18 from enum import Enum
19 import time
20 from typing import Any, Optional, Union
21 
22 from action_msgs.msg import GoalStatus
23 from builtin_interfaces.msg import Duration
24 from geographic_msgs.msg import GeoPose
25 from geometry_msgs.msg import Point, PoseStamped, PoseWithCovarianceStamped
26 from lifecycle_msgs.srv import GetState
27 from nav2_msgs.action import (AssistedTeleop, BackUp, # type: ignore[attr-defined]
28  ComputeAndTrackRoute, ComputePathThroughPoses, ComputePathToPose,
29  ComputeRoute, DockRobot, DriveOnHeading, FollowGPSWaypoints,
30  FollowObject, FollowPath, FollowWaypoints, NavigateThroughPoses,
31  NavigateToPose, SmoothPath, Spin, UndockRobot)
32 from nav2_msgs.msg import Costmap, Route
33 from nav2_msgs.srv import (ClearCostmapAroundPose, ClearCostmapAroundRobot,
34  ClearCostmapExceptRegion, ClearEntireCostmap, GetCostmap, LoadMap,
35  ManageLifecycleNodes, Toggle)
36 from nav_msgs.msg import Goals, Path
37 import rclpy
38 from rclpy.action import ActionClient
39 from rclpy.action.client import ClientGoalHandle
40 from rclpy.client import Client
41 from rclpy.duration import Duration as rclpyDuration
42 from rclpy.node import Node
43 from rclpy.qos import QoSDurabilityPolicy, QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy
44 from rclpy.task import Future
45 from rclpy.type_support import GetResultServiceResponse
46 
47 
48 # Task Result enum for the result of the task being executed
49 class TaskResult(Enum):
50  UNKNOWN = 0
51  SUCCEEDED = 1
52  CANCELED = 2
53  FAILED = 3
54 
55 
56 # Task enum for the task being executed, if its a long-running task to be able to obtain
57 # necessary contextual information in `isTaskComplete` and `getFeedback` regarding the task
58 # which is running.
59 class RunningTask(Enum):
60  NONE = 0
61  NAVIGATE_TO_POSE = 1
62  NAVIGATE_THROUGH_POSES = 2
63  FOLLOW_PATH = 3
64  FOLLOW_WAYPOINTS = 4
65  FOLLOW_GPS_WAYPOINTS = 5
66  SPIN = 6
67  BACKUP = 7
68  DRIVE_ON_HEADING = 8
69  ASSISTED_TELEOP = 9
70  DOCK_ROBOT = 10
71  UNDOCK_ROBOT = 11
72  COMPUTE_AND_TRACK_ROUTE = 12
73  FOLLOW_OBJECT = 13
74 
75 
76 class BasicNavigator(Node):
77 
78  def __init__(self, node_name: str = 'basic_navigator', namespace: str = '') -> None:
79  super().__init__(node_name=node_name, namespace=namespace)
80  self.initial_poseinitial_pose = PoseStamped()
81  self.initial_poseinitial_pose.header.frame_id = 'map'
82 
83  self.goal_handlegoal_handle: Optional[ClientGoalHandle[Any, Any, Any]] = None
84  self.result_futureresult_future: \
85  Optional[Future[GetResultServiceResponse[Any]]] = None
86  self.feedbackfeedback: Any = None
87  self.statusstatus: Optional[int] = None
88 
89  # Since the route server's compute and track action server is likely
90  # to be running simultaneously with another (e.g. controller, WPF) server,
91  # we must track its futures and feedback separately. Additionally, the
92  # route tracking feedback is uniquely important to be complete and ordered
93  self.route_goal_handleroute_goal_handle: Optional[ClientGoalHandle[Any, Any, Any]] = None
94  self.route_result_futureroute_result_future: \
95  Optional[Future[GetResultServiceResponse[Any]]] = None
96  self.route_feedback: list[Any] = []
97 
98  # Error code and messages from servers
99  self.last_action_error_codelast_action_error_code = 0
100  self.last_action_error_msglast_action_error_msg = ''
101 
102  amcl_pose_qos = QoSProfile(
103  durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
104  reliability=QoSReliabilityPolicy.RELIABLE,
105  history=QoSHistoryPolicy.KEEP_LAST,
106  depth=1,
107  )
108 
109  self.initial_pose_receivedinitial_pose_received = False
110  self.nav_through_poses_client: ActionClient[
111  NavigateThroughPoses.Goal,
112  NavigateThroughPoses.Result,
113  NavigateThroughPoses.Feedback
114  ] = ActionClient(
115  self, NavigateThroughPoses, 'navigate_through_poses')
116  self.nav_to_pose_client: ActionClient[
117  NavigateToPose.Goal,
118  NavigateToPose.Result,
119  NavigateToPose.Feedback
120  ] = ActionClient(self, NavigateToPose, 'navigate_to_pose')
121  self.follow_waypoints_client: ActionClient[
122  FollowWaypoints.Goal,
123  FollowWaypoints.Result,
124  FollowWaypoints.Feedback
125  ] = ActionClient(
126  self, FollowWaypoints, 'follow_waypoints'
127  )
128  self.follow_gps_waypoints_client: ActionClient[
129  FollowGPSWaypoints.Goal,
130  FollowGPSWaypoints.Result,
131  FollowGPSWaypoints.Feedback
132  ] = ActionClient(
133  self, FollowGPSWaypoints, 'follow_gps_waypoints'
134  )
135  self.follow_path_client: ActionClient[
136  FollowPath.Goal,
137  FollowPath.Result,
138  FollowPath.Feedback
139  ] = ActionClient(self, FollowPath, 'follow_path')
140  self.compute_path_to_pose_client: ActionClient[
141  ComputePathToPose.Goal,
142  ComputePathToPose.Result,
143  ComputePathToPose.Feedback
144  ] = ActionClient(
145  self, ComputePathToPose, 'compute_path_to_pose'
146  )
147  self.compute_path_through_poses_client: ActionClient[
148  ComputePathThroughPoses.Goal,
149  ComputePathThroughPoses.Result,
150  ComputePathThroughPoses.Feedback
151  ] = ActionClient(
152  self, ComputePathThroughPoses, 'compute_path_through_poses'
153  )
154  self.smoother_client: ActionClient[
155  SmoothPath.Goal,
156  SmoothPath.Result,
157  SmoothPath.Feedback
158  ] = ActionClient(self, SmoothPath, 'smooth_path')
159  self.compute_route_client: ActionClient[
160  ComputeRoute.Goal,
161  ComputeRoute.Result,
162  ComputeRoute.Feedback
163  ] = ActionClient(self, ComputeRoute, 'compute_route')
164  self.compute_and_track_route_client: ActionClient[
165  ComputeAndTrackRoute.Goal,
166  ComputeAndTrackRoute.Result,
167  ComputeAndTrackRoute.Feedback
168  ] = ActionClient(self, ComputeAndTrackRoute, 'compute_and_track_route')
169  self.spin_client: ActionClient[
170  Spin.Goal,
171  Spin.Result,
172  Spin.Feedback
173  ] = ActionClient(self, Spin, 'spin')
174 
175  self.backup_client: ActionClient[
176  BackUp.Goal,
177  BackUp.Result,
178  BackUp.Feedback
179  ] = ActionClient(self, BackUp, 'backup')
180  self.drive_on_heading_client: ActionClient[
181  DriveOnHeading.Goal,
182  DriveOnHeading.Result,
183  DriveOnHeading.Feedback
184  ] = ActionClient(
185  self, DriveOnHeading, 'drive_on_heading'
186  )
187  self.assisted_teleop_client: ActionClient[
188  AssistedTeleop.Goal,
189  AssistedTeleop.Result,
190  AssistedTeleop.Feedback
191  ] = ActionClient(
192  self, AssistedTeleop, 'assisted_teleop'
193  )
194  self.docking_client: ActionClient[
195  DockRobot.Goal,
196  DockRobot.Result,
197  DockRobot.Feedback
198  ] = ActionClient(self, DockRobot, 'dock_robot')
199  self.undocking_client: ActionClient[
200  UndockRobot.Goal,
201  UndockRobot.Result,
202  UndockRobot.Feedback
203  ] = ActionClient(self, UndockRobot, 'undock_robot')
204  self.following_client: ActionClient[
205  FollowObject.Goal,
206  FollowObject.Result,
207  FollowObject.Feedback
208  ] = ActionClient(self, FollowObject, 'follow_object')
209 
210  self.localization_pose_sublocalization_pose_sub = self.create_subscription(
211  PoseWithCovarianceStamped,
212  'amcl_pose',
213  self._amclPoseCallback_amclPoseCallback,
214  amcl_pose_qos,
215  )
216  self.initial_pose_pubinitial_pose_pub = self.create_publisher(
217  PoseWithCovarianceStamped, 'initialpose', 10
218  )
219  self.change_maps_srv: Client[LoadMap.Request, LoadMap.Response] = \
220  self.create_client(LoadMap, 'map_server/load_map')
221  self.clear_costmap_global_srv: Client[
222  ClearEntireCostmap.Request, ClearEntireCostmap.Response] = \
223  self.create_client(
224  ClearEntireCostmap, 'global_costmap/clear_entirely_global_costmap'
225  )
226  self.clear_costmap_local_srv: Client[
227  ClearEntireCostmap.Request, ClearEntireCostmap.Response] = \
228  self.create_client(
229  ClearEntireCostmap, 'local_costmap/clear_entirely_local_costmap'
230  )
231  self.clear_costmap_except_region_srv: Client[
232  ClearCostmapExceptRegion.Request, ClearCostmapExceptRegion.Response] = \
233  self.create_client(
234  ClearCostmapExceptRegion, 'local_costmap/clear_costmap_except_region'
235  )
236  self.clear_costmap_around_robot_srv: Client[
237  ClearCostmapAroundRobot.Request, ClearCostmapAroundRobot.Response] = \
238  self.create_client(
239  ClearCostmapAroundRobot, 'local_costmap/clear_costmap_around_robot'
240  )
241  self.clear_local_costmap_around_pose_srv: Client[
242  ClearCostmapAroundPose.Request, ClearCostmapAroundPose.Response] = \
243  self.create_client(
244  ClearCostmapAroundPose, 'local_costmap/clear_costmap_around_pose'
245  )
246  self.clear_global_costmap_around_pose_srv: Client[
247  ClearCostmapAroundPose.Request, ClearCostmapAroundPose.Response] = \
248  self.create_client(
249  ClearCostmapAroundPose, 'global_costmap/clear_costmap_around_pose'
250  )
251  self.get_costmap_global_srv: Client[
252  GetCostmap.Request, GetCostmap.Response] = \
253  self.create_client(
254  GetCostmap, 'global_costmap/get_costmap'
255  )
256  self.get_costmap_local_srv: Client[
257  GetCostmap.Request, GetCostmap.Response] = \
258  self.create_client(
259  GetCostmap, 'local_costmap/get_costmap'
260  )
261  self.toggle_collision_monitor_srv: Client[
262  Toggle.Request, Toggle.Response] = \
263  self.create_client(
264  Toggle, 'collision_monitor/toggle'
265  )
266 
267  def destroyNode(self) -> None:
268  self.destroy_nodedestroy_node()
269 
270  def destroy_node(self) -> None:
271  self.nav_through_poses_client.destroy()
272  self.nav_to_pose_client.destroy()
273  self.follow_waypoints_client.destroy()
274  self.follow_path_client.destroy()
275  self.compute_path_to_pose_client.destroy()
276  self.compute_path_through_poses_client.destroy()
277  self.compute_and_track_route_client.destroy()
278  self.compute_route_client.destroy()
279  self.smoother_client.destroy()
280  self.spin_client.destroy()
281  self.backup_client.destroy()
282  self.drive_on_heading_client.destroy()
283  self.assisted_teleop_client.destroy()
284  self.follow_gps_waypoints_client.destroy()
285  self.docking_client.destroy()
286  self.undocking_client.destroy()
287  super().destroy_node()
288 
289  def setInitialPose(self, initial_pose: PoseStamped) -> None:
290  """Set the initial pose to the localization system."""
291  self.initial_pose_receivedinitial_pose_received = False
292  self.initial_poseinitial_pose = initial_pose
293  self._setInitialPose_setInitialPose()
294 
295  def goThroughPoses(self, poses: Goals, behavior_tree: str = '') -> Optional[RunningTask]:
296  """Send a `NavThroughPoses` action request."""
297  self.clearTaskErrorclearTaskError()
298  self.debugdebug("Waiting for 'NavigateThroughPoses' action server")
299  while not self.nav_through_poses_client.wait_for_server(timeout_sec=1.0):
300  self.infoinfo("'NavigateThroughPoses' action server not available, waiting...")
301 
302  goal_msg = NavigateThroughPoses.Goal()
303  goal_msg.poses = poses
304  goal_msg.behavior_tree = behavior_tree
305 
306  self.infoinfo(f'Navigating with {len(poses.goals)} goals....')
307  send_goal_future = self.nav_through_poses_client.send_goal_async(
308  goal_msg, self._feedbackCallback_feedbackCallback
309  )
310  rclpy.spin_until_future_complete(self, send_goal_future)
311  self.goal_handlegoal_handle = send_goal_future.result()
312 
313  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
314  msg = f'NavigateThroughPoses request with {len(poses.goals)} was rejected!'
315  self.setTaskErrorsetTaskError(NavigateThroughPoses.Result.UNKNOWN, msg)
316  self.errorerror(msg)
317  return None
318 
319  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
320  return RunningTask.NAVIGATE_THROUGH_POSES
321 
322  def goToPose(self, pose: PoseStamped, behavior_tree: str = '') -> Optional[RunningTask]:
323  """Send a `NavToPose` action request."""
324  self.clearTaskErrorclearTaskError()
325  self.debugdebug("Waiting for 'NavigateToPose' action server")
326  while not self.nav_to_pose_client.wait_for_server(timeout_sec=1.0):
327  self.infoinfo("'NavigateToPose' action server not available, waiting...")
328 
329  goal_msg = NavigateToPose.Goal()
330  goal_msg.pose = pose
331  goal_msg.behavior_tree = behavior_tree
332 
333  self.infoinfo(
334  'Navigating to goal: '
335  + str(pose.pose.position.x)
336  + ' '
337  + str(pose.pose.position.y)
338  + '...'
339  )
340  send_goal_future = self.nav_to_pose_client.send_goal_async(
341  goal_msg, self._feedbackCallback_feedbackCallback
342  )
343  rclpy.spin_until_future_complete(self, send_goal_future)
344  self.goal_handlegoal_handle = send_goal_future.result()
345 
346  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
347  msg = (
348  'NavigateToPose goal to '
349  + str(pose.pose.position.x)
350  + ' '
351  + str(pose.pose.position.y)
352  + ' was rejected!'
353  )
354  self.setTaskErrorsetTaskError(NavigateToPose.Result.UNKNOWN, msg)
355  self.errorerror(msg)
356  return None
357 
358  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
359  return RunningTask.NAVIGATE_TO_POSE
360 
361  def followWaypoints(self, poses: list[PoseStamped]) -> Optional[RunningTask]:
362  """Send a `FollowWaypoints` action request."""
363  self.clearTaskErrorclearTaskError()
364  self.debugdebug("Waiting for 'FollowWaypoints' action server")
365  while not self.follow_waypoints_client.wait_for_server(timeout_sec=1.0):
366  self.infoinfo("'FollowWaypoints' action server not available, waiting...")
367 
368  goal_msg = FollowWaypoints.Goal()
369  goal_msg.poses = poses
370 
371  self.infoinfo(f'Following {len(goal_msg.poses)} goals....')
372  send_goal_future = self.follow_waypoints_client.send_goal_async(
373  goal_msg, self._feedbackCallback_feedbackCallback
374  )
375  rclpy.spin_until_future_complete(self, send_goal_future)
376  self.goal_handlegoal_handle = send_goal_future.result()
377 
378  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
379  msg = f'Following {len(poses)} waypoints request was rejected!'
380  self.setTaskErrorsetTaskError(FollowWaypoints.Result.UNKNOWN, msg)
381  self.errorerror(msg)
382  return None
383 
384  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
385  return RunningTask.FOLLOW_WAYPOINTS
386 
387  def followGpsWaypoints(self, gps_poses: list[GeoPose]) -> Optional[RunningTask]:
388  """Send a `FollowGPSWaypoints` action request."""
389  self.clearTaskErrorclearTaskError()
390  self.debugdebug("Waiting for 'FollowWaypoints' action server")
391  while not self.follow_gps_waypoints_client.wait_for_server(timeout_sec=1.0):
392  self.infoinfo("'FollowWaypoints' action server not available, waiting...")
393 
394  goal_msg = FollowGPSWaypoints.Goal()
395  goal_msg.gps_poses = gps_poses
396 
397  self.infoinfo(f'Following {len(goal_msg.gps_poses)} gps goals....')
398  send_goal_future = self.follow_gps_waypoints_client.send_goal_async(
399  goal_msg, self._feedbackCallback_feedbackCallback
400  )
401  rclpy.spin_until_future_complete(self, send_goal_future)
402  self.goal_handlegoal_handle = send_goal_future.result()
403 
404  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
405  msg = f'Following {len(gps_poses)} gps waypoints request was rejected!'
406  self.setTaskErrorsetTaskError(FollowGPSWaypoints.Result.UNKNOWN, msg)
407  self.errorerror(msg)
408  return None
409 
410  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
411  return RunningTask.FOLLOW_GPS_WAYPOINTS
412 
413  def spin(
414  self, spin_dist: float = 1.57, time_allowance: int = 10,
415  disable_collision_checks: bool = False) -> Optional[RunningTask]:
416  self.clearTaskErrorclearTaskError()
417  self.debugdebug("Waiting for 'Spin' action server")
418  while not self.spin_client.wait_for_server(timeout_sec=1.0):
419  self.infoinfo("'Spin' action server not available, waiting...")
420  goal_msg = Spin.Goal()
421  goal_msg.target_yaw = spin_dist
422  goal_msg.time_allowance = Duration(sec=time_allowance)
423  goal_msg.disable_collision_checks = disable_collision_checks
424 
425  self.infoinfo(f'Spinning to angle {goal_msg.target_yaw}....')
426  send_goal_future = self.spin_client.send_goal_async(
427  goal_msg, self._feedbackCallback_feedbackCallback
428  )
429  rclpy.spin_until_future_complete(self, send_goal_future)
430  self.goal_handlegoal_handle = send_goal_future.result()
431 
432  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
433  msg = 'Spin request was rejected!'
434  self.setTaskErrorsetTaskError(Spin.Result.UNKNOWN, msg)
435  self.errorerror(msg)
436  return None
437 
438  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
439  return RunningTask.SPIN
440 
441  def backup(
442  self, backup_dist: float = 0.15, backup_speed: float = 0.025,
443  time_allowance: int = 10,
444  disable_collision_checks: bool = False) -> Optional[RunningTask]:
445  self.clearTaskErrorclearTaskError()
446  self.debugdebug("Waiting for 'Backup' action server")
447  while not self.backup_client.wait_for_server(timeout_sec=1.0):
448  self.infoinfo("'Backup' action server not available, waiting...")
449  goal_msg = BackUp.Goal()
450  goal_msg.target = Point(x=float(backup_dist))
451  goal_msg.speed = backup_speed
452  goal_msg.time_allowance = Duration(sec=time_allowance)
453  goal_msg.disable_collision_checks = disable_collision_checks
454 
455  self.infoinfo(f'Backing up {goal_msg.target.x} m at {goal_msg.speed} m/s....')
456  send_goal_future = self.backup_client.send_goal_async(
457  goal_msg, self._feedbackCallback_feedbackCallback
458  )
459  rclpy.spin_until_future_complete(self, send_goal_future)
460  self.goal_handlegoal_handle = send_goal_future.result()
461 
462  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
463  msg = 'Backup request was rejected!'
464  self.setTaskErrorsetTaskError(BackUp.Result.UNKNOWN, msg)
465  self.errorerror(msg)
466  return None
467 
468  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
469  return RunningTask.BACKUP
470 
471  def driveOnHeading(
472  self, dist: float = 0.15, speed: float = 0.025,
473  time_allowance: int = 10,
474  disable_collision_checks: bool = False) -> Optional[RunningTask]:
475  self.clearTaskErrorclearTaskError()
476  self.debugdebug("Waiting for 'DriveOnHeading' action server")
477  while not self.backup_client.wait_for_server(timeout_sec=1.0):
478  self.infoinfo("'DriveOnHeading' action server not available, waiting...")
479  goal_msg = DriveOnHeading.Goal()
480  goal_msg.target = Point(x=float(dist))
481  goal_msg.speed = speed
482  goal_msg.time_allowance = Duration(sec=time_allowance)
483  goal_msg.disable_collision_checks = disable_collision_checks
484 
485  self.infoinfo(f'Drive {goal_msg.target.x} m on heading at {goal_msg.speed} m/s....')
486  send_goal_future = self.drive_on_heading_client.send_goal_async(
487  goal_msg, self._feedbackCallback_feedbackCallback
488  )
489  rclpy.spin_until_future_complete(self, send_goal_future)
490  self.goal_handlegoal_handle = send_goal_future.result()
491 
492  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
493  msg = 'Drive On Heading request was rejected!'
494  self.setTaskErrorsetTaskError(DriveOnHeading.Result.UNKNOWN, msg)
495  self.errorerror(msg)
496  return None
497 
498  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
499  return RunningTask.DRIVE_ON_HEADING
500 
501  def assistedTeleop(self, time_allowance: int = 30) -> Optional[RunningTask]:
502 
503  self.clearTaskErrorclearTaskError()
504  self.debugdebug("Wanting for 'assisted_teleop' action server")
505 
506  while not self.assisted_teleop_client.wait_for_server(timeout_sec=1.0):
507  self.infoinfo("'assisted_teleop' action server not available, waiting...")
508  goal_msg = AssistedTeleop.Goal()
509  goal_msg.time_allowance = Duration(sec=time_allowance)
510 
511  self.infoinfo("Running 'assisted_teleop'....")
512  send_goal_future = self.assisted_teleop_client.send_goal_async(
513  goal_msg, self._feedbackCallback_feedbackCallback
514  )
515  rclpy.spin_until_future_complete(self, send_goal_future)
516  self.goal_handlegoal_handle = send_goal_future.result()
517 
518  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
519  msg = 'Assisted Teleop request was rejected!'
520  self.setTaskErrorsetTaskError(AssistedTeleop.Result.UNKNOWN, msg)
521  self.errorerror(msg)
522  return None
523 
524  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
525  return RunningTask.ASSISTED_TELEOP
526 
527  def followPath(self, path: Path, controller_id: str = '',
528  goal_checker_id: str = '') -> Optional[RunningTask]:
529  self.clearTaskErrorclearTaskError()
530  """Send a `FollowPath` action request."""
531  self.debugdebug("Waiting for 'FollowPath' action server")
532  while not self.follow_path_client.wait_for_server(timeout_sec=1.0):
533  self.infoinfo("'FollowPath' action server not available, waiting...")
534 
535  goal_msg = FollowPath.Goal()
536  goal_msg.path = path
537  goal_msg.controller_id = controller_id
538  goal_msg.goal_checker_id = goal_checker_id
539 
540  self.infoinfo('Executing path...')
541  send_goal_future = self.follow_path_client.send_goal_async(
542  goal_msg, self._feedbackCallback_feedbackCallback
543  )
544  rclpy.spin_until_future_complete(self, send_goal_future)
545  self.goal_handlegoal_handle = send_goal_future.result()
546 
547  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
548  msg = 'FollowPath goal was rejected!'
549  self.setTaskErrorsetTaskError(FollowPath.Result.UNKNOWN, msg)
550  self.errorerror(msg)
551  return None
552 
553  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
554  return RunningTask.FOLLOW_PATH
555 
556  def dockRobotByPose(self, dock_pose: PoseStamped,
557  dock_type: str = '', nav_to_dock: bool = True) -> Optional[RunningTask]:
558  self.clearTaskErrorclearTaskError()
559  """Send a `DockRobot` action request."""
560  self.infoinfo("Waiting for 'DockRobot' action server")
561  while not self.docking_client.wait_for_server(timeout_sec=1.0):
562  self.infoinfo('"DockRobot" action server not available, waiting...')
563 
564  goal_msg = DockRobot.Goal()
565  goal_msg.use_dock_id = False
566  goal_msg.dock_pose = dock_pose
567  goal_msg.dock_type = dock_type
568  goal_msg.navigate_to_staging_pose = nav_to_dock # if want to navigate before staging
569 
570  self.infoinfo('Docking at pose: ' + str(dock_pose) + '...')
571  send_goal_future = self.docking_client.send_goal_async(
572  goal_msg, self._feedbackCallback_feedbackCallback)
573  rclpy.spin_until_future_complete(self, send_goal_future)
574  self.goal_handlegoal_handle = send_goal_future.result()
575 
576  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
577  msg = 'DockRobot request was rejected!'
578  self.setTaskErrorsetTaskError(DockRobot.Result.UNKNOWN, msg)
579  self.errorerror(msg)
580  return None
581 
582  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
583  return RunningTask.DOCK_ROBOT
584 
585  def dockRobotByID(self, dock_id: str, nav_to_dock: bool = True) -> Optional[RunningTask]:
586  """Send a `DockRobot` action request."""
587  self.clearTaskErrorclearTaskError()
588  self.infoinfo("Waiting for 'DockRobot' action server")
589  while not self.docking_client.wait_for_server(timeout_sec=1.0):
590  self.infoinfo('"DockRobot" action server not available, waiting...')
591 
592  goal_msg = DockRobot.Goal()
593  goal_msg.use_dock_id = True
594  goal_msg.dock_id = dock_id
595  goal_msg.navigate_to_staging_pose = nav_to_dock # if want to navigate before staging
596 
597  self.infoinfo('Docking at dock ID: ' + str(dock_id) + '...')
598  send_goal_future = self.docking_client.send_goal_async(
599  goal_msg, self._feedbackCallback_feedbackCallback)
600  rclpy.spin_until_future_complete(self, send_goal_future)
601  self.goal_handlegoal_handle = send_goal_future.result()
602 
603  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
604  msg = 'DockRobot request was rejected!'
605  self.setTaskErrorsetTaskError(DockRobot.Result.UNKNOWN, msg)
606  self.errorerror(msg)
607  return None
608 
609  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
610  return RunningTask.DOCK_ROBOT
611 
612  def undockRobot(self, dock_type: str = '') -> Optional[RunningTask]:
613  """Send a `UndockRobot` action request."""
614  self.clearTaskErrorclearTaskError()
615  self.infoinfo("Waiting for 'UndockRobot' action server")
616  while not self.undocking_client.wait_for_server(timeout_sec=1.0):
617  self.infoinfo('"UndockRobot" action server not available, waiting...')
618 
619  goal_msg = UndockRobot.Goal()
620  goal_msg.dock_type = dock_type
621 
622  self.infoinfo('Undocking from dock of type: ' + str(dock_type) + '...')
623  send_goal_future = self.undocking_client.send_goal_async(
624  goal_msg, self._feedbackCallback_feedbackCallback)
625  rclpy.spin_until_future_complete(self, send_goal_future)
626  self.goal_handlegoal_handle = send_goal_future.result()
627 
628  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
629  msg = 'UndockRobot request was rejected!'
630  self.setTaskErrorsetTaskError(UndockRobot.Result.UNKNOWN, msg)
631  self.errorerror(msg)
632  return None
633 
634  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
635  return RunningTask.UNDOCK_ROBOT
636 
637  def followObjectByTopic(self, topic: str, max_duration: int = 0) -> Optional[RunningTask]:
638  """Send a `FollowObject` action request."""
639  self.clearTaskErrorclearTaskError()
640  self.infoinfo("Waiting for 'FollowObject' action server")
641  while not self.following_client.wait_for_server(timeout_sec=1.0):
642  self.infoinfo('"FollowObject" action server not available, waiting...')
643 
644  goal_msg = FollowObject.Goal()
645  goal_msg.pose_topic = topic
646  goal_msg.max_duration = Duration(sec=max_duration)
647 
648  self.infoinfo('Following object on topic: ' + str(topic) + '...')
649  send_goal_future = self.following_client.send_goal_async(
650  goal_msg, self._feedbackCallback_feedbackCallback)
651  rclpy.spin_until_future_complete(self, send_goal_future)
652  self.goal_handlegoal_handle = send_goal_future.result()
653 
654  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
655  msg = 'FollowObject request was rejected!'
656  self.setTaskErrorsetTaskError(FollowObject.Result.UNKNOWN, msg)
657  self.errorerror(msg)
658  return None
659 
660  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
661  return RunningTask.FOLLOW_OBJECT
662 
663  def followObjectByFrame(self, frame: str, max_duration: int = 0) -> Optional[RunningTask]:
664  """Send a `FollowObject` action request."""
665  self.clearTaskErrorclearTaskError()
666  self.infoinfo("Waiting for 'FollowObject' action server")
667  while not self.following_client.wait_for_server(timeout_sec=1.0):
668  self.infoinfo('"FollowObject" action server not available, waiting...')
669 
670  goal_msg = FollowObject.Goal()
671  goal_msg.tracked_frame = frame
672  goal_msg.max_duration = Duration(sec=max_duration)
673 
674  self.infoinfo('Following object in frame: ' + str(frame) + '...')
675  send_goal_future = self.following_client.send_goal_async(
676  goal_msg, self._feedbackCallback_feedbackCallback)
677  rclpy.spin_until_future_complete(self, send_goal_future)
678  self.goal_handlegoal_handle = send_goal_future.result()
679 
680  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
681  msg = 'FollowObject request was rejected!'
682  self.setTaskErrorsetTaskError(FollowObject.Result.UNKNOWN, msg)
683  self.errorerror(msg)
684  return None
685 
686  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
687  return RunningTask.FOLLOW_OBJECT
688 
689  def cancelTask(self) -> None:
690  """Cancel pending task request of any type."""
691  self.infoinfo('Canceling current task.')
692  if self.result_futureresult_future:
693  if self.goal_handlegoal_handle is not None:
694  future = self.goal_handlegoal_handle.cancel_goal_async()
695  rclpy.spin_until_future_complete(self, future)
696  else:
697  self.errorerror('Cancel task failed, goal handle is None')
698  self.setTaskErrorsetTaskError(0, 'Cancel task failed, goal handle is None')
699  return
700  if self.route_result_futureroute_result_future:
701  if self.route_goal_handleroute_goal_handle is not None:
702  future = self.route_goal_handleroute_goal_handle.cancel_goal_async()
703  rclpy.spin_until_future_complete(self, future)
704  else:
705  self.errorerror('Cancel route task failed, goal handle is None')
706  self.setTaskErrorsetTaskError(0, 'Cancel route task failed, goal handle is None')
707  return
708  self.clearTaskErrorclearTaskError()
709  return
710 
711  def isTaskComplete(self, task: RunningTask = RunningTask.NONE) -> bool:
712  """Check if the task request of any type is complete yet."""
713  # Find the result future to spin
714  if task is None:
715  self.errorerror('Task is None, cannot check for completion')
716  return False
717 
718  result_future = None
719  if task != RunningTask.COMPUTE_AND_TRACK_ROUTE:
720  result_future = self.result_futureresult_future
721  else:
722  result_future = self.route_result_futureroute_result_future
723  if not result_future:
724  # task was cancelled or completed
725  return True
726 
727  # Get the result of the future, if complete
728  rclpy.spin_until_future_complete(self, result_future, timeout_sec=0.10)
729  result_response = result_future.result()
730 
731  if result_response:
732  self.statusstatus = result_response.status
733  if self.statusstatus != GoalStatus.STATUS_SUCCEEDED:
734  result = result_response.result
735  if result is not None:
736  self.setTaskErrorsetTaskError(result.error_code, result.error_msg)
737  self.debugdebug(
738  'Task with failed with'
739  f' status code:{self.status}'
740  f' error code:{result.error_code}'
741  f' error msg:{result.error_msg}')
742  return True
743  else:
744  self.setTaskErrorsetTaskError(0, 'No result received')
745  self.debugdebug('Task failed with no result received')
746  return True
747  else:
748  # Timed out, still processing, not complete yet
749  return False
750 
751  self.debugdebug('Task succeeded!')
752  return True
753 
754  def getFeedback(self, task: RunningTask = RunningTask.NONE) -> Any:
755  """Get the pending action feedback message."""
756  if task != RunningTask.COMPUTE_AND_TRACK_ROUTE:
757  return self.feedbackfeedback
758  if len(self.route_feedback) > 0:
759  return self.route_feedback.pop(0)
760  return None
761 
762  def getResult(self) -> TaskResult:
763  """Get the pending action result message."""
764  if self.statusstatus == GoalStatus.STATUS_SUCCEEDED:
765  return TaskResult.SUCCEEDED
766  elif self.statusstatus == GoalStatus.STATUS_ABORTED:
767  return TaskResult.FAILED
768  elif self.statusstatus == GoalStatus.STATUS_CANCELED:
769  return TaskResult.CANCELED
770  else:
771  return TaskResult.UNKNOWN
772 
773  def clearTaskError(self) -> None:
774  self.last_action_error_codelast_action_error_code = 0
775  self.last_action_error_msglast_action_error_msg = ''
776 
777  def setTaskError(self, error_code: int, error_msg: str) -> None:
778  self.last_action_error_codelast_action_error_code = error_code
779  self.last_action_error_msglast_action_error_msg = error_msg
780 
781  def getTaskError(self) -> tuple[int, str]:
782  return (self.last_action_error_codelast_action_error_code, self.last_action_error_msglast_action_error_msg)
783 
784  def waitUntilNav2Active(self, navigator: str = 'bt_navigator',
785  localizer: str = 'amcl') -> None:
786  """Block until the full navigation system is up and running."""
787  if localizer != 'robot_localization': # non-lifecycle node
788  self._waitForNodeToActivate_waitForNodeToActivate(localizer)
789  if localizer == 'amcl':
790  self._waitForInitialPose_waitForInitialPose()
791  self._waitForNodeToActivate_waitForNodeToActivate(navigator)
792  self.infoinfo('Nav2 is ready for use!')
793  return
794 
795  def _getPathImpl(
796  self, start: PoseStamped, goal: PoseStamped,
797  planner_id: str = '', use_start: bool = False
798  ) -> ComputePathToPose.Result:
799  """
800  Send a `ComputePathToPose` action request.
801 
802  Internal implementation to get the full result, not just the path.
803  """
804  self.debugdebug("Waiting for 'ComputePathToPose' action server")
805  while not self.compute_path_to_pose_client.wait_for_server(timeout_sec=1.0):
806  self.infoinfo("'ComputePathToPose' action server not available, waiting...")
807 
808  goal_msg = ComputePathToPose.Goal()
809  goal_msg.start = start
810  goal_msg.goal = goal
811  goal_msg.planner_id = planner_id
812  goal_msg.use_start = use_start
813 
814  self.infoinfo('Getting path...')
815  send_goal_future = self.compute_path_to_pose_client.send_goal_async(goal_msg)
816  rclpy.spin_until_future_complete(self, send_goal_future)
817  self.goal_handlegoal_handle = send_goal_future.result()
818 
819  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
820  self.errorerror('Get path was rejected!')
821  self.statusstatus = GoalStatus.STATUS_UNKNOWN
822  result = ComputePathToPose.Result()
823  result.error_code = ComputePathToPose.Result.UNKNOWN
824  result.error_msg = 'Get path was rejected'
825  return result
826 
827  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
828  rclpy.spin_until_future_complete(self, self.result_futureresult_future)
829  self.statusstatus = self.result_futureresult_future.result().status # type: ignore[union-attr]
830 
831  return self.result_futureresult_future.result().result # type: ignore[union-attr]
832 
833  def getPath(
834  self, start: PoseStamped, goal: PoseStamped,
835  planner_id: str = '', use_start: bool = False) -> Optional[Path]:
836  """Send a `ComputePathToPose` action request."""
837  self.clearTaskErrorclearTaskError()
838  rtn = self._getPathImpl_getPathImpl(start, goal, planner_id, use_start)
839 
840  if self.statusstatus == GoalStatus.STATUS_SUCCEEDED:
841  return rtn.path
842  else:
843  self.setTaskErrorsetTaskError(rtn.error_code, rtn.error_msg)
844  self.warnwarn('Getting path failed with'
845  f' status code:{self.status}'
846  f' error code:{rtn.error_code}'
847  f' error msg:{rtn.error_msg}')
848  return None
849 
850  def _getPathThroughPosesImpl(
851  self, start: PoseStamped, goals: list[PoseStamped],
852  planner_id: str = '', use_start: bool = False
853  ) -> ComputePathThroughPoses.Result:
854  """
855  Send a `ComputePathThroughPoses` action request.
856 
857  Internal implementation to get the full result, not just the path.
858  """
859  self.debugdebug("Waiting for 'ComputePathThroughPoses' action server")
860  while not self.compute_path_through_poses_client.wait_for_server(
861  timeout_sec=1.0
862  ):
863  self.infoinfo(
864  "'ComputePathThroughPoses' action server not available, waiting..."
865  )
866 
867  goal_msg = ComputePathThroughPoses.Goal()
868  goal_msg.start = start
869  goal_msg.goals.header.frame_id = 'map'
870  goal_msg.goals.header.stamp = self.get_clock().now().to_msg()
871  goal_msg.goals.goals = goals
872  goal_msg.planner_id = planner_id
873  goal_msg.use_start = use_start
874 
875  self.infoinfo('Getting path...')
876  send_goal_future = self.compute_path_through_poses_client.send_goal_async(
877  goal_msg
878  )
879  rclpy.spin_until_future_complete(self, send_goal_future)
880  self.goal_handlegoal_handle = send_goal_future.result()
881 
882  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
883  self.errorerror('Get path was rejected!')
884  result = ComputePathThroughPoses.Result()
885  result.error_code = ComputePathThroughPoses.Result.UNKNOWN
886  result.error_msg = 'Get path was rejected!'
887  return result
888 
889  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
890  rclpy.spin_until_future_complete(self, self.result_futureresult_future)
891  self.statusstatus = self.result_futureresult_future.result().status # type: ignore[union-attr]
892 
893  return self.result_futureresult_future.result().result # type: ignore[union-attr]
894 
896  self, start: PoseStamped, goals: list[PoseStamped],
897  planner_id: str = '', use_start: bool = False) -> Optional[Path]:
898  """Send a `ComputePathThroughPoses` action request."""
899  self.clearTaskErrorclearTaskError()
900  rtn = self._getPathThroughPosesImpl_getPathThroughPosesImpl(start, goals, planner_id, use_start)
901 
902  if self.statusstatus == GoalStatus.STATUS_SUCCEEDED:
903  return rtn.path
904  else:
905  self.setTaskErrorsetTaskError(rtn.error_code, rtn.error_msg)
906  self.warnwarn('Getting path failed with'
907  f' status code:{self.status}'
908  f' error code:{rtn.error_code}'
909  f' error msg:{rtn.error_msg}')
910  return None
911 
912  def _getRouteImpl(
913  self, start: Union[int, PoseStamped],
914  goal: Union[int, PoseStamped], use_start: bool = False
915  ) -> ComputeRoute.Result:
916  """
917  Send a `ComputeRoute` action request.
918 
919  Internal implementation to get the full result, not just the sparse route and dense path.
920  """
921  self.debugdebug("Waiting for 'ComputeRoute' action server")
922  while not self.compute_route_client.wait_for_server(timeout_sec=1.0):
923  self.infoinfo("'ComputeRoute' action server not available, waiting...")
924 
925  goal_msg = ComputeRoute.Goal()
926  goal_msg.use_start = use_start
927 
928  # Support both ID based requests and PoseStamped based requests
929  if isinstance(start, int) and isinstance(goal, int):
930  goal_msg.start_id = start
931  goal_msg.goal_id = goal
932  goal_msg.use_poses = False
933  elif isinstance(start, PoseStamped) and isinstance(goal, PoseStamped):
934  goal_msg.start = start
935  goal_msg.goal = goal
936  goal_msg.use_poses = True
937  else:
938  self.errorerror('Invalid start and goal types. Must be PoseStamped for pose or int for ID')
939  result = ComputeRoute.Result()
940  result.error_code = ComputeRoute.Result.UNKNOWN
941  result.error_msg = 'Request type fields were invalid!'
942  return result
943 
944  self.infoinfo('Getting route...')
945  send_goal_future = self.compute_route_client.send_goal_async(goal_msg)
946  rclpy.spin_until_future_complete(self, send_goal_future)
947  self.goal_handlegoal_handle = send_goal_future.result()
948 
949  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
950  self.errorerror('Get route was rejected!')
951  result = ComputeRoute.Result()
952  result.error_code = ComputeRoute.Result.UNKNOWN
953  result.error_msg = 'Get route was rejected!'
954  return result
955 
956  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
957  rclpy.spin_until_future_complete(self, self.result_futureresult_future)
958  self.statusstatus = self.result_futureresult_future.result().status # type: ignore[union-attr]
959 
960  return self.result_futureresult_future.result().result # type: ignore[union-attr]
961 
962  def getRoute(
963  self, start: Union[int, PoseStamped],
964  goal: Union[int, PoseStamped],
965  use_start: bool = False) -> Optional[list[Union[Path, Route]]]:
966  """Send a `ComputeRoute` action request."""
967  self.clearTaskErrorclearTaskError()
968  rtn = self._getRouteImpl_getRouteImpl(start, goal, use_start=False)
969 
970  if self.statusstatus != GoalStatus.STATUS_SUCCEEDED:
971  self.setTaskErrorsetTaskError(rtn.error_code, rtn.error_msg)
972  self.warnwarn(
973  'Getting route failed with'
974  f' status code:{self.status}'
975  f' error code:{rtn.error_code}'
976  f' error msg:{rtn.error_msg}')
977  return None
978 
979  return [rtn.path, rtn.route]
980 
982  self, start: Union[int, PoseStamped],
983  goal: Union[int, PoseStamped], use_start: bool = False
984  ) -> Optional[RunningTask]:
985  """Send a `ComputeAndTrackRoute` action request."""
986  self.clearTaskErrorclearTaskError()
987  self.debugdebug("Waiting for 'ComputeAndTrackRoute' action server")
988  while not self.compute_and_track_route_client.wait_for_server(timeout_sec=1.0):
989  self.infoinfo("'ComputeAndTrackRoute' action server not available, waiting...")
990 
991  goal_msg = ComputeAndTrackRoute.Goal()
992  goal_msg.use_start = use_start
993 
994  # Support both ID based requests and PoseStamped based requests
995  if isinstance(start, int) and isinstance(goal, int):
996  goal_msg.start_id = start
997  goal_msg.goal_id = goal
998  goal_msg.use_poses = False
999  elif isinstance(start, PoseStamped) and isinstance(goal, PoseStamped):
1000  goal_msg.start = start
1001  goal_msg.goal = goal
1002  goal_msg.use_poses = True
1003  else:
1004  self.setTaskErrorsetTaskError(ComputeAndTrackRoute.Result.UNKNOWN,
1005  'Request type fields were invalid!')
1006  self.errorerror('Invalid start and goal types. Must be PoseStamped for pose or int for ID')
1007  return None
1008 
1009  self.infoinfo('Computing and tracking route...')
1010  send_goal_future = self.compute_and_track_route_client.send_goal_async(goal_msg,
1011  self._routeFeedbackCallback_routeFeedbackCallback) # noqa: E128
1012  rclpy.spin_until_future_complete(self, send_goal_future)
1013  self.route_goal_handleroute_goal_handle = send_goal_future.result()
1014 
1015  if not self.route_goal_handleroute_goal_handle or not self.route_goal_handleroute_goal_handle.accepted:
1016  msg = 'Compute and track route was rejected!'
1017  self.setTaskErrorsetTaskError(ComputeAndTrackRoute.Result.UNKNOWN, msg)
1018  self.errorerror(msg)
1019  return None
1020 
1021  self.route_result_futureroute_result_future = self.route_goal_handleroute_goal_handle.get_result_async()
1022  return RunningTask.COMPUTE_AND_TRACK_ROUTE
1023 
1024  def _smoothPathImpl(
1025  self, path: Path, smoother_id: str = '',
1026  max_duration: float = 2.0, check_for_collision: bool = False
1027  ) -> SmoothPath.Result:
1028  """
1029  Send a `SmoothPath` action request.
1030 
1031  Internal implementation to get the full result, not just the path.
1032  """
1033  self.debugdebug("Waiting for 'SmoothPath' action server")
1034  while not self.smoother_client.wait_for_server(timeout_sec=1.0):
1035  self.infoinfo("'SmoothPath' action server not available, waiting...")
1036 
1037  goal_msg = SmoothPath.Goal()
1038  goal_msg.path = path
1039  goal_msg.max_smoothing_duration = rclpyDuration(seconds=max_duration).to_msg()
1040  goal_msg.smoother_id = smoother_id
1041  goal_msg.check_for_collisions = check_for_collision
1042 
1043  self.infoinfo('Smoothing path...')
1044  send_goal_future = self.smoother_client.send_goal_async(goal_msg)
1045  rclpy.spin_until_future_complete(self, send_goal_future)
1046  self.goal_handlegoal_handle = send_goal_future.result()
1047 
1048  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
1049  self.errorerror('Smooth path was rejected!')
1050  result = SmoothPath.Result()
1051  result.error_code = SmoothPath.Result.UNKNOWN
1052  result.error_msg = 'Smooth path was rejected'
1053  return result
1054 
1055  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
1056  rclpy.spin_until_future_complete(self, self.result_futureresult_future)
1057  self.statusstatus = self.result_futureresult_future.result().status # type: ignore[union-attr]
1058 
1059  return self.result_futureresult_future.result().result # type: ignore[union-attr]
1060 
1062  self, path: Path, smoother_id: str = '',
1063  max_duration: float = 2.0, check_for_collision: bool = False) -> Optional[Path]:
1064  """Send a `SmoothPath` action request."""
1065  self.clearTaskErrorclearTaskError()
1066  rtn = self._smoothPathImpl_smoothPathImpl(path, smoother_id, max_duration, check_for_collision)
1067 
1068  if self.statusstatus == GoalStatus.STATUS_SUCCEEDED:
1069  return rtn.path
1070  else:
1071  self.setTaskErrorsetTaskError(rtn.error_code, rtn.error_msg)
1072  self.warnwarn('Getting path failed with'
1073  f' status code:{self.status}'
1074  f' error code:{rtn.error_code}'
1075  f' error msg:{rtn.error_msg}')
1076  return None
1077 
1078  def changeMap(self, map_filepath: str) -> bool:
1079  """Change the current static map in the map server."""
1080  while not self.change_maps_srv.wait_for_service(timeout_sec=1.0):
1081  self.infoinfo('change map service not available, waiting...')
1082  req = LoadMap.Request()
1083  req.map_url = map_filepath
1084  future = self.change_maps_srv.call_async(req)
1085  rclpy.spin_until_future_complete(self, future)
1086 
1087  future_result = future.result()
1088  if future_result is None:
1089  self.errorerror('Change map request failed!')
1090  return False
1091 
1092  result = future_result.result
1093  if result != LoadMap.Response.RESULT_SUCCESS:
1094  if result == LoadMap.Response.RESULT_MAP_DOES_NOT_EXIST:
1095  reason = 'Map does not exist'
1096  elif result == LoadMap.Response.RESULT_INVALID_MAP_DATA:
1097  reason = 'Invalid map data'
1098  elif result == LoadMap.Response.RESULT_INVALID_MAP_METADATA:
1099  reason = 'Invalid map metadata'
1100  elif result == LoadMap.Response.RESULT_UNDEFINED_FAILURE:
1101  reason = 'Undefined failure'
1102  else:
1103  reason = 'Unknown'
1104  self.setTaskErrorsetTaskError(result, reason)
1105  self.errorerror(f'Change map request failed:{reason}!')
1106  return False
1107  else:
1108  self.infoinfo('Change map request was successful!')
1109  return True
1110 
1111  def clearAllCostmaps(self) -> None:
1112  """Clear all costmaps."""
1113  self.clearLocalCostmapclearLocalCostmap()
1114  self.clearGlobalCostmapclearGlobalCostmap()
1115  return
1116 
1117  def clearLocalCostmap(self) -> None:
1118  """Clear local costmap."""
1119  while not self.clear_costmap_local_srv.wait_for_service(timeout_sec=1.0):
1120  self.infoinfo('Clear local costmaps service not available, waiting...')
1121  req = ClearEntireCostmap.Request()
1122  future = self.clear_costmap_local_srv.call_async(req)
1123  rclpy.spin_until_future_complete(self, future)
1124 
1125  result = future.result()
1126  if result is None:
1127  self.errorerror('Clear local costmap request failed!')
1128 
1129  return
1130 
1131  def clearGlobalCostmap(self) -> None:
1132  """Clear global costmap."""
1133  while not self.clear_costmap_global_srv.wait_for_service(timeout_sec=1.0):
1134  self.infoinfo('Clear global costmaps service not available, waiting...')
1135  req = ClearEntireCostmap.Request()
1136  future = self.clear_costmap_global_srv.call_async(req)
1137  rclpy.spin_until_future_complete(self, future)
1138 
1139  result = future.result()
1140  if result is None:
1141  self.errorerror('Clear global costmap request failed!')
1142 
1143  return
1144 
1145  def clearCostmapExceptRegion(self, reset_distance: float) -> None:
1146  """Clear the costmap except for a specified region."""
1147  while not self.clear_costmap_except_region_srv.wait_for_service(timeout_sec=1.0):
1148  self.infoinfo('ClearCostmapExceptRegion service not available, waiting...')
1149  req = ClearCostmapExceptRegion.Request()
1150  req.reset_distance = reset_distance
1151  future = self.clear_costmap_except_region_srv.call_async(req)
1152  rclpy.spin_until_future_complete(self, future)
1153 
1154  result = future.result()
1155  if result is None:
1156  self.errorerror('Clear costmap except region request failed!')
1157 
1158  return
1159 
1160  def clearCostmapAroundRobot(self, reset_distance: float) -> None:
1161  """Clear the costmap around the robot."""
1162  while not self.clear_costmap_around_robot_srv.wait_for_service(timeout_sec=1.0):
1163  self.infoinfo('ClearCostmapAroundRobot service not available, waiting...')
1164  req = ClearCostmapAroundRobot.Request()
1165  req.reset_distance = reset_distance
1166  future = self.clear_costmap_around_robot_srv.call_async(req)
1167  rclpy.spin_until_future_complete(self, future)
1168 
1169  result = future.result()
1170  if result is None:
1171  self.errorerror('Clear costmap around robot request failed!')
1172 
1173  return
1174 
1175  def clearLocalCostmapAroundPose(self, pose: PoseStamped, reset_distance: float) -> None:
1176  """Clear the costmap around a given pose."""
1177  while not self.clear_local_costmap_around_pose_srv.wait_for_service(timeout_sec=1.0):
1178  self.infoinfo('ClearLocalCostmapAroundPose service not available, waiting...')
1179  req = ClearCostmapAroundPose.Request()
1180  req.pose = pose
1181  req.reset_distance = reset_distance
1182  future = self.clear_local_costmap_around_pose_srv.call_async(req)
1183  rclpy.spin_until_future_complete(self, future)
1184 
1185  result = future.result()
1186  if result is None:
1187  self.errorerror('Clear local costmap around pose request failed!')
1188 
1189  return
1190 
1191  def clearGlobalCostmapAroundPose(self, pose: PoseStamped, reset_distance: float) -> None:
1192  """Clear the global costmap around a given pose."""
1193  while not self.clear_global_costmap_around_pose_srv.wait_for_service(timeout_sec=1.0):
1194  self.infoinfo('ClearGlobalCostmapAroundPose service not available, waiting...')
1195  req = ClearCostmapAroundPose.Request()
1196  req.pose = pose
1197  req.reset_distance = reset_distance
1198  future = self.clear_global_costmap_around_pose_srv.call_async(req)
1199  rclpy.spin_until_future_complete(self, future)
1200 
1201  result = future.result()
1202  if result is None:
1203  self.errorerror('Clear global costmap around pose request failed!')
1204 
1205  return
1206 
1207  def getGlobalCostmap(self) -> Optional[Costmap]:
1208  """Get the global costmap."""
1209  while not self.get_costmap_global_srv.wait_for_service(timeout_sec=1.0):
1210  self.infoinfo('Get global costmaps service not available, waiting...')
1211  req = GetCostmap.Request()
1212  future = self.get_costmap_global_srv.call_async(req)
1213  rclpy.spin_until_future_complete(self, future)
1214 
1215  result = future.result()
1216  if result is None:
1217  self.errorerror('Get global costmap request failed!')
1218  return None
1219 
1220  return result.map
1221 
1222  def getLocalCostmap(self) -> Optional[Costmap]:
1223  """Get the local costmap."""
1224  while not self.get_costmap_local_srv.wait_for_service(timeout_sec=1.0):
1225  self.infoinfo('Get local costmaps service not available, waiting...')
1226  req = GetCostmap.Request()
1227  future = self.get_costmap_local_srv.call_async(req)
1228  rclpy.spin_until_future_complete(self, future)
1229 
1230  result = future.result()
1231 
1232  if result is None:
1233  self.errorerror('Get local costmap request failed!')
1234  return None
1235 
1236  return result.map
1237 
1238  def toggleCollisionMonitor(self, enable: bool) -> None:
1239  """Toggle the collision monitor."""
1240  while not self.toggle_collision_monitor_srv.wait_for_service(timeout_sec=1.0):
1241  self.infoinfo('Toggle collision monitor service not available, waiting...')
1242  req = Toggle.Request()
1243  req.enable = enable
1244  future = self.toggle_collision_monitor_srv.call_async(req)
1245 
1246  rclpy.spin_until_future_complete(self, future)
1247  result = future.result()
1248  if result is None:
1249  self.errorerror('Toggle collision monitor request failed!')
1250 
1251  return
1252 
1253  def lifecycleStartup(self) -> None:
1254  """Startup nav2 lifecycle system."""
1255  self.infoinfo('Starting up lifecycle nodes based on lifecycle_manager.')
1256  for srv_name, srv_type in self.get_service_names_and_types():
1257  if srv_type[0] == 'nav2_msgs/srv/ManageLifecycleNodes':
1258  self.infoinfo(f'Starting up {srv_name}')
1259  mgr_client: Client[ManageLifecycleNodes.Request, ManageLifecycleNodes.Response] = \
1260  self.create_client(ManageLifecycleNodes, srv_name)
1261  while not mgr_client.wait_for_service(timeout_sec=1.0):
1262  self.infoinfo(f'{srv_name} service not available, waiting...')
1263  req = ManageLifecycleNodes.Request()
1264  req.command = ManageLifecycleNodes.Request.STARTUP
1265  future = mgr_client.call_async(req)
1266 
1267  # starting up requires a full map->odom->base_link TF tree
1268  # so if we're not successful, try forwarding the initial pose
1269  while True:
1270  rclpy.spin_until_future_complete(self, future, timeout_sec=0.10)
1271  if not future:
1272  self._waitForInitialPose_waitForInitialPose()
1273  else:
1274  break
1275  self.infoinfo('Nav2 is ready for use!')
1276  return
1277 
1278  def lifecycleShutdown(self) -> None:
1279  """Shutdown nav2 lifecycle system."""
1280  self.infoinfo('Shutting down lifecycle nodes based on lifecycle_manager.')
1281  for srv_name, srv_type in self.get_service_names_and_types():
1282  if srv_type[0] == 'nav2_msgs/srv/ManageLifecycleNodes':
1283  self.infoinfo(f'Shutting down {srv_name}')
1284  mgr_client: Client[ManageLifecycleNodes.Request, ManageLifecycleNodes.Response] = \
1285  self.create_client(ManageLifecycleNodes, srv_name)
1286  while not mgr_client.wait_for_service(timeout_sec=1.0):
1287  self.infoinfo(f'{srv_name} service not available, waiting...')
1288  req = ManageLifecycleNodes.Request()
1289  req.command = ManageLifecycleNodes.Request.SHUTDOWN
1290  future = mgr_client.call_async(req)
1291  rclpy.spin_until_future_complete(self, future)
1292  future.result()
1293  return
1294 
1295  def _waitForNodeToActivate(self, node_name: str) -> None:
1296  # Waits for the node within the tester namespace to become active
1297  self.debugdebug(f'Waiting for {node_name} to become active..')
1298  node_service = f'{node_name}/get_state'
1299  state_client: Client[GetState.Request, GetState.Response] = \
1300  self.create_client(GetState, node_service)
1301  while not state_client.wait_for_service(timeout_sec=1.0):
1302  self.infoinfo(f'{node_service} service not available, waiting...')
1303 
1304  req = GetState.Request()
1305  state = 'unknown'
1306  while state != 'active':
1307  self.debugdebug(f'Getting {node_name} state...')
1308  future = state_client.call_async(req)
1309  rclpy.spin_until_future_complete(self, future)
1310 
1311  result = future.result()
1312  if result is not None:
1313  state = result.current_state.label
1314  self.debugdebug(f'Result of get_state: {state}')
1315  time.sleep(2)
1316  return
1317 
1318  def _waitForInitialPose(self) -> None:
1319  while not self.initial_pose_receivedinitial_pose_received:
1320  self.infoinfo('Setting initial pose')
1321  self._setInitialPose_setInitialPose()
1322  self.infoinfo('Waiting for amcl_pose to be received')
1323  rclpy.spin_once(self, timeout_sec=1.0)
1324  return
1325 
1326  def _amclPoseCallback(self, msg: PoseWithCovarianceStamped) -> None:
1327  self.debugdebug('Received amcl pose')
1328  self.initial_pose_receivedinitial_pose_received = True
1329  return
1330 
1331  def _feedbackCallback(self, msg: Any) -> None:
1332  self.debugdebug('Received action feedback message')
1333  self.feedbackfeedback = msg.feedback
1334  return
1335 
1336  def _routeFeedbackCallback(
1337  self, msg: ComputeAndTrackRoute.Impl.FeedbackMessage) -> None:
1338  self.debugdebug('Received route action feedback message')
1339  self.route_feedback.append(msg.feedback)
1340  return
1341 
1342  def _setInitialPose(self) -> None:
1343  msg = PoseWithCovarianceStamped()
1344  msg.pose.pose = self.initial_poseinitial_pose.pose
1345  msg.header.frame_id = self.initial_poseinitial_pose.header.frame_id
1346  msg.header.stamp = self.initial_poseinitial_pose.header.stamp
1347  self.infoinfo('Publishing Initial Pose')
1348  self.initial_pose_pubinitial_pose_pub.publish(msg)
1349  return
1350 
1351  def info(self, msg: str) -> None:
1352  self.get_logger().info(msg)
1353  return
1354 
1355  def warn(self, msg: str) -> None:
1356  self.get_logger().warning(msg)
1357  return
1358 
1359  def error(self, msg: str) -> None:
1360  self.get_logger().error(msg)
1361  return
1362 
1363  def debug(self, msg: str) -> None:
1364  self.get_logger().debug(msg)
1365  return
Optional[Path] getPathThroughPoses(self, PoseStamped start, list[PoseStamped] goals, str planner_id='', bool use_start=False)
Optional[RunningTask] undockRobot(self, str dock_type='')
ComputePathThroughPoses.Result _getPathThroughPosesImpl(self, PoseStamped start, list[PoseStamped] goals, str planner_id='', bool use_start=False)
ComputePathToPose.Result _getPathImpl(self, PoseStamped start, PoseStamped goal, str planner_id='', bool use_start=False)
Optional[Path] getPath(self, PoseStamped start, PoseStamped goal, str planner_id='', bool use_start=False)
None _routeFeedbackCallback(self, ComputeAndTrackRoute.Impl.FeedbackMessage msg)
None clearCostmapExceptRegion(self, float reset_distance)
Optional[RunningTask] getAndTrackRoute(self, Union[int, PoseStamped] start, Union[int, PoseStamped] goal, bool use_start=False)
Optional[RunningTask] followGpsWaypoints(self, list[GeoPose] gps_poses)
Optional[RunningTask] dockRobotByID(self, str dock_id, bool nav_to_dock=True)
Optional[RunningTask] goToPose(self, PoseStamped pose, str behavior_tree='')
Optional[RunningTask] followObjectByFrame(self, str frame, int max_duration=0)
Optional[RunningTask] followObjectByTopic(self, str topic, int max_duration=0)
None clearCostmapAroundRobot(self, float reset_distance)
Optional[RunningTask] goThroughPoses(self, Goals poses, str behavior_tree='')
None _amclPoseCallback(self, PoseWithCovarianceStamped msg)
Any getFeedback(self, RunningTask task=RunningTask.NONE)
bool isTaskComplete(self, RunningTask task=RunningTask.NONE)
None setInitialPose(self, PoseStamped initial_pose)
Optional[Path] smoothPath(self, Path path, str smoother_id='', float max_duration=2.0, bool check_for_collision=False)
SmoothPath.Result _smoothPathImpl(self, Path path, str smoother_id='', float max_duration=2.0, bool check_for_collision=False)
Optional[RunningTask] followWaypoints(self, list[PoseStamped] poses)
None clearGlobalCostmapAroundPose(self, PoseStamped pose, float reset_distance)
None clearLocalCostmapAroundPose(self, PoseStamped pose, float reset_distance)
Optional[list[Union[Path, Route]]] getRoute(self, Union[int, PoseStamped] start, Union[int, PoseStamped] goal, bool use_start=False)
None waitUntilNav2Active(self, str navigator='bt_navigator', str localizer='amcl')
ComputeRoute.Result _getRouteImpl(self, Union[int, PoseStamped] start, Union[int, PoseStamped] goal, bool use_start=False)
None setTaskError(self, int error_code, str error_msg)