Nav2 Navigation Stack - kilted  kilted
ROS 2 Navigation Stack
robot_navigator.py
1 #! /usr/bin/env python3
2 # Copyright 2021 Samsung Research America
3 # Copyright 2025 Open Navigation LLC
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 
17 
18 from enum import Enum
19 import time
20 from typing import Any, Optional, Union
21 
22 from action_msgs.msg import GoalStatus
23 from builtin_interfaces.msg import Duration
24 from geographic_msgs.msg import GeoPose
25 from geometry_msgs.msg import Point, PoseStamped, PoseWithCovarianceStamped
26 from lifecycle_msgs.srv import GetState
27 from nav2_msgs.action import (AssistedTeleop, BackUp, ComputeAndTrackRoute,
28  ComputePathThroughPoses, ComputePathToPose, ComputeRoute, DockRobot,
29  DriveOnHeading, FollowGPSWaypoints, FollowPath, FollowWaypoints,
30  NavigateThroughPoses, NavigateToPose, SmoothPath, Spin, UndockRobot)
31 from nav2_msgs.msg import Route
32 from nav2_msgs.srv import (ClearCostmapAroundRobot, ClearCostmapExceptRegion, ClearEntireCostmap,
33  GetCostmap, LoadMap, ManageLifecycleNodes)
34 from nav_msgs.msg import Goals, OccupancyGrid, Path
35 import rclpy
36 from rclpy.action import ActionClient # type: ignore[attr-defined]
37 from rclpy.action.client import ClientGoalHandle
38 from rclpy.duration import Duration as rclpyDuration
39 from rclpy.node import Node
40 from rclpy.qos import QoSDurabilityPolicy, QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy
41 from rclpy.task import Future
42 from rclpy.type_support import GetResultServiceResponse
43 
44 
45 # Task Result enum for the result of the task being executed
46 class TaskResult(Enum):
47  UNKNOWN = 0
48  SUCCEEDED = 1
49  CANCELED = 2
50  FAILED = 3
51 
52 
53 # Task enum for the task being executed, if its a long-running task to be able to obtain
54 # necessary contextual information in `isTaskComplete` and `getFeedback` regarding the task
55 # which is running.
56 class RunningTask(Enum):
57  NONE = 0
58  NAVIGATE_TO_POSE = 1
59  NAVIGATE_THROUGH_POSES = 2
60  FOLLOW_PATH = 3
61  FOLLOW_WAYPOINTS = 4
62  FOLLOW_GPS_WAYPOINTS = 5
63  SPIN = 6
64  BACKUP = 7
65  DRIVE_ON_HEADING = 8
66  ASSISTED_TELEOP = 9
67  DOCK_ROBOT = 10
68  UNDOCK_ROBOT = 11
69  COMPUTE_AND_TRACK_ROUTE = 12
70 
71 
72 class BasicNavigator(Node):
73 
74  def __init__(self, node_name: str = 'basic_navigator', namespace: str = '') -> None:
75  super().__init__(node_name=node_name, namespace=namespace)
76  self.initial_poseinitial_pose = PoseStamped()
77  self.initial_poseinitial_pose.header.frame_id = 'map'
78 
79  self.goal_handlegoal_handle: Optional[ClientGoalHandle[Any, Any, Any]] = None
80  self.result_futureresult_future: \
81  Optional[Future[GetResultServiceResponse[Any]]] = None
82  self.feedbackfeedback: Any = None
83  self.statusstatus: Optional[int] = None
84 
85  # Since the route server's compute and track action server is likely
86  # to be running simultaneously with another (e.g. controller, WPF) server,
87  # we must track its futures and feedback separately. Additionally, the
88  # route tracking feedback is uniquely important to be complete and ordered
89  self.route_goal_handleroute_goal_handle: Optional[ClientGoalHandle[Any, Any, Any]] = None
90  self.route_result_futureroute_result_future: \
91  Optional[Future[GetResultServiceResponse[Any]]] = None
92  self.route_feedback: list[Any] = []
93 
94  # Error code and messages from servers
95  self.last_action_error_codelast_action_error_code = 0
96  self.last_action_error_msglast_action_error_msg = ''
97 
98  amcl_pose_qos = QoSProfile(
99  durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
100  reliability=QoSReliabilityPolicy.RELIABLE,
101  history=QoSHistoryPolicy.KEEP_LAST,
102  depth=1,
103  )
104 
105  self.initial_pose_receivedinitial_pose_received = False
106  self.nav_through_poses_clientnav_through_poses_client = ActionClient(
107  self, NavigateThroughPoses, 'navigate_through_poses'
108  )
109  self.nav_to_pose_clientnav_to_pose_client = ActionClient(self, NavigateToPose, 'navigate_to_pose')
110  self.follow_waypoints_clientfollow_waypoints_client = ActionClient(
111  self, FollowWaypoints, 'follow_waypoints'
112  )
113  self.follow_gps_waypoints_clientfollow_gps_waypoints_client = ActionClient(
114  self, FollowGPSWaypoints, 'follow_gps_waypoints'
115  )
116  self.follow_path_clientfollow_path_client = ActionClient(self, FollowPath, 'follow_path')
117  self.compute_path_to_pose_clientcompute_path_to_pose_client = ActionClient(
118  self, ComputePathToPose, 'compute_path_to_pose'
119  )
120  self.compute_path_through_poses_clientcompute_path_through_poses_client = ActionClient(
121  self, ComputePathThroughPoses, 'compute_path_through_poses'
122  )
123  self.smoother_clientsmoother_client = ActionClient(self, SmoothPath, 'smooth_path')
124  self.compute_route_clientcompute_route_client = ActionClient(self, ComputeRoute, 'compute_route')
125  self.compute_and_track_route_clientcompute_and_track_route_client = ActionClient(self, ComputeAndTrackRoute,
126  'compute_and_track_route')
127  self.spin_clientspin_client = ActionClient(self, Spin, 'spin')
128  self.backup_clientbackup_client = ActionClient(self, BackUp, 'backup')
129  self.drive_on_heading_clientdrive_on_heading_client = ActionClient(
130  self, DriveOnHeading, 'drive_on_heading'
131  )
132  self.assisted_teleop_clientassisted_teleop_client = ActionClient(
133  self, AssistedTeleop, 'assisted_teleop'
134  )
135  self.docking_clientdocking_client = ActionClient(self, DockRobot, 'dock_robot')
136  self.undocking_clientundocking_client = ActionClient(self, UndockRobot, 'undock_robot')
137  self.localization_pose_sublocalization_pose_sub = self.create_subscription(
138  PoseWithCovarianceStamped,
139  'amcl_pose',
140  self._amclPoseCallback_amclPoseCallback,
141  amcl_pose_qos,
142  )
143  self.initial_pose_pubinitial_pose_pub = self.create_publisher(
144  PoseWithCovarianceStamped, 'initialpose', 10
145  )
146  self.change_maps_srvchange_maps_srv = self.create_client(LoadMap, 'map_server/load_map')
147  self.clear_costmap_global_srvclear_costmap_global_srv = self.create_client(
148  ClearEntireCostmap, 'global_costmap/clear_entirely_global_costmap'
149  )
150  self.clear_costmap_local_srvclear_costmap_local_srv = self.create_client(
151  ClearEntireCostmap, 'local_costmap/clear_entirely_local_costmap'
152  )
153  self.clear_costmap_except_region_srvclear_costmap_except_region_srv = self.create_client(
154  ClearCostmapExceptRegion, 'local_costmap/clear_costmap_except_region'
155  )
156  self.clear_costmap_around_robot_srvclear_costmap_around_robot_srv = self.create_client(
157  ClearCostmapAroundRobot, 'local_costmap/clear_costmap_around_robot'
158  )
159  self.get_costmap_global_srvget_costmap_global_srv = self.create_client(
160  GetCostmap, 'global_costmap/get_costmap'
161  )
162  self.get_costmap_local_srvget_costmap_local_srv = self.create_client(
163  GetCostmap, 'local_costmap/get_costmap'
164  )
165 
166  def destroyNode(self) -> None:
167  self.destroy_nodedestroy_node()
168 
169  def destroy_node(self) -> None:
170  self.nav_through_poses_clientnav_through_poses_client.destroy()
171  self.nav_to_pose_clientnav_to_pose_client.destroy()
172  self.follow_waypoints_clientfollow_waypoints_client.destroy()
173  self.follow_path_clientfollow_path_client.destroy()
174  self.compute_path_to_pose_clientcompute_path_to_pose_client.destroy()
175  self.compute_path_through_poses_clientcompute_path_through_poses_client.destroy()
176  self.compute_and_track_route_clientcompute_and_track_route_client.destroy()
177  self.compute_route_clientcompute_route_client.destroy()
178  self.smoother_clientsmoother_client.destroy()
179  self.spin_clientspin_client.destroy()
180  self.backup_clientbackup_client.destroy()
181  self.drive_on_heading_clientdrive_on_heading_client.destroy()
182  self.assisted_teleop_clientassisted_teleop_client.destroy()
183  self.follow_gps_waypoints_clientfollow_gps_waypoints_client.destroy()
184  self.docking_clientdocking_client.destroy()
185  self.undocking_clientundocking_client.destroy()
186  super().destroy_node()
187 
188  def setInitialPose(self, initial_pose: PoseStamped) -> None:
189  """Set the initial pose to the localization system."""
190  self.initial_pose_receivedinitial_pose_received = False
191  self.initial_poseinitial_pose = initial_pose
192  self._setInitialPose_setInitialPose()
193 
194  def goThroughPoses(self, poses: Goals, behavior_tree: str = '') -> Optional[RunningTask]:
195  """Send a `NavThroughPoses` action request."""
196  self.clearTaskErrorclearTaskError()
197  self.debugdebug("Waiting for 'NavigateThroughPoses' action server")
198  while not self.nav_through_poses_clientnav_through_poses_client.wait_for_server(timeout_sec=1.0):
199  self.infoinfo("'NavigateThroughPoses' action server not available, waiting...")
200 
201  goal_msg = NavigateThroughPoses.Goal()
202  goal_msg.poses = poses
203  goal_msg.behavior_tree = behavior_tree
204 
205  self.infoinfo(f'Navigating with {len(goal_msg.poses)} goals....')
206  send_goal_future = self.nav_through_poses_clientnav_through_poses_client.send_goal_async(
207  goal_msg, self._feedbackCallback_feedbackCallback
208  )
209  rclpy.spin_until_future_complete(self, send_goal_future)
210  self.goal_handlegoal_handle = send_goal_future.result()
211 
212  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
213  msg = f'NavigateThroughPoses request with {len(poses)} was rejected!'
214  self.setTaskErrorsetTaskError(NavigateThroughPoses.UNKNOWN, msg)
215  self.errorerror(msg)
216  return None
217 
218  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
219  return RunningTask.NAVIGATE_THROUGH_POSES
220 
221  def goToPose(self, pose: PoseStamped, behavior_tree: str = '') -> Optional[RunningTask]:
222  """Send a `NavToPose` action request."""
223  self.clearTaskErrorclearTaskError()
224  self.debugdebug("Waiting for 'NavigateToPose' action server")
225  while not self.nav_to_pose_clientnav_to_pose_client.wait_for_server(timeout_sec=1.0):
226  self.infoinfo("'NavigateToPose' action server not available, waiting...")
227 
228  goal_msg = NavigateToPose.Goal()
229  goal_msg.pose = pose
230  goal_msg.behavior_tree = behavior_tree
231 
232  self.infoinfo(
233  'Navigating to goal: '
234  + str(pose.pose.position.x)
235  + ' '
236  + str(pose.pose.position.y)
237  + '...'
238  )
239  send_goal_future = self.nav_to_pose_clientnav_to_pose_client.send_goal_async(
240  goal_msg, self._feedbackCallback_feedbackCallback
241  )
242  rclpy.spin_until_future_complete(self, send_goal_future)
243  self.goal_handlegoal_handle = send_goal_future.result()
244 
245  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
246  msg = (
247  'NavigateToPose goal to '
248  + str(pose.pose.position.x)
249  + ' '
250  + str(pose.pose.position.y)
251  + ' was rejected!'
252  )
253  self.setTaskErrorsetTaskError(NavigateToPose.UNKNOWN, msg)
254  self.errorerror(msg)
255  return None
256 
257  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
258  return RunningTask.NAVIGATE_TO_POSE
259 
260  def followWaypoints(self, poses: list[PoseStamped]) -> Optional[RunningTask]:
261  """Send a `FollowWaypoints` action request."""
262  self.clearTaskErrorclearTaskError()
263  self.debugdebug("Waiting for 'FollowWaypoints' action server")
264  while not self.follow_waypoints_clientfollow_waypoints_client.wait_for_server(timeout_sec=1.0):
265  self.infoinfo("'FollowWaypoints' action server not available, waiting...")
266 
267  goal_msg = FollowWaypoints.Goal()
268  goal_msg.poses = poses
269 
270  self.infoinfo(f'Following {len(goal_msg.poses)} goals....')
271  send_goal_future = self.follow_waypoints_clientfollow_waypoints_client.send_goal_async(
272  goal_msg, self._feedbackCallback_feedbackCallback
273  )
274  rclpy.spin_until_future_complete(self, send_goal_future)
275  self.goal_handlegoal_handle = send_goal_future.result()
276 
277  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
278  msg = f'Following {len(poses)} waypoints request was rejected!'
279  self.setTaskErrorsetTaskError(FollowWaypoints.UNKNOWN, msg)
280  self.errorerror(msg)
281  return None
282 
283  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
284  return RunningTask.FOLLOW_WAYPOINTS
285 
286  def followGpsWaypoints(self, gps_poses: list[GeoPose]) -> Optional[RunningTask]:
287  """Send a `FollowGPSWaypoints` action request."""
288  self.clearTaskErrorclearTaskError()
289  self.debugdebug("Waiting for 'FollowWaypoints' action server")
290  while not self.follow_gps_waypoints_clientfollow_gps_waypoints_client.wait_for_server(timeout_sec=1.0):
291  self.infoinfo("'FollowWaypoints' action server not available, waiting...")
292 
293  goal_msg = FollowGPSWaypoints.Goal()
294  goal_msg.gps_poses = gps_poses
295 
296  self.infoinfo(f'Following {len(goal_msg.gps_poses)} gps goals....')
297  send_goal_future = self.follow_gps_waypoints_clientfollow_gps_waypoints_client.send_goal_async(
298  goal_msg, self._feedbackCallback_feedbackCallback
299  )
300  rclpy.spin_until_future_complete(self, send_goal_future)
301  self.goal_handlegoal_handle = send_goal_future.result()
302 
303  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
304  msg = f'Following {len(gps_poses)} gps waypoints request was rejected!'
305  self.setTaskErrorsetTaskError(FollowGPSWaypoints.UNKNOWN, msg)
306  self.errorerror(msg)
307  return None
308 
309  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
310  return RunningTask.FOLLOW_GPS_WAYPOINTS
311 
312  def spin(
313  self, spin_dist: float = 1.57, time_allowance: int = 10,
314  disable_collision_checks: bool = False) -> Optional[RunningTask]:
315  self.clearTaskErrorclearTaskError()
316  self.debugdebug("Waiting for 'Spin' action server")
317  while not self.spin_clientspin_client.wait_for_server(timeout_sec=1.0):
318  self.infoinfo("'Spin' action server not available, waiting...")
319  goal_msg = Spin.Goal()
320  goal_msg.target_yaw = spin_dist
321  goal_msg.time_allowance = Duration(sec=time_allowance)
322  goal_msg.disable_collision_checks = disable_collision_checks
323 
324  self.infoinfo(f'Spinning to angle {goal_msg.target_yaw}....')
325  send_goal_future = self.spin_clientspin_client.send_goal_async(
326  goal_msg, self._feedbackCallback_feedbackCallback
327  )
328  rclpy.spin_until_future_complete(self, send_goal_future)
329  self.goal_handlegoal_handle = send_goal_future.result()
330 
331  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
332  msg = 'Spin request was rejected!'
333  self.setTaskErrorsetTaskError(Spin.UNKNOWN, msg)
334  self.errorerror(msg)
335  return None
336 
337  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
338  return RunningTask.SPIN
339 
340  def backup(
341  self, backup_dist: float = 0.15, backup_speed: float = 0.025,
342  time_allowance: int = 10,
343  disable_collision_checks: bool = False) -> Optional[RunningTask]:
344  self.clearTaskErrorclearTaskError()
345  self.debugdebug("Waiting for 'Backup' action server")
346  while not self.backup_clientbackup_client.wait_for_server(timeout_sec=1.0):
347  self.infoinfo("'Backup' action server not available, waiting...")
348  goal_msg = BackUp.Goal()
349  goal_msg.target = Point(x=float(backup_dist))
350  goal_msg.speed = backup_speed
351  goal_msg.time_allowance = Duration(sec=time_allowance)
352  goal_msg.disable_collision_checks = disable_collision_checks
353 
354  self.infoinfo(f'Backing up {goal_msg.target.x} m at {goal_msg.speed} m/s....')
355  send_goal_future = self.backup_clientbackup_client.send_goal_async(
356  goal_msg, self._feedbackCallback_feedbackCallback
357  )
358  rclpy.spin_until_future_complete(self, send_goal_future)
359  self.goal_handlegoal_handle = send_goal_future.result()
360 
361  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
362  msg = 'Backup request was rejected!'
363  self.setTaskErrorsetTaskError(BackUp.UNKNOWN, msg)
364  self.errorerror(msg)
365  return None
366 
367  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
368  return RunningTask.BACKUP
369 
370  def driveOnHeading(
371  self, dist: float = 0.15, speed: float = 0.025,
372  time_allowance: int = 10,
373  disable_collision_checks: bool = False) -> Optional[RunningTask]:
374  self.clearTaskErrorclearTaskError()
375  self.debugdebug("Waiting for 'DriveOnHeading' action server")
376  while not self.backup_clientbackup_client.wait_for_server(timeout_sec=1.0):
377  self.infoinfo("'DriveOnHeading' action server not available, waiting...")
378  goal_msg = DriveOnHeading.Goal()
379  goal_msg.target = Point(x=float(dist))
380  goal_msg.speed = speed
381  goal_msg.time_allowance = Duration(sec=time_allowance)
382  goal_msg.disable_collision_checks = disable_collision_checks
383 
384  self.infoinfo(f'Drive {goal_msg.target.x} m on heading at {goal_msg.speed} m/s....')
385  send_goal_future = self.drive_on_heading_clientdrive_on_heading_client.send_goal_async(
386  goal_msg, self._feedbackCallback_feedbackCallback
387  )
388  rclpy.spin_until_future_complete(self, send_goal_future)
389  self.goal_handlegoal_handle = send_goal_future.result()
390 
391  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
392  msg = 'Drive On Heading request was rejected!'
393  self.setTaskErrorsetTaskError(DriveOnHeading.UNKNOWN, msg)
394  self.errorerror(msg)
395  return None
396 
397  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
398  return RunningTask.DRIVE_ON_HEADING
399 
400  def assistedTeleop(self, time_allowance: int = 30) -> Optional[RunningTask]:
401 
402  self.clearTaskErrorclearTaskError()
403  self.debugdebug("Wanting for 'assisted_teleop' action server")
404 
405  while not self.assisted_teleop_clientassisted_teleop_client.wait_for_server(timeout_sec=1.0):
406  self.infoinfo("'assisted_teleop' action server not available, waiting...")
407  goal_msg = AssistedTeleop.Goal()
408  goal_msg.time_allowance = Duration(sec=time_allowance)
409 
410  self.infoinfo("Running 'assisted_teleop'....")
411  send_goal_future = self.assisted_teleop_clientassisted_teleop_client.send_goal_async(
412  goal_msg, self._feedbackCallback_feedbackCallback
413  )
414  rclpy.spin_until_future_complete(self, send_goal_future)
415  self.goal_handlegoal_handle = send_goal_future.result()
416 
417  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
418  msg = 'Assisted Teleop request was rejected!'
419  self.setTaskErrorsetTaskError(AssistedTeleop.UNKNOWN, msg)
420  self.errorerror(msg)
421  return None
422 
423  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
424  return RunningTask.ASSISTED_TELEOP
425 
426  def followPath(self, path: Path, controller_id: str = '',
427  goal_checker_id: str = '') -> Optional[RunningTask]:
428  self.clearTaskErrorclearTaskError()
429  """Send a `FollowPath` action request."""
430  self.debugdebug("Waiting for 'FollowPath' action server")
431  while not self.follow_path_clientfollow_path_client.wait_for_server(timeout_sec=1.0):
432  self.infoinfo("'FollowPath' action server not available, waiting...")
433 
434  goal_msg = FollowPath.Goal()
435  goal_msg.path = path
436  goal_msg.controller_id = controller_id
437  goal_msg.goal_checker_id = goal_checker_id
438 
439  self.infoinfo('Executing path...')
440  send_goal_future = self.follow_path_clientfollow_path_client.send_goal_async(
441  goal_msg, self._feedbackCallback_feedbackCallback
442  )
443  rclpy.spin_until_future_complete(self, send_goal_future)
444  self.goal_handlegoal_handle = send_goal_future.result()
445 
446  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
447  msg = 'FollowPath goal was rejected!'
448  self.setTaskErrorsetTaskError(FollowPath.UNKNOWN, msg)
449  self.errorerror(msg)
450  return None
451 
452  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
453  return RunningTask.FOLLOW_PATH
454 
455  def dockRobotByPose(self, dock_pose: PoseStamped,
456  dock_type: str = '', nav_to_dock: bool = True) -> Optional[RunningTask]:
457  self.clearTaskErrorclearTaskError()
458  """Send a `DockRobot` action request."""
459  self.infoinfo("Waiting for 'DockRobot' action server")
460  while not self.docking_clientdocking_client.wait_for_server(timeout_sec=1.0):
461  self.infoinfo('"DockRobot" action server not available, waiting...')
462 
463  goal_msg = DockRobot.Goal()
464  goal_msg.use_dock_id = False
465  goal_msg.dock_pose = dock_pose
466  goal_msg.dock_type = dock_type
467  goal_msg.navigate_to_staging_pose = nav_to_dock # if want to navigate before staging
468 
469  self.infoinfo('Docking at pose: ' + str(dock_pose) + '...')
470  send_goal_future = self.docking_clientdocking_client.send_goal_async(goal_msg,
471  self._feedbackCallback_feedbackCallback)
472  rclpy.spin_until_future_complete(self, send_goal_future)
473  self.goal_handlegoal_handle = send_goal_future.result()
474 
475  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
476  msg = 'DockRobot request was rejected!'
477  self.setTaskErrorsetTaskError(DockRobot.UNKNOWN, msg)
478  self.errorerror(msg)
479  return None
480 
481  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
482  return RunningTask.DOCK_ROBOT
483 
484  def dockRobotByID(self, dock_id: str, nav_to_dock: bool = True) -> Optional[RunningTask]:
485  """Send a `DockRobot` action request."""
486  self.clearTaskErrorclearTaskError()
487  self.infoinfo("Waiting for 'DockRobot' action server")
488  while not self.docking_clientdocking_client.wait_for_server(timeout_sec=1.0):
489  self.infoinfo('"DockRobot" action server not available, waiting...')
490 
491  goal_msg = DockRobot.Goal()
492  goal_msg.use_dock_id = True
493  goal_msg.dock_id = dock_id
494  goal_msg.navigate_to_staging_pose = nav_to_dock # if want to navigate before staging
495 
496  self.infoinfo('Docking at dock ID: ' + str(dock_id) + '...')
497  send_goal_future = self.docking_clientdocking_client.send_goal_async(goal_msg,
498  self._feedbackCallback_feedbackCallback)
499  rclpy.spin_until_future_complete(self, send_goal_future)
500  self.goal_handlegoal_handle = send_goal_future.result()
501 
502  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
503  msg = 'DockRobot request was rejected!'
504  self.setTaskErrorsetTaskError(DockRobot.UNKNOWN, msg)
505  self.errorerror(msg)
506  return None
507 
508  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
509  return RunningTask.DOCK_ROBOT
510 
511  def undockRobot(self, dock_type: str = '') -> Optional[RunningTask]:
512  """Send a `UndockRobot` action request."""
513  self.clearTaskErrorclearTaskError()
514  self.infoinfo("Waiting for 'UndockRobot' action server")
515  while not self.undocking_clientundocking_client.wait_for_server(timeout_sec=1.0):
516  self.infoinfo('"UndockRobot" action server not available, waiting...')
517 
518  goal_msg = UndockRobot.Goal()
519  goal_msg.dock_type = dock_type
520 
521  self.infoinfo('Undocking from dock of type: ' + str(dock_type) + '...')
522  send_goal_future = self.undocking_clientundocking_client.send_goal_async(goal_msg,
523  self._feedbackCallback_feedbackCallback)
524  rclpy.spin_until_future_complete(self, send_goal_future)
525  self.goal_handlegoal_handle = send_goal_future.result()
526 
527  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
528  msg = 'UndockRobot request was rejected!'
529  self.setTaskErrorsetTaskError(UndockRobot.UNKNOWN, msg)
530  self.errorerror(msg)
531  return None
532 
533  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
534  return RunningTask.UNDOCK_ROBOT
535 
536  def cancelTask(self) -> None:
537  """Cancel pending task request of any type."""
538  self.infoinfo('Canceling current task.')
539  if self.result_futureresult_future:
540  if self.goal_handlegoal_handle is not None:
541  future = self.goal_handlegoal_handle.cancel_goal_async()
542  rclpy.spin_until_future_complete(self, future)
543  else:
544  self.errorerror('Cancel task failed, goal handle is None')
545  self.setTaskErrorsetTaskError(0, 'Cancel task failed, goal handle is None')
546  return
547  if self.route_result_futureroute_result_future:
548  if self.route_goal_handleroute_goal_handle is not None:
549  future = self.route_goal_handleroute_goal_handle.cancel_goal_async()
550  rclpy.spin_until_future_complete(self, future)
551  else:
552  self.errorerror('Cancel route task failed, goal handle is None')
553  self.setTaskErrorsetTaskError(0, 'Cancel route task failed, goal handle is None')
554  return
555  self.clearTaskErrorclearTaskError()
556  return
557 
558  def isTaskComplete(self, task: RunningTask = RunningTask.NONE) -> bool:
559  """Check if the task request of any type is complete yet."""
560  # Find the result future to spin
561  if task is None:
562  self.errorerror('Task is None, cannot check for completion')
563  return False
564 
565  result_future = None
566  if task != RunningTask.COMPUTE_AND_TRACK_ROUTE:
567  result_future = self.result_futureresult_future
568  else:
569  result_future = self.route_result_futureroute_result_future
570  if not result_future:
571  # task was cancelled or completed
572  return True
573 
574  # Get the result of the future, if complete
575  rclpy.spin_until_future_complete(self, result_future, timeout_sec=0.10)
576  result_response = result_future.result()
577 
578  if result_response:
579  self.statusstatus = result_response.status
580  if self.statusstatus != GoalStatus.STATUS_SUCCEEDED:
581  result = result_response.result
582  if result is not None:
583  self.setTaskErrorsetTaskError(result.error_code, result.error_msg)
584  self.debugdebug(
585  'Task with failed with'
586  f' status code:{self.status}'
587  f' error code:{result.error_code}'
588  f' error msg:{result.error_msg}')
589  return True
590  else:
591  self.setTaskErrorsetTaskError(0, 'No result received')
592  self.debugdebug('Task failed with no result received')
593  return True
594  else:
595  # Timed out, still processing, not complete yet
596  return False
597 
598  self.debugdebug('Task succeeded!')
599  return True
600 
601  def getFeedback(self, task: RunningTask = RunningTask.NONE) -> Any:
602  """Get the pending action feedback message."""
603  if task != RunningTask.COMPUTE_AND_TRACK_ROUTE:
604  return self.feedbackfeedback
605  if len(self.route_feedback) > 0:
606  return self.route_feedback.pop(0)
607  return None
608 
609  def getResult(self) -> TaskResult:
610  """Get the pending action result message."""
611  if self.statusstatus == GoalStatus.STATUS_SUCCEEDED:
612  return TaskResult.SUCCEEDED
613  elif self.statusstatus == GoalStatus.STATUS_ABORTED:
614  return TaskResult.FAILED
615  elif self.statusstatus == GoalStatus.STATUS_CANCELED:
616  return TaskResult.CANCELED
617  else:
618  return TaskResult.UNKNOWN
619 
620  def clearTaskError(self) -> None:
621  self.last_action_error_codelast_action_error_code = 0
622  self.last_action_error_msglast_action_error_msg = ''
623 
624  def setTaskError(self, error_code: int, error_msg: str) -> None:
625  self.last_action_error_codelast_action_error_code = error_code
626  self.last_action_error_msglast_action_error_msg = error_msg
627 
628  def getTaskError(self) -> tuple[int, str]:
629  return (self.last_action_error_codelast_action_error_code, self.last_action_error_msglast_action_error_msg)
630 
631  def waitUntilNav2Active(self, navigator: str = 'bt_navigator',
632  localizer: str = 'amcl') -> None:
633  """Block until the full navigation system is up and running."""
634  if localizer != 'robot_localization': # non-lifecycle node
635  self._waitForNodeToActivate_waitForNodeToActivate(localizer)
636  if localizer == 'amcl':
637  self._waitForInitialPose_waitForInitialPose()
638  self._waitForNodeToActivate_waitForNodeToActivate(navigator)
639  self.infoinfo('Nav2 is ready for use!')
640  return
641 
642  def _getPathImpl(
643  self, start: PoseStamped, goal: PoseStamped,
644  planner_id: str = '', use_start: bool = False
645  ) -> ComputePathToPose.Result:
646  """
647  Send a `ComputePathToPose` action request.
648 
649  Internal implementation to get the full result, not just the path.
650  """
651  self.debugdebug("Waiting for 'ComputePathToPose' action server")
652  while not self.compute_path_to_pose_clientcompute_path_to_pose_client.wait_for_server(timeout_sec=1.0):
653  self.infoinfo("'ComputePathToPose' action server not available, waiting...")
654 
655  goal_msg = ComputePathToPose.Goal()
656  goal_msg.start = start
657  goal_msg.goal = goal
658  goal_msg.planner_id = planner_id
659  goal_msg.use_start = use_start
660 
661  self.infoinfo('Getting path...')
662  send_goal_future = self.compute_path_to_pose_clientcompute_path_to_pose_client.send_goal_async(goal_msg)
663  rclpy.spin_until_future_complete(self, send_goal_future)
664  self.goal_handlegoal_handle = send_goal_future.result()
665 
666  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
667  self.errorerror('Get path was rejected!')
668  self.statusstatus = GoalStatus.UNKNOWN
669  result = ComputePathToPose.Result()
670  result.error_code = ComputePathToPose.UNKNOWN
671  result.error_msg = 'Get path was rejected'
672  return result
673 
674  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
675  rclpy.spin_until_future_complete(self, self.result_futureresult_future)
676  self.statusstatus = self.result_futureresult_future.result().status # type: ignore[union-attr]
677 
678  return self.result_futureresult_future.result().result # type: ignore[union-attr]
679 
680  def getPath(
681  self, start: PoseStamped, goal: PoseStamped,
682  planner_id: str = '', use_start: bool = False) -> Path:
683  """Send a `ComputePathToPose` action request."""
684  self.clearTaskErrorclearTaskError()
685  rtn = self._getPathImpl_getPathImpl(start, goal, planner_id, use_start)
686 
687  if self.statusstatus == GoalStatus.STATUS_SUCCEEDED:
688  return rtn.path
689  else:
690  self.setTaskErrorsetTaskError(rtn.error_code, rtn.error_msg)
691  self.warnwarn('Getting path failed with'
692  f' status code:{self.status}'
693  f' error code:{rtn.error_code}'
694  f' error msg:{rtn.error_msg}')
695  return None
696 
697  def _getPathThroughPosesImpl(
698  self, start: PoseStamped, goals: Goals,
699  planner_id: str = '', use_start: bool = False
700  ) -> ComputePathThroughPoses.Result:
701  """
702  Send a `ComputePathThroughPoses` action request.
703 
704  Internal implementation to get the full result, not just the path.
705  """
706  self.debugdebug("Waiting for 'ComputePathThroughPoses' action server")
707  while not self.compute_path_through_poses_clientcompute_path_through_poses_client.wait_for_server(
708  timeout_sec=1.0
709  ):
710  self.infoinfo(
711  "'ComputePathThroughPoses' action server not available, waiting..."
712  )
713 
714  goal_msg = ComputePathThroughPoses.Goal()
715  goal_msg.start = start
716  goal_msg.goals.header.frame_id = 'map'
717  goal_msg.goals.header.stamp = self.get_clock().now().to_msg()
718  goal_msg.goals.goals = goals
719  goal_msg.planner_id = planner_id
720  goal_msg.use_start = use_start
721 
722  self.infoinfo('Getting path...')
723  send_goal_future = self.compute_path_through_poses_clientcompute_path_through_poses_client.send_goal_async(
724  goal_msg
725  )
726  rclpy.spin_until_future_complete(self, send_goal_future)
727  self.goal_handlegoal_handle = send_goal_future.result()
728 
729  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
730  self.errorerror('Get path was rejected!')
731  result = ComputePathThroughPoses.Result()
732  result.error_code = ComputePathThroughPoses.UNKNOWN
733  result.error_msg = 'Get path was rejected!'
734  return result
735 
736  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
737  rclpy.spin_until_future_complete(self, self.result_futureresult_future)
738  self.statusstatus = self.result_futureresult_future.result().status # type: ignore[union-attr]
739 
740  return self.result_futureresult_future.result().result # type: ignore[union-attr]
741 
743  self, start: PoseStamped, goals: Goals,
744  planner_id: str = '', use_start: bool = False) -> Path:
745  """Send a `ComputePathThroughPoses` action request."""
746  self.clearTaskErrorclearTaskError()
747  rtn = self._getPathThroughPosesImpl_getPathThroughPosesImpl(start, goals, planner_id, use_start)
748 
749  if self.statusstatus == GoalStatus.STATUS_SUCCEEDED:
750  return rtn.path
751  else:
752  self.setTaskErrorsetTaskError(rtn.error_code, rtn.error_msg)
753  self.warnwarn('Getting path failed with'
754  f' status code:{self.status}'
755  f' error code:{rtn.error_code}'
756  f' error msg:{rtn.error_msg}')
757  return None
758 
759  def _getRouteImpl(
760  self, start: Union[int, PoseStamped],
761  goal: Union[int, PoseStamped], use_start: bool = False
762  ) -> ComputeRoute.Result:
763  """
764  Send a `ComputeRoute` action request.
765 
766  Internal implementation to get the full result, not just the sparse route and dense path.
767  """
768  self.debugdebug("Waiting for 'ComputeRoute' action server")
769  while not self.compute_route_clientcompute_route_client.wait_for_server(timeout_sec=1.0):
770  self.infoinfo("'ComputeRoute' action server not available, waiting...")
771 
772  goal_msg = ComputeRoute.Goal()
773  goal_msg.use_start = use_start
774 
775  # Support both ID based requests and PoseStamped based requests
776  if isinstance(start, int) and isinstance(goal, int):
777  goal_msg.start_id = start
778  goal_msg.goal_id = goal
779  goal_msg.use_poses = False
780  elif isinstance(start, PoseStamped) and isinstance(goal, PoseStamped):
781  goal_msg.start = start
782  goal_msg.goal = goal
783  goal_msg.use_poses = True
784  else:
785  self.errorerror('Invalid start and goal types. Must be PoseStamped for pose or int for ID')
786  result = ComputeRoute.Result()
787  result.error_code = ComputeRoute.UNKNOWN
788  result.error_msg = 'Request type fields were invalid!'
789  return result
790 
791  self.infoinfo('Getting route...')
792  send_goal_future = self.compute_route_clientcompute_route_client.send_goal_async(goal_msg)
793  rclpy.spin_until_future_complete(self, send_goal_future)
794  self.goal_handlegoal_handle = send_goal_future.result()
795 
796  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
797  self.errorerror('Get route was rejected!')
798  result = ComputeRoute.Result()
799  result.error_code = ComputeRoute.UNKNOWN
800  result.error_msg = 'Get route was rejected!'
801  return result
802 
803  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
804  rclpy.spin_until_future_complete(self, self.result_futureresult_future)
805  self.statusstatus = self.result_futureresult_future.result().status # type: ignore[union-attr]
806 
807  return self.result_futureresult_future.result().result # type: ignore[union-attr]
808 
809  def getRoute(
810  self, start: Union[int, PoseStamped],
811  goal: Union[int, PoseStamped],
812  use_start: bool = False) -> Optional[list[Union[Path, Route]]]:
813  """Send a `ComputeRoute` action request."""
814  self.clearTaskErrorclearTaskError()
815  rtn = self._getRouteImpl_getRouteImpl(start, goal, use_start=False)
816 
817  if self.statusstatus != GoalStatus.STATUS_SUCCEEDED:
818  self.setTaskErrorsetTaskError(rtn.error_code, rtn.error_msg)
819  self.warnwarn(
820  'Getting route failed with'
821  f' status code:{self.status}'
822  f' error code:{rtn.error_code}'
823  f' error msg:{rtn.error_msg}')
824  return None
825 
826  return [rtn.path, rtn.route]
827 
829  self, start: Union[int, PoseStamped],
830  goal: Union[int, PoseStamped], use_start: bool = False
831  ) -> Optional[RunningTask]:
832  """Send a `ComputeAndTrackRoute` action request."""
833  self.clearTaskErrorclearTaskError()
834  self.debugdebug("Waiting for 'ComputeAndTrackRoute' action server")
835  while not self.compute_and_track_route_clientcompute_and_track_route_client.wait_for_server(timeout_sec=1.0):
836  self.infoinfo("'ComputeAndTrackRoute' action server not available, waiting...")
837 
838  goal_msg = ComputeAndTrackRoute.Goal()
839  goal_msg.use_start = use_start
840 
841  # Support both ID based requests and PoseStamped based requests
842  if isinstance(start, int) and isinstance(goal, int):
843  goal_msg.start_id = start
844  goal_msg.goal_id = goal
845  goal_msg.use_poses = False
846  elif isinstance(start, PoseStamped) and isinstance(goal, PoseStamped):
847  goal_msg.start = start
848  goal_msg.goal = goal
849  goal_msg.use_poses = True
850  else:
851  self.setTaskErrorsetTaskError(ComputeAndTrackRoute.UNKNOWN, 'Request type fields were invalid!')
852  self.errorerror('Invalid start and goal types. Must be PoseStamped for pose or int for ID')
853  return None
854 
855  self.infoinfo('Computing and tracking route...')
856  send_goal_future = self.compute_and_track_route_clientcompute_and_track_route_client.send_goal_async(goal_msg,
857  self._routeFeedbackCallback_routeFeedbackCallback) # noqa: E128
858  rclpy.spin_until_future_complete(self, send_goal_future)
859  self.route_goal_handleroute_goal_handle = send_goal_future.result()
860 
861  if not self.route_goal_handleroute_goal_handle or not self.route_goal_handleroute_goal_handle.accepted:
862  msg = 'Compute and track route was rejected!'
863  self.setTaskErrorsetTaskError(ComputeAndTrackRoute.UNKNOWN, msg)
864  self.errorerror(msg)
865  return None
866 
867  self.route_result_futureroute_result_future = self.route_goal_handleroute_goal_handle.get_result_async()
868  return RunningTask.COMPUTE_AND_TRACK_ROUTE
869 
870  def _smoothPathImpl(
871  self, path: Path, smoother_id: str = '',
872  max_duration: float = 2.0, check_for_collision: bool = False
873  ) -> SmoothPath.Result:
874  """
875  Send a `SmoothPath` action request.
876 
877  Internal implementation to get the full result, not just the path.
878  """
879  self.debugdebug("Waiting for 'SmoothPath' action server")
880  while not self.smoother_clientsmoother_client.wait_for_server(timeout_sec=1.0):
881  self.infoinfo("'SmoothPath' action server not available, waiting...")
882 
883  goal_msg = SmoothPath.Goal()
884  goal_msg.path = path
885  goal_msg.max_smoothing_duration = rclpyDuration(seconds=max_duration).to_msg()
886  goal_msg.smoother_id = smoother_id
887  goal_msg.check_for_collisions = check_for_collision
888 
889  self.infoinfo('Smoothing path...')
890  send_goal_future = self.smoother_clientsmoother_client.send_goal_async(goal_msg)
891  rclpy.spin_until_future_complete(self, send_goal_future)
892  self.goal_handlegoal_handle = send_goal_future.result()
893 
894  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
895  self.errorerror('Smooth path was rejected!')
896  result = SmoothPath.Result()
897  result.error_code = SmoothPath.UNKNOWN
898  result.error_msg = 'Smooth path was rejected'
899  return result
900 
901  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
902  rclpy.spin_until_future_complete(self, self.result_futureresult_future)
903  self.statusstatus = self.result_futureresult_future.result().status # type: ignore[union-attr]
904 
905  return self.result_futureresult_future.result().result # type: ignore[union-attr]
906 
908  self, path: Path, smoother_id: str = '',
909  max_duration: float = 2.0, check_for_collision: bool = False) -> Path:
910  """Send a `SmoothPath` action request."""
911  self.clearTaskErrorclearTaskError()
912  rtn = self._smoothPathImpl_smoothPathImpl(path, smoother_id, max_duration, check_for_collision)
913 
914  if self.statusstatus == GoalStatus.STATUS_SUCCEEDED:
915  return rtn.path
916  else:
917  self.setTaskErrorsetTaskError(rtn.error_code, rtn.error_msg)
918  self.warnwarn('Getting path failed with'
919  f' status code:{self.status}'
920  f' error code:{rtn.error_code}'
921  f' error msg:{rtn.error_msg}')
922  return None
923 
924  def changeMap(self, map_filepath: str) -> bool:
925  """Change the current static map in the map server."""
926  while not self.change_maps_srvchange_maps_srv.wait_for_service(timeout_sec=1.0):
927  self.infoinfo('change map service not available, waiting...')
928  req = LoadMap.Request()
929  req.map_url = map_filepath
930  future = self.change_maps_srvchange_maps_srv.call_async(req)
931  rclpy.spin_until_future_complete(self, future)
932 
933  future_result = future.result()
934  if future_result is None:
935  self.errorerror('Change map request failed!')
936  return False
937 
938  result = future_result.result
939  if result != LoadMap.Response().RESULT_SUCCESS:
940  if result == LoadMap.RESULT_MAP_DOES_NOT_EXIST:
941  reason = 'Map does not exist'
942  elif result == LoadMap.INVALID_MAP_DATA:
943  reason = 'Invalid map data'
944  elif result == LoadMap.INVALID_MAP_METADATA:
945  reason = 'Invalid map metadata'
946  elif result == LoadMap.UNDEFINED_FAILURE:
947  reason = 'Undefined failure'
948  else:
949  reason = 'Unknown'
950  self.setTaskErrorsetTaskError(result, reason)
951  self.errorerror(f'Change map request failed:{reason}!')
952  return False
953  else:
954  self.infoinfo('Change map request was successful!')
955  return True
956 
957  def clearAllCostmaps(self) -> None:
958  """Clear all costmaps."""
959  self.clearLocalCostmapclearLocalCostmap()
960  self.clearGlobalCostmapclearGlobalCostmap()
961  return
962 
963  def clearLocalCostmap(self) -> None:
964  """Clear local costmap."""
965  while not self.clear_costmap_local_srvclear_costmap_local_srv.wait_for_service(timeout_sec=1.0):
966  self.infoinfo('Clear local costmaps service not available, waiting...')
967  req = ClearEntireCostmap.Request()
968  future = self.clear_costmap_local_srvclear_costmap_local_srv.call_async(req)
969  rclpy.spin_until_future_complete(self, future)
970  return
971 
972  def clearGlobalCostmap(self) -> None:
973  """Clear global costmap."""
974  while not self.clear_costmap_global_srvclear_costmap_global_srv.wait_for_service(timeout_sec=1.0):
975  self.infoinfo('Clear global costmaps service not available, waiting...')
976  req = ClearEntireCostmap.Request()
977  future = self.clear_costmap_global_srvclear_costmap_global_srv.call_async(req)
978  rclpy.spin_until_future_complete(self, future)
979  return
980 
981  def clearCostmapExceptRegion(self, reset_distance: float) -> None:
982  """Clear the costmap except for a specified region."""
983  while not self.clear_costmap_except_region_srvclear_costmap_except_region_srv.wait_for_service(timeout_sec=1.0):
984  self.infoinfo('ClearCostmapExceptRegion service not available, waiting...')
985  req = ClearCostmapExceptRegion.Request()
986  req.reset_distance = reset_distance
987  future = self.clear_costmap_except_region_srvclear_costmap_except_region_srv.call_async(req)
988  rclpy.spin_until_future_complete(self, future)
989  return
990 
991  def clearCostmapAroundRobot(self, reset_distance: float) -> None:
992  """Clear the costmap around the robot."""
993  while not self.clear_costmap_around_robot_srvclear_costmap_around_robot_srv.wait_for_service(timeout_sec=1.0):
994  self.infoinfo('ClearCostmapAroundRobot service not available, waiting...')
995  req = ClearCostmapAroundRobot.Request()
996  req.reset_distance = reset_distance
997  future = self.clear_costmap_around_robot_srvclear_costmap_around_robot_srv.call_async(req)
998  rclpy.spin_until_future_complete(self, future)
999  return
1000 
1001  def getGlobalCostmap(self) -> OccupancyGrid:
1002  """Get the global costmap."""
1003  while not self.get_costmap_global_srvget_costmap_global_srv.wait_for_service(timeout_sec=1.0):
1004  self.infoinfo('Get global costmaps service not available, waiting...')
1005  req = GetCostmap.Request()
1006  future = self.get_costmap_global_srvget_costmap_global_srv.call_async(req)
1007  rclpy.spin_until_future_complete(self, future)
1008 
1009  result = future.result()
1010  if result is None:
1011  self.errorerror('Get global costmap request failed!')
1012  return None
1013 
1014  return result.map
1015 
1016  def getLocalCostmap(self) -> OccupancyGrid:
1017  """Get the local costmap."""
1018  while not self.get_costmap_local_srvget_costmap_local_srv.wait_for_service(timeout_sec=1.0):
1019  self.infoinfo('Get local costmaps service not available, waiting...')
1020  req = GetCostmap.Request()
1021  future = self.get_costmap_local_srvget_costmap_local_srv.call_async(req)
1022  rclpy.spin_until_future_complete(self, future)
1023 
1024  result = future.result()
1025 
1026  if result is None:
1027  self.errorerror('Get local costmap request failed!')
1028  return None
1029 
1030  return result.map
1031 
1032  def lifecycleStartup(self) -> None:
1033  """Startup nav2 lifecycle system."""
1034  self.infoinfo('Starting up lifecycle nodes based on lifecycle_manager.')
1035  for srv_name, srv_type in self.get_service_names_and_types():
1036  if srv_type[0] == 'nav2_msgs/srv/ManageLifecycleNodes':
1037  self.infoinfo(f'Starting up {srv_name}')
1038  mgr_client = self.create_client(ManageLifecycleNodes, srv_name)
1039  while not mgr_client.wait_for_service(timeout_sec=1.0):
1040  self.infoinfo(f'{srv_name} service not available, waiting...')
1041  req = ManageLifecycleNodes.Request()
1042  req.command = ManageLifecycleNodes.Request().STARTUP
1043  future = mgr_client.call_async(req)
1044 
1045  # starting up requires a full map->odom->base_link TF tree
1046  # so if we're not successful, try forwarding the initial pose
1047  while True:
1048  rclpy.spin_until_future_complete(self, future, timeout_sec=0.10)
1049  if not future:
1050  self._waitForInitialPose_waitForInitialPose()
1051  else:
1052  break
1053  self.infoinfo('Nav2 is ready for use!')
1054  return
1055 
1056  def lifecycleShutdown(self) -> None:
1057  """Shutdown nav2 lifecycle system."""
1058  self.infoinfo('Shutting down lifecycle nodes based on lifecycle_manager.')
1059  for srv_name, srv_type in self.get_service_names_and_types():
1060  if srv_type[0] == 'nav2_msgs/srv/ManageLifecycleNodes':
1061  self.infoinfo(f'Shutting down {srv_name}')
1062  mgr_client = self.create_client(ManageLifecycleNodes, srv_name)
1063  while not mgr_client.wait_for_service(timeout_sec=1.0):
1064  self.infoinfo(f'{srv_name} service not available, waiting...')
1065  req = ManageLifecycleNodes.Request()
1066  req.command = ManageLifecycleNodes.Request().SHUTDOWN
1067  future = mgr_client.call_async(req)
1068  rclpy.spin_until_future_complete(self, future)
1069  future.result()
1070  return
1071 
1072  def _waitForNodeToActivate(self, node_name: str) -> None:
1073  # Waits for the node within the tester namespace to become active
1074  self.debugdebug(f'Waiting for {node_name} to become active..')
1075  node_service = f'{node_name}/get_state'
1076  state_client = self.create_client(GetState, node_service)
1077  while not state_client.wait_for_service(timeout_sec=1.0):
1078  self.infoinfo(f'{node_service} service not available, waiting...')
1079 
1080  req = GetState.Request()
1081  state = 'unknown'
1082  while state != 'active':
1083  self.debugdebug(f'Getting {node_name} state...')
1084  future = state_client.call_async(req)
1085  rclpy.spin_until_future_complete(self, future)
1086 
1087  result = future.result()
1088  if result is not None:
1089  state = result.current_state.label
1090  self.debugdebug(f'Result of get_state: {state}')
1091  time.sleep(2)
1092  return
1093 
1094  def _waitForInitialPose(self) -> None:
1095  while not self.initial_pose_receivedinitial_pose_received:
1096  self.infoinfo('Setting initial pose')
1097  self._setInitialPose_setInitialPose()
1098  self.infoinfo('Waiting for amcl_pose to be received')
1099  rclpy.spin_once(self, timeout_sec=1.0)
1100  return
1101 
1102  def _amclPoseCallback(self, msg: PoseWithCovarianceStamped) -> None:
1103  self.debugdebug('Received amcl pose')
1104  self.initial_pose_receivedinitial_pose_received = True
1105  return
1106 
1107  def _feedbackCallback(self, msg: NavigateToPose.Feedback) -> None:
1108  self.debugdebug('Received action feedback message')
1109  self.feedbackfeedback = msg.feedback
1110  return
1111 
1112  def _routeFeedbackCallback(self, msg: ComputeAndTrackRoute.Feedback) -> None:
1113  self.debugdebug('Received route action feedback message')
1114  self.route_feedback.append(msg.feedback)
1115  return
1116 
1117  def _setInitialPose(self) -> None:
1118  msg = PoseWithCovarianceStamped()
1119  msg.pose.pose = self.initial_poseinitial_pose.pose
1120  msg.header.frame_id = self.initial_poseinitial_pose.header.frame_id
1121  msg.header.stamp = self.initial_poseinitial_pose.header.stamp
1122  self.infoinfo('Publishing Initial Pose')
1123  self.initial_pose_pubinitial_pose_pub.publish(msg)
1124  return
1125 
1126  def info(self, msg: str) -> None:
1127  self.get_logger().info(msg)
1128  return
1129 
1130  def warn(self, msg: str) -> None:
1131  self.get_logger().warn(msg)
1132  return
1133 
1134  def error(self, msg: str) -> None:
1135  self.get_logger().error(msg)
1136  return
1137 
1138  def debug(self, msg: str) -> None:
1139  self.get_logger().debug(msg)
1140  return
Optional[RunningTask] undockRobot(self, str dock_type='')
ComputePathToPose.Result _getPathImpl(self, PoseStamped start, PoseStamped goal, str planner_id='', bool use_start=False)
Path smoothPath(self, Path path, str smoother_id='', float max_duration=2.0, bool check_for_collision=False)
None clearCostmapExceptRegion(self, float reset_distance)
Optional[RunningTask] getAndTrackRoute(self, Union[int, PoseStamped] start, Union[int, PoseStamped] goal, bool use_start=False)
Optional[RunningTask] followGpsWaypoints(self, list[GeoPose] gps_poses)
Path getPath(self, PoseStamped start, PoseStamped goal, str planner_id='', bool use_start=False)
ComputePathThroughPoses.Result _getPathThroughPosesImpl(self, PoseStamped start, Goals goals, str planner_id='', bool use_start=False)
Optional[RunningTask] dockRobotByID(self, str dock_id, bool nav_to_dock=True)
Optional[RunningTask] goToPose(self, PoseStamped pose, str behavior_tree='')
None _feedbackCallback(self, NavigateToPose.Feedback msg)
None clearCostmapAroundRobot(self, float reset_distance)
Optional[RunningTask] goThroughPoses(self, Goals poses, str behavior_tree='')
None _amclPoseCallback(self, PoseWithCovarianceStamped msg)
Any getFeedback(self, RunningTask task=RunningTask.NONE)
bool isTaskComplete(self, RunningTask task=RunningTask.NONE)
None setInitialPose(self, PoseStamped initial_pose)
Path getPathThroughPoses(self, PoseStamped start, Goals goals, str planner_id='', bool use_start=False)
SmoothPath.Result _smoothPathImpl(self, Path path, str smoother_id='', float max_duration=2.0, bool check_for_collision=False)
Optional[RunningTask] followWaypoints(self, list[PoseStamped] poses)
Optional[list[Union[Path, Route]]] getRoute(self, Union[int, PoseStamped] start, Union[int, PoseStamped] goal, bool use_start=False)
None waitUntilNav2Active(self, str navigator='bt_navigator', str localizer='amcl')
None _routeFeedbackCallback(self, ComputeAndTrackRoute.Feedback msg)
ComputeRoute.Result _getRouteImpl(self, Union[int, PoseStamped] start, Union[int, PoseStamped] goal, bool use_start=False)
None setTaskError(self, int error_code, str error_msg)