20 from typing
import Any, Optional, Union
22 from action_msgs.msg
import GoalStatus
24 from geographic_msgs.msg
import GeoPose
26 from lifecycle_msgs.srv
import GetState
27 from nav2_msgs.action
import (AssistedTeleop, BackUp,
28 ComputeAndTrackRoute, ComputePathThroughPoses, ComputePathToPose,
29 ComputeRoute, DockRobot, DriveOnHeading, FollowGPSWaypoints,
30 FollowObject, FollowPath, FollowWaypoints, NavigateThroughPoses,
31 NavigateToPose, SmoothPath, Spin, UndockRobot)
33 from nav2_msgs.srv
import (ClearCostmapAroundPose, ClearCostmapAroundRobot,
34 ClearCostmapExceptRegion, ClearEntireCostmap, GetCostmap, LoadMap,
35 ManageLifecycleNodes, Toggle)
38 from rclpy.action
import ActionClient
39 from rclpy.action.client
import ClientGoalHandle
40 from rclpy.client
import Client
41 from rclpy.duration
import Duration
as rclpyDuration
42 from rclpy.node
import Node
43 from rclpy.qos
import QoSDurabilityPolicy, QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy
44 from rclpy.task
import Future
45 from rclpy.type_support
import GetResultServiceResponse
62 NAVIGATE_THROUGH_POSES = 2
65 FOLLOW_GPS_WAYPOINTS = 5
72 COMPUTE_AND_TRACK_ROUTE = 12
78 def __init__(self, node_name: str =
'basic_navigator', namespace: str =
'') ->
None:
79 super().__init__(node_name=node_name, namespace=namespace)
83 self.
goal_handlegoal_handle: Optional[ClientGoalHandle[Any, Any, Any]] =
None
85 Optional[Future[GetResultServiceResponse[Any]]] =
None
87 self.
statusstatus: Optional[int] =
None
93 self.
route_goal_handleroute_goal_handle: Optional[ClientGoalHandle[Any, Any, Any]] =
None
95 Optional[Future[GetResultServiceResponse[Any]]] =
None
96 self.route_feedback: list[Any] = []
102 amcl_pose_qos = QoSProfile(
103 durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
104 reliability=QoSReliabilityPolicy.RELIABLE,
105 history=QoSHistoryPolicy.KEEP_LAST,
110 self.nav_through_poses_client: ActionClient[
111 NavigateThroughPoses.Goal,
112 NavigateThroughPoses.Result,
113 NavigateThroughPoses.Feedback
115 self, NavigateThroughPoses,
'navigate_through_poses')
116 self.nav_to_pose_client: ActionClient[
118 NavigateToPose.Result,
119 NavigateToPose.Feedback
120 ] = ActionClient(self, NavigateToPose,
'navigate_to_pose')
121 self.follow_waypoints_client: ActionClient[
122 FollowWaypoints.Goal,
123 FollowWaypoints.Result,
124 FollowWaypoints.Feedback
126 self, FollowWaypoints,
'follow_waypoints'
128 self.follow_gps_waypoints_client: ActionClient[
129 FollowGPSWaypoints.Goal,
130 FollowGPSWaypoints.Result,
131 FollowGPSWaypoints.Feedback
133 self, FollowGPSWaypoints,
'follow_gps_waypoints'
135 self.follow_path_client: ActionClient[
139 ] = ActionClient(self, FollowPath,
'follow_path')
140 self.compute_path_to_pose_client: ActionClient[
141 ComputePathToPose.Goal,
142 ComputePathToPose.Result,
143 ComputePathToPose.Feedback
145 self, ComputePathToPose,
'compute_path_to_pose'
147 self.compute_path_through_poses_client: ActionClient[
148 ComputePathThroughPoses.Goal,
149 ComputePathThroughPoses.Result,
150 ComputePathThroughPoses.Feedback
152 self, ComputePathThroughPoses,
'compute_path_through_poses'
154 self.smoother_client: ActionClient[
158 ] = ActionClient(self, SmoothPath,
'smooth_path')
159 self.compute_route_client: ActionClient[
162 ComputeRoute.Feedback
163 ] = ActionClient(self, ComputeRoute,
'compute_route')
164 self.compute_and_track_route_client: ActionClient[
165 ComputeAndTrackRoute.Goal,
166 ComputeAndTrackRoute.Result,
167 ComputeAndTrackRoute.Feedback
168 ] = ActionClient(self, ComputeAndTrackRoute,
'compute_and_track_route')
169 self.spin_client: ActionClient[
173 ] = ActionClient(self, Spin,
'spin')
175 self.backup_client: ActionClient[
179 ] = ActionClient(self, BackUp,
'backup')
180 self.drive_on_heading_client: ActionClient[
182 DriveOnHeading.Result,
183 DriveOnHeading.Feedback
185 self, DriveOnHeading,
'drive_on_heading'
187 self.assisted_teleop_client: ActionClient[
189 AssistedTeleop.Result,
190 AssistedTeleop.Feedback
192 self, AssistedTeleop,
'assisted_teleop'
194 self.docking_client: ActionClient[
198 ] = ActionClient(self, DockRobot,
'dock_robot')
199 self.undocking_client: ActionClient[
203 ] = ActionClient(self, UndockRobot,
'undock_robot')
204 self.following_client: ActionClient[
207 FollowObject.Feedback
208 ] = ActionClient(self, FollowObject,
'follow_object')
211 PoseWithCovarianceStamped,
217 PoseWithCovarianceStamped,
'initialpose', 10
219 self.change_maps_srv: Client[LoadMap.Request, LoadMap.Response] = \
220 self.create_client(LoadMap,
'map_server/load_map')
221 self.clear_costmap_global_srv: Client[
222 ClearEntireCostmap.Request, ClearEntireCostmap.Response] = \
224 ClearEntireCostmap,
'global_costmap/clear_entirely_global_costmap'
226 self.clear_costmap_local_srv: Client[
227 ClearEntireCostmap.Request, ClearEntireCostmap.Response] = \
229 ClearEntireCostmap,
'local_costmap/clear_entirely_local_costmap'
231 self.clear_costmap_except_region_srv: Client[
232 ClearCostmapExceptRegion.Request, ClearCostmapExceptRegion.Response] = \
234 ClearCostmapExceptRegion,
'local_costmap/clear_costmap_except_region'
236 self.clear_costmap_around_robot_srv: Client[
237 ClearCostmapAroundRobot.Request, ClearCostmapAroundRobot.Response] = \
239 ClearCostmapAroundRobot,
'local_costmap/clear_costmap_around_robot'
241 self.clear_local_costmap_around_pose_srv: Client[
242 ClearCostmapAroundPose.Request, ClearCostmapAroundPose.Response] = \
244 ClearCostmapAroundPose,
'local_costmap/clear_costmap_around_pose'
246 self.clear_global_costmap_around_pose_srv: Client[
247 ClearCostmapAroundPose.Request, ClearCostmapAroundPose.Response] = \
249 ClearCostmapAroundPose,
'global_costmap/clear_costmap_around_pose'
251 self.get_costmap_global_srv: Client[
252 GetCostmap.Request, GetCostmap.Response] = \
254 GetCostmap,
'global_costmap/get_costmap'
256 self.get_costmap_local_srv: Client[
257 GetCostmap.Request, GetCostmap.Response] = \
259 GetCostmap,
'local_costmap/get_costmap'
261 self.toggle_collision_monitor_srv: Client[
262 Toggle.Request, Toggle.Response] = \
264 Toggle,
'collision_monitor/toggle'
267 def destroyNode(self) -> None:
270 def destroy_node(self) -> None:
271 self.nav_through_poses_client.destroy()
272 self.nav_to_pose_client.destroy()
273 self.follow_waypoints_client.destroy()
274 self.follow_path_client.destroy()
275 self.compute_path_to_pose_client.destroy()
276 self.compute_path_through_poses_client.destroy()
277 self.compute_and_track_route_client.destroy()
278 self.compute_route_client.destroy()
279 self.smoother_client.destroy()
280 self.spin_client.destroy()
281 self.backup_client.destroy()
282 self.drive_on_heading_client.destroy()
283 self.assisted_teleop_client.destroy()
284 self.follow_gps_waypoints_client.destroy()
285 self.docking_client.destroy()
286 self.undocking_client.destroy()
287 super().destroy_node()
290 """Set the initial pose to the localization system."""
295 def goThroughPoses(self, poses: Goals, behavior_tree: str =
'') -> Optional[RunningTask]:
296 """Send a `NavThroughPoses` action request."""
298 self.
debugdebug(
"Waiting for 'NavigateThroughPoses' action server")
299 while not self.nav_through_poses_client.wait_for_server(timeout_sec=1.0):
300 self.
infoinfo(
"'NavigateThroughPoses' action server not available, waiting...")
302 goal_msg = NavigateThroughPoses.Goal()
303 goal_msg.poses = poses
304 goal_msg.behavior_tree = behavior_tree
306 self.
infoinfo(f
'Navigating with {len(poses.goals)} goals....')
307 send_goal_future = self.nav_through_poses_client.send_goal_async(
310 rclpy.spin_until_future_complete(self, send_goal_future)
311 self.
goal_handlegoal_handle = send_goal_future.result()
314 msg = f
'NavigateThroughPoses request with {len(poses.goals)} was rejected!'
315 self.
setTaskErrorsetTaskError(NavigateThroughPoses.Result.UNKNOWN, msg)
320 return RunningTask.NAVIGATE_THROUGH_POSES
322 def goToPose(self, pose: PoseStamped, behavior_tree: str =
'') -> Optional[RunningTask]:
323 """Send a `NavToPose` action request."""
325 self.
debugdebug(
"Waiting for 'NavigateToPose' action server")
326 while not self.nav_to_pose_client.wait_for_server(timeout_sec=1.0):
327 self.
infoinfo(
"'NavigateToPose' action server not available, waiting...")
329 goal_msg = NavigateToPose.Goal()
331 goal_msg.behavior_tree = behavior_tree
334 'Navigating to goal: '
335 + str(pose.pose.position.x)
337 + str(pose.pose.position.y)
340 send_goal_future = self.nav_to_pose_client.send_goal_async(
343 rclpy.spin_until_future_complete(self, send_goal_future)
344 self.
goal_handlegoal_handle = send_goal_future.result()
348 'NavigateToPose goal to '
349 + str(pose.pose.position.x)
351 + str(pose.pose.position.y)
354 self.
setTaskErrorsetTaskError(NavigateToPose.Result.UNKNOWN, msg)
359 return RunningTask.NAVIGATE_TO_POSE
362 """Send a `FollowWaypoints` action request."""
364 self.
debugdebug(
"Waiting for 'FollowWaypoints' action server")
365 while not self.follow_waypoints_client.wait_for_server(timeout_sec=1.0):
366 self.
infoinfo(
"'FollowWaypoints' action server not available, waiting...")
368 goal_msg = FollowWaypoints.Goal()
369 goal_msg.poses = poses
371 self.
infoinfo(f
'Following {len(goal_msg.poses)} goals....')
372 send_goal_future = self.follow_waypoints_client.send_goal_async(
375 rclpy.spin_until_future_complete(self, send_goal_future)
376 self.
goal_handlegoal_handle = send_goal_future.result()
379 msg = f
'Following {len(poses)} waypoints request was rejected!'
380 self.
setTaskErrorsetTaskError(FollowWaypoints.Result.UNKNOWN, msg)
385 return RunningTask.FOLLOW_WAYPOINTS
388 """Send a `FollowGPSWaypoints` action request."""
390 self.
debugdebug(
"Waiting for 'FollowWaypoints' action server")
391 while not self.follow_gps_waypoints_client.wait_for_server(timeout_sec=1.0):
392 self.
infoinfo(
"'FollowWaypoints' action server not available, waiting...")
394 goal_msg = FollowGPSWaypoints.Goal()
395 goal_msg.gps_poses = gps_poses
397 self.
infoinfo(f
'Following {len(goal_msg.gps_poses)} gps goals....')
398 send_goal_future = self.follow_gps_waypoints_client.send_goal_async(
401 rclpy.spin_until_future_complete(self, send_goal_future)
402 self.
goal_handlegoal_handle = send_goal_future.result()
405 msg = f
'Following {len(gps_poses)} gps waypoints request was rejected!'
406 self.
setTaskErrorsetTaskError(FollowGPSWaypoints.Result.UNKNOWN, msg)
411 return RunningTask.FOLLOW_GPS_WAYPOINTS
414 self, spin_dist: float = 1.57, time_allowance: int = 10,
415 disable_collision_checks: bool =
False) -> Optional[RunningTask]:
417 self.
debugdebug(
"Waiting for 'Spin' action server")
418 while not self.spin_client.wait_for_server(timeout_sec=1.0):
419 self.
infoinfo(
"'Spin' action server not available, waiting...")
420 goal_msg = Spin.Goal()
421 goal_msg.target_yaw = spin_dist
422 goal_msg.time_allowance = Duration(sec=time_allowance)
423 goal_msg.disable_collision_checks = disable_collision_checks
425 self.
infoinfo(f
'Spinning to angle {goal_msg.target_yaw}....')
426 send_goal_future = self.spin_client.send_goal_async(
429 rclpy.spin_until_future_complete(self, send_goal_future)
430 self.
goal_handlegoal_handle = send_goal_future.result()
433 msg =
'Spin request was rejected!'
439 return RunningTask.SPIN
442 self, backup_dist: float = 0.15, backup_speed: float = 0.025,
443 time_allowance: int = 10,
444 disable_collision_checks: bool =
False) -> Optional[RunningTask]:
446 self.
debugdebug(
"Waiting for 'Backup' action server")
447 while not self.backup_client.wait_for_server(timeout_sec=1.0):
448 self.
infoinfo(
"'Backup' action server not available, waiting...")
449 goal_msg = BackUp.Goal()
450 goal_msg.target = Point(x=float(backup_dist))
451 goal_msg.speed = backup_speed
452 goal_msg.time_allowance = Duration(sec=time_allowance)
453 goal_msg.disable_collision_checks = disable_collision_checks
455 self.
infoinfo(f
'Backing up {goal_msg.target.x} m at {goal_msg.speed} m/s....')
456 send_goal_future = self.backup_client.send_goal_async(
459 rclpy.spin_until_future_complete(self, send_goal_future)
460 self.
goal_handlegoal_handle = send_goal_future.result()
463 msg =
'Backup request was rejected!'
464 self.
setTaskErrorsetTaskError(BackUp.Result.UNKNOWN, msg)
469 return RunningTask.BACKUP
472 self, dist: float = 0.15, speed: float = 0.025,
473 time_allowance: int = 10,
474 disable_collision_checks: bool =
False) -> Optional[RunningTask]:
476 self.
debugdebug(
"Waiting for 'DriveOnHeading' action server")
477 while not self.backup_client.wait_for_server(timeout_sec=1.0):
478 self.
infoinfo(
"'DriveOnHeading' action server not available, waiting...")
479 goal_msg = DriveOnHeading.Goal()
480 goal_msg.target = Point(x=float(dist))
481 goal_msg.speed = speed
482 goal_msg.time_allowance = Duration(sec=time_allowance)
483 goal_msg.disable_collision_checks = disable_collision_checks
485 self.
infoinfo(f
'Drive {goal_msg.target.x} m on heading at {goal_msg.speed} m/s....')
486 send_goal_future = self.drive_on_heading_client.send_goal_async(
489 rclpy.spin_until_future_complete(self, send_goal_future)
490 self.
goal_handlegoal_handle = send_goal_future.result()
493 msg =
'Drive On Heading request was rejected!'
494 self.
setTaskErrorsetTaskError(DriveOnHeading.Result.UNKNOWN, msg)
499 return RunningTask.DRIVE_ON_HEADING
501 def assistedTeleop(self, time_allowance: int = 30) -> Optional[RunningTask]:
504 self.
debugdebug(
"Wanting for 'assisted_teleop' action server")
506 while not self.assisted_teleop_client.wait_for_server(timeout_sec=1.0):
507 self.
infoinfo(
"'assisted_teleop' action server not available, waiting...")
508 goal_msg = AssistedTeleop.Goal()
509 goal_msg.time_allowance = Duration(sec=time_allowance)
511 self.
infoinfo(
"Running 'assisted_teleop'....")
512 send_goal_future = self.assisted_teleop_client.send_goal_async(
515 rclpy.spin_until_future_complete(self, send_goal_future)
516 self.
goal_handlegoal_handle = send_goal_future.result()
519 msg =
'Assisted Teleop request was rejected!'
520 self.
setTaskErrorsetTaskError(AssistedTeleop.Result.UNKNOWN, msg)
525 return RunningTask.ASSISTED_TELEOP
527 def followPath(self, path: Path, controller_id: str =
'',
528 goal_checker_id: str =
'') -> Optional[RunningTask]:
530 """Send a `FollowPath` action request."""
531 self.
debugdebug(
"Waiting for 'FollowPath' action server")
532 while not self.follow_path_client.wait_for_server(timeout_sec=1.0):
533 self.
infoinfo(
"'FollowPath' action server not available, waiting...")
535 goal_msg = FollowPath.Goal()
537 goal_msg.controller_id = controller_id
538 goal_msg.goal_checker_id = goal_checker_id
540 self.
infoinfo(
'Executing path...')
541 send_goal_future = self.follow_path_client.send_goal_async(
544 rclpy.spin_until_future_complete(self, send_goal_future)
545 self.
goal_handlegoal_handle = send_goal_future.result()
548 msg =
'FollowPath goal was rejected!'
549 self.
setTaskErrorsetTaskError(FollowPath.Result.UNKNOWN, msg)
554 return RunningTask.FOLLOW_PATH
556 def dockRobotByPose(self, dock_pose: PoseStamped,
557 dock_type: str =
'', nav_to_dock: bool =
True) -> Optional[RunningTask]:
559 """Send a `DockRobot` action request."""
560 self.
infoinfo(
"Waiting for 'DockRobot' action server")
561 while not self.docking_client.wait_for_server(timeout_sec=1.0):
562 self.
infoinfo(
'"DockRobot" action server not available, waiting...')
564 goal_msg = DockRobot.Goal()
565 goal_msg.use_dock_id =
False
566 goal_msg.dock_pose = dock_pose
567 goal_msg.dock_type = dock_type
568 goal_msg.navigate_to_staging_pose = nav_to_dock
570 self.
infoinfo(
'Docking at pose: ' + str(dock_pose) +
'...')
571 send_goal_future = self.docking_client.send_goal_async(
573 rclpy.spin_until_future_complete(self, send_goal_future)
574 self.
goal_handlegoal_handle = send_goal_future.result()
577 msg =
'DockRobot request was rejected!'
578 self.
setTaskErrorsetTaskError(DockRobot.Result.UNKNOWN, msg)
583 return RunningTask.DOCK_ROBOT
585 def dockRobotByID(self, dock_id: str, nav_to_dock: bool =
True) -> Optional[RunningTask]:
586 """Send a `DockRobot` action request."""
588 self.
infoinfo(
"Waiting for 'DockRobot' action server")
589 while not self.docking_client.wait_for_server(timeout_sec=1.0):
590 self.
infoinfo(
'"DockRobot" action server not available, waiting...')
592 goal_msg = DockRobot.Goal()
593 goal_msg.use_dock_id =
True
594 goal_msg.dock_id = dock_id
595 goal_msg.navigate_to_staging_pose = nav_to_dock
597 self.
infoinfo(
'Docking at dock ID: ' + str(dock_id) +
'...')
598 send_goal_future = self.docking_client.send_goal_async(
600 rclpy.spin_until_future_complete(self, send_goal_future)
601 self.
goal_handlegoal_handle = send_goal_future.result()
604 msg =
'DockRobot request was rejected!'
605 self.
setTaskErrorsetTaskError(DockRobot.Result.UNKNOWN, msg)
610 return RunningTask.DOCK_ROBOT
612 def undockRobot(self, dock_type: str =
'') -> Optional[RunningTask]:
613 """Send a `UndockRobot` action request."""
615 self.
infoinfo(
"Waiting for 'UndockRobot' action server")
616 while not self.undocking_client.wait_for_server(timeout_sec=1.0):
617 self.
infoinfo(
'"UndockRobot" action server not available, waiting...')
619 goal_msg = UndockRobot.Goal()
620 goal_msg.dock_type = dock_type
622 self.
infoinfo(
'Undocking from dock of type: ' + str(dock_type) +
'...')
623 send_goal_future = self.undocking_client.send_goal_async(
625 rclpy.spin_until_future_complete(self, send_goal_future)
626 self.
goal_handlegoal_handle = send_goal_future.result()
629 msg =
'UndockRobot request was rejected!'
630 self.
setTaskErrorsetTaskError(UndockRobot.Result.UNKNOWN, msg)
635 return RunningTask.UNDOCK_ROBOT
638 """Send a `FollowObject` action request."""
640 self.
infoinfo(
"Waiting for 'FollowObject' action server")
641 while not self.following_client.wait_for_server(timeout_sec=1.0):
642 self.
infoinfo(
'"FollowObject" action server not available, waiting...')
644 goal_msg = FollowObject.Goal()
645 goal_msg.pose_topic = topic
646 goal_msg.max_duration = Duration(sec=max_duration)
648 self.
infoinfo(
'Following object on topic: ' + str(topic) +
'...')
649 send_goal_future = self.following_client.send_goal_async(
651 rclpy.spin_until_future_complete(self, send_goal_future)
652 self.
goal_handlegoal_handle = send_goal_future.result()
655 msg =
'FollowObject request was rejected!'
656 self.
setTaskErrorsetTaskError(FollowObject.Result.UNKNOWN, msg)
661 return RunningTask.FOLLOW_OBJECT
664 """Send a `FollowObject` action request."""
666 self.
infoinfo(
"Waiting for 'FollowObject' action server")
667 while not self.following_client.wait_for_server(timeout_sec=1.0):
668 self.
infoinfo(
'"FollowObject" action server not available, waiting...')
670 goal_msg = FollowObject.Goal()
671 goal_msg.tracked_frame = frame
672 goal_msg.max_duration = Duration(sec=max_duration)
674 self.
infoinfo(
'Following object in frame: ' + str(frame) +
'...')
675 send_goal_future = self.following_client.send_goal_async(
677 rclpy.spin_until_future_complete(self, send_goal_future)
678 self.
goal_handlegoal_handle = send_goal_future.result()
681 msg =
'FollowObject request was rejected!'
682 self.
setTaskErrorsetTaskError(FollowObject.Result.UNKNOWN, msg)
687 return RunningTask.FOLLOW_OBJECT
690 """Cancel pending task request of any type."""
691 self.
infoinfo(
'Canceling current task.')
694 future = self.
goal_handlegoal_handle.cancel_goal_async()
695 rclpy.spin_until_future_complete(self, future)
697 self.
errorerror(
'Cancel task failed, goal handle is None')
698 self.
setTaskErrorsetTaskError(0,
'Cancel task failed, goal handle is None')
703 rclpy.spin_until_future_complete(self, future)
705 self.
errorerror(
'Cancel route task failed, goal handle is None')
706 self.
setTaskErrorsetTaskError(0,
'Cancel route task failed, goal handle is None')
712 """Check if the task request of any type is complete yet."""
715 self.
errorerror(
'Task is None, cannot check for completion')
719 if task != RunningTask.COMPUTE_AND_TRACK_ROUTE:
723 if not result_future:
728 rclpy.spin_until_future_complete(self, result_future, timeout_sec=0.10)
729 result_response = result_future.result()
732 self.
statusstatus = result_response.status
733 if self.
statusstatus != GoalStatus.STATUS_SUCCEEDED:
734 result = result_response.result
735 if result
is not None:
736 self.
setTaskErrorsetTaskError(result.error_code, result.error_msg)
738 'Task with failed with'
739 f
' status code:{self.status}'
740 f
' error code:{result.error_code}'
741 f
' error msg:{result.error_msg}')
745 self.
debugdebug(
'Task failed with no result received')
751 self.
debugdebug(
'Task succeeded!')
754 def getFeedback(self, task: RunningTask = RunningTask.NONE) -> Any:
755 """Get the pending action feedback message."""
756 if task != RunningTask.COMPUTE_AND_TRACK_ROUTE:
758 if len(self.route_feedback) > 0:
759 return self.route_feedback.pop(0)
763 """Get the pending action result message."""
764 if self.
statusstatus == GoalStatus.STATUS_SUCCEEDED:
765 return TaskResult.SUCCEEDED
766 elif self.
statusstatus == GoalStatus.STATUS_ABORTED:
767 return TaskResult.FAILED
768 elif self.
statusstatus == GoalStatus.STATUS_CANCELED:
769 return TaskResult.CANCELED
771 return TaskResult.UNKNOWN
773 def clearTaskError(self) -> None:
777 def setTaskError(self, error_code: int, error_msg: str) ->
None:
781 def getTaskError(self) -> tuple[int, str]:
785 localizer: str =
'amcl') ->
None:
786 """Block until the full navigation system is up and running."""
787 if localizer !=
'robot_localization':
789 if localizer ==
'amcl':
792 self.
infoinfo(
'Nav2 is ready for use!')
796 self, start: PoseStamped, goal: PoseStamped,
797 planner_id: str =
'', use_start: bool =
False
798 ) -> ComputePathToPose.Result:
800 Send a `ComputePathToPose` action request.
802 Internal implementation to get the full result, not just the path.
804 self.
debugdebug(
"Waiting for 'ComputePathToPose' action server")
805 while not self.compute_path_to_pose_client.wait_for_server(timeout_sec=1.0):
806 self.
infoinfo(
"'ComputePathToPose' action server not available, waiting...")
808 goal_msg = ComputePathToPose.Goal()
809 goal_msg.start = start
811 goal_msg.planner_id = planner_id
812 goal_msg.use_start = use_start
814 self.
infoinfo(
'Getting path...')
815 send_goal_future = self.compute_path_to_pose_client.send_goal_async(goal_msg)
816 rclpy.spin_until_future_complete(self, send_goal_future)
817 self.
goal_handlegoal_handle = send_goal_future.result()
820 self.
errorerror(
'Get path was rejected!')
821 self.
statusstatus = GoalStatus.STATUS_UNKNOWN
822 result = ComputePathToPose.Result()
823 result.error_code = ComputePathToPose.Result.UNKNOWN
824 result.error_msg =
'Get path was rejected'
828 rclpy.spin_until_future_complete(self, self.
result_futureresult_future)
834 self, start: PoseStamped, goal: PoseStamped,
835 planner_id: str =
'', use_start: bool =
False) -> Optional[Path]:
836 """Send a `ComputePathToPose` action request."""
838 rtn = self.
_getPathImpl_getPathImpl(start, goal, planner_id, use_start)
840 if self.
statusstatus == GoalStatus.STATUS_SUCCEEDED:
843 self.
setTaskErrorsetTaskError(rtn.error_code, rtn.error_msg)
844 self.
warnwarn(
'Getting path failed with'
845 f
' status code:{self.status}'
846 f
' error code:{rtn.error_code}'
847 f
' error msg:{rtn.error_msg}')
850 def _getPathThroughPosesImpl(
851 self, start: PoseStamped, goals: list[PoseStamped],
852 planner_id: str =
'', use_start: bool =
False
853 ) -> ComputePathThroughPoses.Result:
855 Send a `ComputePathThroughPoses` action request.
857 Internal implementation to get the full result, not just the path.
859 self.
debugdebug(
"Waiting for 'ComputePathThroughPoses' action server")
860 while not self.compute_path_through_poses_client.wait_for_server(
864 "'ComputePathThroughPoses' action server not available, waiting..."
867 goal_msg = ComputePathThroughPoses.Goal()
868 goal_msg.start = start
869 goal_msg.goals.header.frame_id =
'map'
870 goal_msg.goals.header.stamp = self.get_clock().now().to_msg()
871 goal_msg.goals.goals = goals
872 goal_msg.planner_id = planner_id
873 goal_msg.use_start = use_start
875 self.
infoinfo(
'Getting path...')
876 send_goal_future = self.compute_path_through_poses_client.send_goal_async(
879 rclpy.spin_until_future_complete(self, send_goal_future)
880 self.
goal_handlegoal_handle = send_goal_future.result()
883 self.
errorerror(
'Get path was rejected!')
884 result = ComputePathThroughPoses.Result()
885 result.error_code = ComputePathThroughPoses.Result.UNKNOWN
886 result.error_msg =
'Get path was rejected!'
890 rclpy.spin_until_future_complete(self, self.
result_futureresult_future)
896 self, start: PoseStamped, goals: list[PoseStamped],
897 planner_id: str =
'', use_start: bool =
False) -> Optional[Path]:
898 """Send a `ComputePathThroughPoses` action request."""
902 if self.
statusstatus == GoalStatus.STATUS_SUCCEEDED:
905 self.
setTaskErrorsetTaskError(rtn.error_code, rtn.error_msg)
906 self.
warnwarn(
'Getting path failed with'
907 f
' status code:{self.status}'
908 f
' error code:{rtn.error_code}'
909 f
' error msg:{rtn.error_msg}')
913 self, start: Union[int, PoseStamped],
914 goal: Union[int, PoseStamped], use_start: bool =
False
915 ) -> ComputeRoute.Result:
917 Send a `ComputeRoute` action request.
919 Internal implementation to get the full result, not just the sparse route and dense path.
921 self.
debugdebug(
"Waiting for 'ComputeRoute' action server")
922 while not self.compute_route_client.wait_for_server(timeout_sec=1.0):
923 self.
infoinfo(
"'ComputeRoute' action server not available, waiting...")
925 goal_msg = ComputeRoute.Goal()
926 goal_msg.use_start = use_start
929 if isinstance(start, int)
and isinstance(goal, int):
930 goal_msg.start_id = start
931 goal_msg.goal_id = goal
932 goal_msg.use_poses =
False
933 elif isinstance(start, PoseStamped)
and isinstance(goal, PoseStamped):
934 goal_msg.start = start
936 goal_msg.use_poses =
True
938 self.
errorerror(
'Invalid start and goal types. Must be PoseStamped for pose or int for ID')
939 result = ComputeRoute.Result()
940 result.error_code = ComputeRoute.Result.UNKNOWN
941 result.error_msg =
'Request type fields were invalid!'
944 self.
infoinfo(
'Getting route...')
945 send_goal_future = self.compute_route_client.send_goal_async(goal_msg)
946 rclpy.spin_until_future_complete(self, send_goal_future)
947 self.
goal_handlegoal_handle = send_goal_future.result()
950 self.
errorerror(
'Get route was rejected!')
951 result = ComputeRoute.Result()
952 result.error_code = ComputeRoute.Result.UNKNOWN
953 result.error_msg =
'Get route was rejected!'
957 rclpy.spin_until_future_complete(self, self.
result_futureresult_future)
963 self, start: Union[int, PoseStamped],
964 goal: Union[int, PoseStamped],
965 use_start: bool =
False) -> Optional[list[Union[Path, Route]]]:
966 """Send a `ComputeRoute` action request."""
968 rtn = self.
_getRouteImpl_getRouteImpl(start, goal, use_start=
False)
970 if self.
statusstatus != GoalStatus.STATUS_SUCCEEDED:
971 self.
setTaskErrorsetTaskError(rtn.error_code, rtn.error_msg)
973 'Getting route failed with'
974 f
' status code:{self.status}'
975 f
' error code:{rtn.error_code}'
976 f
' error msg:{rtn.error_msg}')
979 return [rtn.path, rtn.route]
982 self, start: Union[int, PoseStamped],
983 goal: Union[int, PoseStamped], use_start: bool =
False
984 ) -> Optional[RunningTask]:
985 """Send a `ComputeAndTrackRoute` action request."""
987 self.
debugdebug(
"Waiting for 'ComputeAndTrackRoute' action server")
988 while not self.compute_and_track_route_client.wait_for_server(timeout_sec=1.0):
989 self.
infoinfo(
"'ComputeAndTrackRoute' action server not available, waiting...")
991 goal_msg = ComputeAndTrackRoute.Goal()
992 goal_msg.use_start = use_start
995 if isinstance(start, int)
and isinstance(goal, int):
996 goal_msg.start_id = start
997 goal_msg.goal_id = goal
998 goal_msg.use_poses =
False
999 elif isinstance(start, PoseStamped)
and isinstance(goal, PoseStamped):
1000 goal_msg.start = start
1001 goal_msg.goal = goal
1002 goal_msg.use_poses =
True
1004 self.
setTaskErrorsetTaskError(ComputeAndTrackRoute.Result.UNKNOWN,
1005 'Request type fields were invalid!')
1006 self.
errorerror(
'Invalid start and goal types. Must be PoseStamped for pose or int for ID')
1009 self.
infoinfo(
'Computing and tracking route...')
1010 send_goal_future = self.compute_and_track_route_client.send_goal_async(goal_msg,
1012 rclpy.spin_until_future_complete(self, send_goal_future)
1016 msg =
'Compute and track route was rejected!'
1017 self.
setTaskErrorsetTaskError(ComputeAndTrackRoute.Result.UNKNOWN, msg)
1018 self.
errorerror(msg)
1022 return RunningTask.COMPUTE_AND_TRACK_ROUTE
1024 def _smoothPathImpl(
1025 self, path: Path, smoother_id: str =
'',
1026 max_duration: float = 2.0, check_for_collision: bool =
False
1027 ) -> SmoothPath.Result:
1029 Send a `SmoothPath` action request.
1031 Internal implementation to get the full result, not just the path.
1033 self.
debugdebug(
"Waiting for 'SmoothPath' action server")
1034 while not self.smoother_client.wait_for_server(timeout_sec=1.0):
1035 self.
infoinfo(
"'SmoothPath' action server not available, waiting...")
1037 goal_msg = SmoothPath.Goal()
1038 goal_msg.path = path
1039 goal_msg.max_smoothing_duration = rclpyDuration(seconds=max_duration).to_msg()
1040 goal_msg.smoother_id = smoother_id
1041 goal_msg.check_for_collisions = check_for_collision
1043 self.
infoinfo(
'Smoothing path...')
1044 send_goal_future = self.smoother_client.send_goal_async(goal_msg)
1045 rclpy.spin_until_future_complete(self, send_goal_future)
1046 self.
goal_handlegoal_handle = send_goal_future.result()
1049 self.
errorerror(
'Smooth path was rejected!')
1050 result = SmoothPath.Result()
1051 result.error_code = SmoothPath.Result.UNKNOWN
1052 result.error_msg =
'Smooth path was rejected'
1056 rclpy.spin_until_future_complete(self, self.
result_futureresult_future)
1062 self, path: Path, smoother_id: str =
'',
1063 max_duration: float = 2.0, check_for_collision: bool =
False) -> Optional[Path]:
1064 """Send a `SmoothPath` action request."""
1066 rtn = self.
_smoothPathImpl_smoothPathImpl(path, smoother_id, max_duration, check_for_collision)
1068 if self.
statusstatus == GoalStatus.STATUS_SUCCEEDED:
1071 self.
setTaskErrorsetTaskError(rtn.error_code, rtn.error_msg)
1072 self.
warnwarn(
'Getting path failed with'
1073 f
' status code:{self.status}'
1074 f
' error code:{rtn.error_code}'
1075 f
' error msg:{rtn.error_msg}')
1079 """Change the current static map in the map server."""
1080 while not self.change_maps_srv.wait_for_service(timeout_sec=1.0):
1081 self.
infoinfo(
'change map service not available, waiting...')
1082 req = LoadMap.Request()
1083 req.map_url = map_filepath
1084 future = self.change_maps_srv.call_async(req)
1085 rclpy.spin_until_future_complete(self, future)
1087 future_result = future.result()
1088 if future_result
is None:
1089 self.
errorerror(
'Change map request failed!')
1092 result = future_result.result
1093 if result != LoadMap.Response.RESULT_SUCCESS:
1094 if result == LoadMap.Response.RESULT_MAP_DOES_NOT_EXIST:
1095 reason =
'Map does not exist'
1096 elif result == LoadMap.Response.RESULT_INVALID_MAP_DATA:
1097 reason =
'Invalid map data'
1098 elif result == LoadMap.Response.RESULT_INVALID_MAP_METADATA:
1099 reason =
'Invalid map metadata'
1100 elif result == LoadMap.Response.RESULT_UNDEFINED_FAILURE:
1101 reason =
'Undefined failure'
1105 self.
errorerror(f
'Change map request failed:{reason}!')
1108 self.
infoinfo(
'Change map request was successful!')
1112 """Clear all costmaps."""
1118 """Clear local costmap."""
1119 while not self.clear_costmap_local_srv.wait_for_service(timeout_sec=1.0):
1120 self.
infoinfo(
'Clear local costmaps service not available, waiting...')
1121 req = ClearEntireCostmap.Request()
1122 future = self.clear_costmap_local_srv.call_async(req)
1123 rclpy.spin_until_future_complete(self, future)
1125 result = future.result()
1127 self.
errorerror(
'Clear local costmap request failed!')
1132 """Clear global costmap."""
1133 while not self.clear_costmap_global_srv.wait_for_service(timeout_sec=1.0):
1134 self.
infoinfo(
'Clear global costmaps service not available, waiting...')
1135 req = ClearEntireCostmap.Request()
1136 future = self.clear_costmap_global_srv.call_async(req)
1137 rclpy.spin_until_future_complete(self, future)
1139 result = future.result()
1141 self.
errorerror(
'Clear global costmap request failed!')
1146 """Clear the costmap except for a specified region."""
1147 while not self.clear_costmap_except_region_srv.wait_for_service(timeout_sec=1.0):
1148 self.
infoinfo(
'ClearCostmapExceptRegion service not available, waiting...')
1149 req = ClearCostmapExceptRegion.Request()
1150 req.reset_distance = reset_distance
1151 future = self.clear_costmap_except_region_srv.call_async(req)
1152 rclpy.spin_until_future_complete(self, future)
1154 result = future.result()
1156 self.
errorerror(
'Clear costmap except region request failed!')
1161 """Clear the costmap around the robot."""
1162 while not self.clear_costmap_around_robot_srv.wait_for_service(timeout_sec=1.0):
1163 self.
infoinfo(
'ClearCostmapAroundRobot service not available, waiting...')
1164 req = ClearCostmapAroundRobot.Request()
1165 req.reset_distance = reset_distance
1166 future = self.clear_costmap_around_robot_srv.call_async(req)
1167 rclpy.spin_until_future_complete(self, future)
1169 result = future.result()
1171 self.
errorerror(
'Clear costmap around robot request failed!')
1176 """Clear the costmap around a given pose."""
1177 while not self.clear_local_costmap_around_pose_srv.wait_for_service(timeout_sec=1.0):
1178 self.
infoinfo(
'ClearLocalCostmapAroundPose service not available, waiting...')
1179 req = ClearCostmapAroundPose.Request()
1181 req.reset_distance = reset_distance
1182 future = self.clear_local_costmap_around_pose_srv.call_async(req)
1183 rclpy.spin_until_future_complete(self, future)
1185 result = future.result()
1187 self.
errorerror(
'Clear local costmap around pose request failed!')
1192 """Clear the global costmap around a given pose."""
1193 while not self.clear_global_costmap_around_pose_srv.wait_for_service(timeout_sec=1.0):
1194 self.
infoinfo(
'ClearGlobalCostmapAroundPose service not available, waiting...')
1195 req = ClearCostmapAroundPose.Request()
1197 req.reset_distance = reset_distance
1198 future = self.clear_global_costmap_around_pose_srv.call_async(req)
1199 rclpy.spin_until_future_complete(self, future)
1201 result = future.result()
1203 self.
errorerror(
'Clear global costmap around pose request failed!')
1208 """Get the global costmap."""
1209 while not self.get_costmap_global_srv.wait_for_service(timeout_sec=1.0):
1210 self.
infoinfo(
'Get global costmaps service not available, waiting...')
1211 req = GetCostmap.Request()
1212 future = self.get_costmap_global_srv.call_async(req)
1213 rclpy.spin_until_future_complete(self, future)
1215 result = future.result()
1217 self.
errorerror(
'Get global costmap request failed!')
1223 """Get the local costmap."""
1224 while not self.get_costmap_local_srv.wait_for_service(timeout_sec=1.0):
1225 self.
infoinfo(
'Get local costmaps service not available, waiting...')
1226 req = GetCostmap.Request()
1227 future = self.get_costmap_local_srv.call_async(req)
1228 rclpy.spin_until_future_complete(self, future)
1230 result = future.result()
1233 self.
errorerror(
'Get local costmap request failed!')
1239 """Toggle the collision monitor."""
1240 while not self.toggle_collision_monitor_srv.wait_for_service(timeout_sec=1.0):
1241 self.
infoinfo(
'Toggle collision monitor service not available, waiting...')
1242 req = Toggle.Request()
1244 future = self.toggle_collision_monitor_srv.call_async(req)
1246 rclpy.spin_until_future_complete(self, future)
1247 result = future.result()
1249 self.
errorerror(
'Toggle collision monitor request failed!')
1254 """Startup nav2 lifecycle system."""
1255 self.
infoinfo(
'Starting up lifecycle nodes based on lifecycle_manager.')
1256 for srv_name, srv_type
in self.get_service_names_and_types():
1257 if srv_type[0] ==
'nav2_msgs/srv/ManageLifecycleNodes':
1258 self.
infoinfo(f
'Starting up {srv_name}')
1259 mgr_client: Client[ManageLifecycleNodes.Request, ManageLifecycleNodes.Response] = \
1260 self.create_client(ManageLifecycleNodes, srv_name)
1261 while not mgr_client.wait_for_service(timeout_sec=1.0):
1262 self.
infoinfo(f
'{srv_name} service not available, waiting...')
1263 req = ManageLifecycleNodes.Request()
1264 req.command = ManageLifecycleNodes.Request.STARTUP
1265 future = mgr_client.call_async(req)
1270 rclpy.spin_until_future_complete(self, future, timeout_sec=0.10)
1275 self.
infoinfo(
'Nav2 is ready for use!')
1279 """Shutdown nav2 lifecycle system."""
1280 self.
infoinfo(
'Shutting down lifecycle nodes based on lifecycle_manager.')
1281 for srv_name, srv_type
in self.get_service_names_and_types():
1282 if srv_type[0] ==
'nav2_msgs/srv/ManageLifecycleNodes':
1283 self.
infoinfo(f
'Shutting down {srv_name}')
1284 mgr_client: Client[ManageLifecycleNodes.Request, ManageLifecycleNodes.Response] = \
1285 self.create_client(ManageLifecycleNodes, srv_name)
1286 while not mgr_client.wait_for_service(timeout_sec=1.0):
1287 self.
infoinfo(f
'{srv_name} service not available, waiting...')
1288 req = ManageLifecycleNodes.Request()
1289 req.command = ManageLifecycleNodes.Request.SHUTDOWN
1290 future = mgr_client.call_async(req)
1291 rclpy.spin_until_future_complete(self, future)
1295 def _waitForNodeToActivate(self, node_name: str) ->
None:
1297 self.
debugdebug(f
'Waiting for {node_name} to become active..')
1298 node_service = f
'{node_name}/get_state'
1299 state_client: Client[GetState.Request, GetState.Response] = \
1300 self.create_client(GetState, node_service)
1301 while not state_client.wait_for_service(timeout_sec=1.0):
1302 self.
infoinfo(f
'{node_service} service not available, waiting...')
1304 req = GetState.Request()
1306 while state !=
'active':
1307 self.
debugdebug(f
'Getting {node_name} state...')
1308 future = state_client.call_async(req)
1309 rclpy.spin_until_future_complete(self, future)
1311 result = future.result()
1312 if result
is not None:
1313 state = result.current_state.label
1314 self.
debugdebug(f
'Result of get_state: {state}')
1318 def _waitForInitialPose(self) -> None:
1320 self.
infoinfo(
'Setting initial pose')
1322 self.
infoinfo(
'Waiting for amcl_pose to be received')
1323 rclpy.spin_once(self, timeout_sec=1.0)
1326 def _amclPoseCallback(self, msg: PoseWithCovarianceStamped) ->
None:
1327 self.
debugdebug(
'Received amcl pose')
1331 def _feedbackCallback(self, msg: Any) ->
None:
1332 self.
debugdebug(
'Received action feedback message')
1333 self.
feedbackfeedback = msg.feedback
1336 def _routeFeedbackCallback(
1337 self, msg: ComputeAndTrackRoute.Impl.FeedbackMessage) ->
None:
1338 self.
debugdebug(
'Received route action feedback message')
1339 self.route_feedback.append(msg.feedback)
1342 def _setInitialPose(self) -> None:
1343 msg = PoseWithCovarianceStamped()
1345 msg.header.frame_id = self.
initial_poseinitial_pose.header.frame_id
1346 msg.header.stamp = self.
initial_poseinitial_pose.header.stamp
1347 self.
infoinfo(
'Publishing Initial Pose')
1351 def info(self, msg: str) ->
None:
1352 self.get_logger().info(msg)
1355 def warn(self, msg: str) ->
None:
1356 self.get_logger().warning(msg)
1359 def error(self, msg: str) ->
None:
1360 self.get_logger().error(msg)
1363 def debug(self, msg: str) ->
None:
1364 self.get_logger().debug(msg)
Optional[Path] getPathThroughPoses(self, PoseStamped start, list[PoseStamped] goals, str planner_id='', bool use_start=False)
Optional[RunningTask] undockRobot(self, str dock_type='')
ComputePathThroughPoses.Result _getPathThroughPosesImpl(self, PoseStamped start, list[PoseStamped] goals, str planner_id='', bool use_start=False)
ComputePathToPose.Result _getPathImpl(self, PoseStamped start, PoseStamped goal, str planner_id='', bool use_start=False)
None lifecycleShutdown(self)
TaskResult getResult(self)
None clearGlobalCostmap(self)
Optional[Path] getPath(self, PoseStamped start, PoseStamped goal, str planner_id='', bool use_start=False)
None _routeFeedbackCallback(self, ComputeAndTrackRoute.Impl.FeedbackMessage msg)
None clearCostmapExceptRegion(self, float reset_distance)
Optional[RunningTask] getAndTrackRoute(self, Union[int, PoseStamped] start, Union[int, PoseStamped] goal, bool use_start=False)
None clearLocalCostmap(self)
Optional[RunningTask] followGpsWaypoints(self, list[GeoPose] gps_poses)
None clearAllCostmaps(self)
Optional[Costmap] getGlobalCostmap(self)
Optional[RunningTask] dockRobotByID(self, str dock_id, bool nav_to_dock=True)
Optional[RunningTask] goToPose(self, PoseStamped pose, str behavior_tree='')
None _feedbackCallback(self, Any msg)
Optional[RunningTask] followObjectByFrame(self, str frame, int max_duration=0)
Optional[RunningTask] followObjectByTopic(self, str topic, int max_duration=0)
None clearCostmapAroundRobot(self, float reset_distance)
bool changeMap(self, str map_filepath)
Optional[RunningTask] goThroughPoses(self, Goals poses, str behavior_tree='')
None _amclPoseCallback(self, PoseWithCovarianceStamped msg)
Any getFeedback(self, RunningTask task=RunningTask.NONE)
bool isTaskComplete(self, RunningTask task=RunningTask.NONE)
None setInitialPose(self, PoseStamped initial_pose)
Optional[Costmap] getLocalCostmap(self)
Optional[Path] smoothPath(self, Path path, str smoother_id='', float max_duration=2.0, bool check_for_collision=False)
None _waitForNodeToActivate(self, str node_name)
SmoothPath.Result _smoothPathImpl(self, Path path, str smoother_id='', float max_duration=2.0, bool check_for_collision=False)
Optional[RunningTask] followWaypoints(self, list[PoseStamped] poses)
None clearGlobalCostmapAroundPose(self, PoseStamped pose, float reset_distance)
None lifecycleStartup(self)
None clearLocalCostmapAroundPose(self, PoseStamped pose, float reset_distance)
None clearTaskError(self)
None _setInitialPose(self)
None toggleCollisionMonitor(self, bool enable)
Optional[list[Union[Path, Route]]] getRoute(self, Union[int, PoseStamped] start, Union[int, PoseStamped] goal, bool use_start=False)
None waitUntilNav2Active(self, str navigator='bt_navigator', str localizer='amcl')
None error(self, str msg)
ComputeRoute.Result _getRouteImpl(self, Union[int, PoseStamped] start, Union[int, PoseStamped] goal, bool use_start=False)
None setTaskError(self, int error_code, str error_msg)
None _waitForInitialPose(self)
None debug(self, str msg)