20 from typing
import Any, Optional, Union
22 from action_msgs.msg
import GoalStatus
24 from geographic_msgs.msg
import GeoPose
26 from lifecycle_msgs.srv
import GetState
27 from nav2_msgs.action
import (AssistedTeleop, BackUp, ComputeAndTrackRoute,
28 ComputePathThroughPoses, ComputePathToPose, ComputeRoute, DockRobot,
29 DriveOnHeading, FollowGPSWaypoints, FollowPath, FollowWaypoints,
30 NavigateThroughPoses, NavigateToPose, SmoothPath, Spin, UndockRobot)
32 from nav2_msgs.srv
import (ClearCostmapAroundRobot, ClearCostmapExceptRegion, ClearEntireCostmap,
33 GetCostmap, LoadMap, ManageLifecycleNodes)
36 from rclpy.action
import ActionClient
37 from rclpy.action.client
import ClientGoalHandle
38 from rclpy.duration
import Duration
as rclpyDuration
39 from rclpy.node
import Node
40 from rclpy.qos
import QoSDurabilityPolicy, QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy
41 from rclpy.task
import Future
42 from rclpy.type_support
import GetResultServiceResponse
59 NAVIGATE_THROUGH_POSES = 2
62 FOLLOW_GPS_WAYPOINTS = 5
69 COMPUTE_AND_TRACK_ROUTE = 12
74 def __init__(self, node_name: str =
'basic_navigator', namespace: str =
'') ->
None:
75 super().__init__(node_name=node_name, namespace=namespace)
79 self.
goal_handlegoal_handle: Optional[ClientGoalHandle[Any, Any, Any]] =
None
81 Optional[Future[GetResultServiceResponse[Any]]] =
None
83 self.
statusstatus: Optional[int] =
None
89 self.
route_goal_handleroute_goal_handle: Optional[ClientGoalHandle[Any, Any, Any]] =
None
91 Optional[Future[GetResultServiceResponse[Any]]] =
None
92 self.route_feedback: list[Any] = []
98 amcl_pose_qos = QoSProfile(
99 durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
100 reliability=QoSReliabilityPolicy.RELIABLE,
101 history=QoSHistoryPolicy.KEEP_LAST,
107 self, NavigateThroughPoses,
'navigate_through_poses'
109 self.
nav_to_pose_clientnav_to_pose_client = ActionClient(self, NavigateToPose,
'navigate_to_pose')
111 self, FollowWaypoints,
'follow_waypoints'
114 self, FollowGPSWaypoints,
'follow_gps_waypoints'
116 self.
follow_path_clientfollow_path_client = ActionClient(self, FollowPath,
'follow_path')
118 self, ComputePathToPose,
'compute_path_to_pose'
121 self, ComputePathThroughPoses,
'compute_path_through_poses'
123 self.
smoother_clientsmoother_client = ActionClient(self, SmoothPath,
'smooth_path')
124 self.
compute_route_clientcompute_route_client = ActionClient(self, ComputeRoute,
'compute_route')
126 'compute_and_track_route')
127 self.
spin_clientspin_client = ActionClient(self, Spin,
'spin')
128 self.
backup_clientbackup_client = ActionClient(self, BackUp,
'backup')
130 self, DriveOnHeading,
'drive_on_heading'
133 self, AssistedTeleop,
'assisted_teleop'
135 self.
docking_clientdocking_client = ActionClient(self, DockRobot,
'dock_robot')
136 self.
undocking_clientundocking_client = ActionClient(self, UndockRobot,
'undock_robot')
138 PoseWithCovarianceStamped,
144 PoseWithCovarianceStamped,
'initialpose', 10
146 self.
change_maps_srvchange_maps_srv = self.create_client(LoadMap,
'map_server/load_map')
148 ClearEntireCostmap,
'global_costmap/clear_entirely_global_costmap'
151 ClearEntireCostmap,
'local_costmap/clear_entirely_local_costmap'
154 ClearCostmapExceptRegion,
'local_costmap/clear_costmap_except_region'
157 ClearCostmapAroundRobot,
'local_costmap/clear_costmap_around_robot'
160 GetCostmap,
'global_costmap/get_costmap'
163 GetCostmap,
'local_costmap/get_costmap'
166 def destroyNode(self) -> None:
169 def destroy_node(self) -> None:
186 super().destroy_node()
189 """Set the initial pose to the localization system."""
194 def goThroughPoses(self, poses: Goals, behavior_tree: str =
'') -> Optional[RunningTask]:
195 """Send a `NavThroughPoses` action request."""
197 self.
debugdebug(
"Waiting for 'NavigateThroughPoses' action server")
199 self.
infoinfo(
"'NavigateThroughPoses' action server not available, waiting...")
201 goal_msg = NavigateThroughPoses.Goal()
202 goal_msg.poses = poses
203 goal_msg.behavior_tree = behavior_tree
205 self.
infoinfo(f
'Navigating with {len(goal_msg.poses)} goals....')
209 rclpy.spin_until_future_complete(self, send_goal_future)
210 self.
goal_handlegoal_handle = send_goal_future.result()
213 msg = f
'NavigateThroughPoses request with {len(poses)} was rejected!'
214 self.
setTaskErrorsetTaskError(NavigateThroughPoses.UNKNOWN, msg)
219 return RunningTask.NAVIGATE_THROUGH_POSES
221 def goToPose(self, pose: PoseStamped, behavior_tree: str =
'') -> Optional[RunningTask]:
222 """Send a `NavToPose` action request."""
224 self.
debugdebug(
"Waiting for 'NavigateToPose' action server")
226 self.
infoinfo(
"'NavigateToPose' action server not available, waiting...")
228 goal_msg = NavigateToPose.Goal()
230 goal_msg.behavior_tree = behavior_tree
233 'Navigating to goal: '
234 + str(pose.pose.position.x)
236 + str(pose.pose.position.y)
242 rclpy.spin_until_future_complete(self, send_goal_future)
243 self.
goal_handlegoal_handle = send_goal_future.result()
247 'NavigateToPose goal to '
248 + str(pose.pose.position.x)
250 + str(pose.pose.position.y)
253 self.
setTaskErrorsetTaskError(NavigateToPose.UNKNOWN, msg)
258 return RunningTask.NAVIGATE_TO_POSE
261 """Send a `FollowWaypoints` action request."""
263 self.
debugdebug(
"Waiting for 'FollowWaypoints' action server")
265 self.
infoinfo(
"'FollowWaypoints' action server not available, waiting...")
267 goal_msg = FollowWaypoints.Goal()
268 goal_msg.poses = poses
270 self.
infoinfo(f
'Following {len(goal_msg.poses)} goals....')
274 rclpy.spin_until_future_complete(self, send_goal_future)
275 self.
goal_handlegoal_handle = send_goal_future.result()
278 msg = f
'Following {len(poses)} waypoints request was rejected!'
279 self.
setTaskErrorsetTaskError(FollowWaypoints.UNKNOWN, msg)
284 return RunningTask.FOLLOW_WAYPOINTS
287 """Send a `FollowGPSWaypoints` action request."""
289 self.
debugdebug(
"Waiting for 'FollowWaypoints' action server")
291 self.
infoinfo(
"'FollowWaypoints' action server not available, waiting...")
293 goal_msg = FollowGPSWaypoints.Goal()
294 goal_msg.gps_poses = gps_poses
296 self.
infoinfo(f
'Following {len(goal_msg.gps_poses)} gps goals....')
300 rclpy.spin_until_future_complete(self, send_goal_future)
301 self.
goal_handlegoal_handle = send_goal_future.result()
304 msg = f
'Following {len(gps_poses)} gps waypoints request was rejected!'
305 self.
setTaskErrorsetTaskError(FollowGPSWaypoints.UNKNOWN, msg)
310 return RunningTask.FOLLOW_GPS_WAYPOINTS
313 self, spin_dist: float = 1.57, time_allowance: int = 10,
314 disable_collision_checks: bool =
False) -> Optional[RunningTask]:
316 self.
debugdebug(
"Waiting for 'Spin' action server")
317 while not self.
spin_clientspin_client.wait_for_server(timeout_sec=1.0):
318 self.
infoinfo(
"'Spin' action server not available, waiting...")
319 goal_msg = Spin.Goal()
320 goal_msg.target_yaw = spin_dist
321 goal_msg.time_allowance = Duration(sec=time_allowance)
322 goal_msg.disable_collision_checks = disable_collision_checks
324 self.
infoinfo(f
'Spinning to angle {goal_msg.target_yaw}....')
325 send_goal_future = self.
spin_clientspin_client.send_goal_async(
328 rclpy.spin_until_future_complete(self, send_goal_future)
329 self.
goal_handlegoal_handle = send_goal_future.result()
332 msg =
'Spin request was rejected!'
338 return RunningTask.SPIN
341 self, backup_dist: float = 0.15, backup_speed: float = 0.025,
342 time_allowance: int = 10,
343 disable_collision_checks: bool =
False) -> Optional[RunningTask]:
345 self.
debugdebug(
"Waiting for 'Backup' action server")
346 while not self.
backup_clientbackup_client.wait_for_server(timeout_sec=1.0):
347 self.
infoinfo(
"'Backup' action server not available, waiting...")
348 goal_msg = BackUp.Goal()
349 goal_msg.target = Point(x=float(backup_dist))
350 goal_msg.speed = backup_speed
351 goal_msg.time_allowance = Duration(sec=time_allowance)
352 goal_msg.disable_collision_checks = disable_collision_checks
354 self.
infoinfo(f
'Backing up {goal_msg.target.x} m at {goal_msg.speed} m/s....')
355 send_goal_future = self.
backup_clientbackup_client.send_goal_async(
358 rclpy.spin_until_future_complete(self, send_goal_future)
359 self.
goal_handlegoal_handle = send_goal_future.result()
362 msg =
'Backup request was rejected!'
368 return RunningTask.BACKUP
371 self, dist: float = 0.15, speed: float = 0.025,
372 time_allowance: int = 10,
373 disable_collision_checks: bool =
False) -> Optional[RunningTask]:
375 self.
debugdebug(
"Waiting for 'DriveOnHeading' action server")
376 while not self.
backup_clientbackup_client.wait_for_server(timeout_sec=1.0):
377 self.
infoinfo(
"'DriveOnHeading' action server not available, waiting...")
378 goal_msg = DriveOnHeading.Goal()
379 goal_msg.target = Point(x=float(dist))
380 goal_msg.speed = speed
381 goal_msg.time_allowance = Duration(sec=time_allowance)
382 goal_msg.disable_collision_checks = disable_collision_checks
384 self.
infoinfo(f
'Drive {goal_msg.target.x} m on heading at {goal_msg.speed} m/s....')
388 rclpy.spin_until_future_complete(self, send_goal_future)
389 self.
goal_handlegoal_handle = send_goal_future.result()
392 msg =
'Drive On Heading request was rejected!'
393 self.
setTaskErrorsetTaskError(DriveOnHeading.UNKNOWN, msg)
398 return RunningTask.DRIVE_ON_HEADING
400 def assistedTeleop(self, time_allowance: int = 30) -> Optional[RunningTask]:
403 self.
debugdebug(
"Wanting for 'assisted_teleop' action server")
406 self.
infoinfo(
"'assisted_teleop' action server not available, waiting...")
407 goal_msg = AssistedTeleop.Goal()
408 goal_msg.time_allowance = Duration(sec=time_allowance)
410 self.
infoinfo(
"Running 'assisted_teleop'....")
414 rclpy.spin_until_future_complete(self, send_goal_future)
415 self.
goal_handlegoal_handle = send_goal_future.result()
418 msg =
'Assisted Teleop request was rejected!'
419 self.
setTaskErrorsetTaskError(AssistedTeleop.UNKNOWN, msg)
424 return RunningTask.ASSISTED_TELEOP
426 def followPath(self, path: Path, controller_id: str =
'',
427 goal_checker_id: str =
'') -> Optional[RunningTask]:
429 """Send a `FollowPath` action request."""
430 self.
debugdebug(
"Waiting for 'FollowPath' action server")
432 self.
infoinfo(
"'FollowPath' action server not available, waiting...")
434 goal_msg = FollowPath.Goal()
436 goal_msg.controller_id = controller_id
437 goal_msg.goal_checker_id = goal_checker_id
439 self.
infoinfo(
'Executing path...')
443 rclpy.spin_until_future_complete(self, send_goal_future)
444 self.
goal_handlegoal_handle = send_goal_future.result()
447 msg =
'FollowPath goal was rejected!'
453 return RunningTask.FOLLOW_PATH
455 def dockRobotByPose(self, dock_pose: PoseStamped,
456 dock_type: str =
'', nav_to_dock: bool =
True) -> Optional[RunningTask]:
458 """Send a `DockRobot` action request."""
459 self.
infoinfo(
"Waiting for 'DockRobot' action server")
460 while not self.
docking_clientdocking_client.wait_for_server(timeout_sec=1.0):
461 self.
infoinfo(
'"DockRobot" action server not available, waiting...')
463 goal_msg = DockRobot.Goal()
464 goal_msg.use_dock_id =
False
465 goal_msg.dock_pose = dock_pose
466 goal_msg.dock_type = dock_type
467 goal_msg.navigate_to_staging_pose = nav_to_dock
469 self.
infoinfo(
'Docking at pose: ' + str(dock_pose) +
'...')
470 send_goal_future = self.
docking_clientdocking_client.send_goal_async(goal_msg,
472 rclpy.spin_until_future_complete(self, send_goal_future)
473 self.
goal_handlegoal_handle = send_goal_future.result()
476 msg =
'DockRobot request was rejected!'
482 return RunningTask.DOCK_ROBOT
484 def dockRobotByID(self, dock_id: str, nav_to_dock: bool =
True) -> Optional[RunningTask]:
485 """Send a `DockRobot` action request."""
487 self.
infoinfo(
"Waiting for 'DockRobot' action server")
488 while not self.
docking_clientdocking_client.wait_for_server(timeout_sec=1.0):
489 self.
infoinfo(
'"DockRobot" action server not available, waiting...')
491 goal_msg = DockRobot.Goal()
492 goal_msg.use_dock_id =
True
493 goal_msg.dock_id = dock_id
494 goal_msg.navigate_to_staging_pose = nav_to_dock
496 self.
infoinfo(
'Docking at dock ID: ' + str(dock_id) +
'...')
497 send_goal_future = self.
docking_clientdocking_client.send_goal_async(goal_msg,
499 rclpy.spin_until_future_complete(self, send_goal_future)
500 self.
goal_handlegoal_handle = send_goal_future.result()
503 msg =
'DockRobot request was rejected!'
509 return RunningTask.DOCK_ROBOT
511 def undockRobot(self, dock_type: str =
'') -> Optional[RunningTask]:
512 """Send a `UndockRobot` action request."""
514 self.
infoinfo(
"Waiting for 'UndockRobot' action server")
515 while not self.
undocking_clientundocking_client.wait_for_server(timeout_sec=1.0):
516 self.
infoinfo(
'"UndockRobot" action server not available, waiting...')
518 goal_msg = UndockRobot.Goal()
519 goal_msg.dock_type = dock_type
521 self.
infoinfo(
'Undocking from dock of type: ' + str(dock_type) +
'...')
522 send_goal_future = self.
undocking_clientundocking_client.send_goal_async(goal_msg,
524 rclpy.spin_until_future_complete(self, send_goal_future)
525 self.
goal_handlegoal_handle = send_goal_future.result()
528 msg =
'UndockRobot request was rejected!'
534 return RunningTask.UNDOCK_ROBOT
537 """Cancel pending task request of any type."""
538 self.
infoinfo(
'Canceling current task.')
541 future = self.
goal_handlegoal_handle.cancel_goal_async()
542 rclpy.spin_until_future_complete(self, future)
544 self.
errorerror(
'Cancel task failed, goal handle is None')
545 self.
setTaskErrorsetTaskError(0,
'Cancel task failed, goal handle is None')
550 rclpy.spin_until_future_complete(self, future)
552 self.
errorerror(
'Cancel route task failed, goal handle is None')
553 self.
setTaskErrorsetTaskError(0,
'Cancel route task failed, goal handle is None')
559 """Check if the task request of any type is complete yet."""
562 self.
errorerror(
'Task is None, cannot check for completion')
566 if task != RunningTask.COMPUTE_AND_TRACK_ROUTE:
570 if not result_future:
575 rclpy.spin_until_future_complete(self, result_future, timeout_sec=0.10)
576 result_response = result_future.result()
579 self.
statusstatus = result_response.status
580 if self.
statusstatus != GoalStatus.STATUS_SUCCEEDED:
581 result = result_response.result
582 if result
is not None:
583 self.
setTaskErrorsetTaskError(result.error_code, result.error_msg)
585 'Task with failed with'
586 f
' status code:{self.status}'
587 f
' error code:{result.error_code}'
588 f
' error msg:{result.error_msg}')
592 self.
debugdebug(
'Task failed with no result received')
598 self.
debugdebug(
'Task succeeded!')
601 def getFeedback(self, task: RunningTask = RunningTask.NONE) -> Any:
602 """Get the pending action feedback message."""
603 if task != RunningTask.COMPUTE_AND_TRACK_ROUTE:
605 if len(self.route_feedback) > 0:
606 return self.route_feedback.pop(0)
610 """Get the pending action result message."""
611 if self.
statusstatus == GoalStatus.STATUS_SUCCEEDED:
612 return TaskResult.SUCCEEDED
613 elif self.
statusstatus == GoalStatus.STATUS_ABORTED:
614 return TaskResult.FAILED
615 elif self.
statusstatus == GoalStatus.STATUS_CANCELED:
616 return TaskResult.CANCELED
618 return TaskResult.UNKNOWN
620 def clearTaskError(self) -> None:
624 def setTaskError(self, error_code: int, error_msg: str) ->
None:
628 def getTaskError(self) -> tuple[int, str]:
632 localizer: str =
'amcl') ->
None:
633 """Block until the full navigation system is up and running."""
634 if localizer !=
'robot_localization':
636 if localizer ==
'amcl':
639 self.
infoinfo(
'Nav2 is ready for use!')
643 self, start: PoseStamped, goal: PoseStamped,
644 planner_id: str =
'', use_start: bool =
False
645 ) -> ComputePathToPose.Result:
647 Send a `ComputePathToPose` action request.
649 Internal implementation to get the full result, not just the path.
651 self.
debugdebug(
"Waiting for 'ComputePathToPose' action server")
653 self.
infoinfo(
"'ComputePathToPose' action server not available, waiting...")
655 goal_msg = ComputePathToPose.Goal()
656 goal_msg.start = start
658 goal_msg.planner_id = planner_id
659 goal_msg.use_start = use_start
661 self.
infoinfo(
'Getting path...')
663 rclpy.spin_until_future_complete(self, send_goal_future)
664 self.
goal_handlegoal_handle = send_goal_future.result()
667 self.
errorerror(
'Get path was rejected!')
668 self.
statusstatus = GoalStatus.UNKNOWN
669 result = ComputePathToPose.Result()
670 result.error_code = ComputePathToPose.UNKNOWN
671 result.error_msg =
'Get path was rejected'
675 rclpy.spin_until_future_complete(self, self.
result_futureresult_future)
681 self, start: PoseStamped, goal: PoseStamped,
682 planner_id: str =
'', use_start: bool =
False) -> Path:
683 """Send a `ComputePathToPose` action request."""
685 rtn = self.
_getPathImpl_getPathImpl(start, goal, planner_id, use_start)
687 if self.
statusstatus == GoalStatus.STATUS_SUCCEEDED:
690 self.
setTaskErrorsetTaskError(rtn.error_code, rtn.error_msg)
691 self.
warnwarn(
'Getting path failed with'
692 f
' status code:{self.status}'
693 f
' error code:{rtn.error_code}'
694 f
' error msg:{rtn.error_msg}')
697 def _getPathThroughPosesImpl(
698 self, start: PoseStamped, goals: Goals,
699 planner_id: str =
'', use_start: bool =
False
700 ) -> ComputePathThroughPoses.Result:
702 Send a `ComputePathThroughPoses` action request.
704 Internal implementation to get the full result, not just the path.
706 self.
debugdebug(
"Waiting for 'ComputePathThroughPoses' action server")
711 "'ComputePathThroughPoses' action server not available, waiting..."
714 goal_msg = ComputePathThroughPoses.Goal()
715 goal_msg.start = start
716 goal_msg.goals.header.frame_id =
'map'
717 goal_msg.goals.header.stamp = self.get_clock().now().to_msg()
718 goal_msg.goals.goals = goals
719 goal_msg.planner_id = planner_id
720 goal_msg.use_start = use_start
722 self.
infoinfo(
'Getting path...')
726 rclpy.spin_until_future_complete(self, send_goal_future)
727 self.
goal_handlegoal_handle = send_goal_future.result()
730 self.
errorerror(
'Get path was rejected!')
731 result = ComputePathThroughPoses.Result()
732 result.error_code = ComputePathThroughPoses.UNKNOWN
733 result.error_msg =
'Get path was rejected!'
737 rclpy.spin_until_future_complete(self, self.
result_futureresult_future)
743 self, start: PoseStamped, goals: Goals,
744 planner_id: str =
'', use_start: bool =
False) -> Path:
745 """Send a `ComputePathThroughPoses` action request."""
749 if self.
statusstatus == GoalStatus.STATUS_SUCCEEDED:
752 self.
setTaskErrorsetTaskError(rtn.error_code, rtn.error_msg)
753 self.
warnwarn(
'Getting path failed with'
754 f
' status code:{self.status}'
755 f
' error code:{rtn.error_code}'
756 f
' error msg:{rtn.error_msg}')
760 self, start: Union[int, PoseStamped],
761 goal: Union[int, PoseStamped], use_start: bool =
False
762 ) -> ComputeRoute.Result:
764 Send a `ComputeRoute` action request.
766 Internal implementation to get the full result, not just the sparse route and dense path.
768 self.
debugdebug(
"Waiting for 'ComputeRoute' action server")
770 self.
infoinfo(
"'ComputeRoute' action server not available, waiting...")
772 goal_msg = ComputeRoute.Goal()
773 goal_msg.use_start = use_start
776 if isinstance(start, int)
and isinstance(goal, int):
777 goal_msg.start_id = start
778 goal_msg.goal_id = goal
779 goal_msg.use_poses =
False
780 elif isinstance(start, PoseStamped)
and isinstance(goal, PoseStamped):
781 goal_msg.start = start
783 goal_msg.use_poses =
True
785 self.
errorerror(
'Invalid start and goal types. Must be PoseStamped for pose or int for ID')
786 result = ComputeRoute.Result()
787 result.error_code = ComputeRoute.UNKNOWN
788 result.error_msg =
'Request type fields were invalid!'
791 self.
infoinfo(
'Getting route...')
793 rclpy.spin_until_future_complete(self, send_goal_future)
794 self.
goal_handlegoal_handle = send_goal_future.result()
797 self.
errorerror(
'Get route was rejected!')
798 result = ComputeRoute.Result()
799 result.error_code = ComputeRoute.UNKNOWN
800 result.error_msg =
'Get route was rejected!'
804 rclpy.spin_until_future_complete(self, self.
result_futureresult_future)
810 self, start: Union[int, PoseStamped],
811 goal: Union[int, PoseStamped],
812 use_start: bool =
False) -> Optional[list[Union[Path, Route]]]:
813 """Send a `ComputeRoute` action request."""
815 rtn = self.
_getRouteImpl_getRouteImpl(start, goal, use_start=
False)
817 if self.
statusstatus != GoalStatus.STATUS_SUCCEEDED:
818 self.
setTaskErrorsetTaskError(rtn.error_code, rtn.error_msg)
820 'Getting route failed with'
821 f
' status code:{self.status}'
822 f
' error code:{rtn.error_code}'
823 f
' error msg:{rtn.error_msg}')
826 return [rtn.path, rtn.route]
829 self, start: Union[int, PoseStamped],
830 goal: Union[int, PoseStamped], use_start: bool =
False
831 ) -> Optional[RunningTask]:
832 """Send a `ComputeAndTrackRoute` action request."""
834 self.
debugdebug(
"Waiting for 'ComputeAndTrackRoute' action server")
836 self.
infoinfo(
"'ComputeAndTrackRoute' action server not available, waiting...")
838 goal_msg = ComputeAndTrackRoute.Goal()
839 goal_msg.use_start = use_start
842 if isinstance(start, int)
and isinstance(goal, int):
843 goal_msg.start_id = start
844 goal_msg.goal_id = goal
845 goal_msg.use_poses =
False
846 elif isinstance(start, PoseStamped)
and isinstance(goal, PoseStamped):
847 goal_msg.start = start
849 goal_msg.use_poses =
True
851 self.
setTaskErrorsetTaskError(ComputeAndTrackRoute.UNKNOWN,
'Request type fields were invalid!')
852 self.
errorerror(
'Invalid start and goal types. Must be PoseStamped for pose or int for ID')
855 self.
infoinfo(
'Computing and tracking route...')
858 rclpy.spin_until_future_complete(self, send_goal_future)
862 msg =
'Compute and track route was rejected!'
863 self.
setTaskErrorsetTaskError(ComputeAndTrackRoute.UNKNOWN, msg)
868 return RunningTask.COMPUTE_AND_TRACK_ROUTE
871 self, path: Path, smoother_id: str =
'',
872 max_duration: float = 2.0, check_for_collision: bool =
False
873 ) -> SmoothPath.Result:
875 Send a `SmoothPath` action request.
877 Internal implementation to get the full result, not just the path.
879 self.
debugdebug(
"Waiting for 'SmoothPath' action server")
880 while not self.
smoother_clientsmoother_client.wait_for_server(timeout_sec=1.0):
881 self.
infoinfo(
"'SmoothPath' action server not available, waiting...")
883 goal_msg = SmoothPath.Goal()
885 goal_msg.max_smoothing_duration = rclpyDuration(seconds=max_duration).to_msg()
886 goal_msg.smoother_id = smoother_id
887 goal_msg.check_for_collisions = check_for_collision
889 self.
infoinfo(
'Smoothing path...')
890 send_goal_future = self.
smoother_clientsmoother_client.send_goal_async(goal_msg)
891 rclpy.spin_until_future_complete(self, send_goal_future)
892 self.
goal_handlegoal_handle = send_goal_future.result()
895 self.
errorerror(
'Smooth path was rejected!')
896 result = SmoothPath.Result()
897 result.error_code = SmoothPath.UNKNOWN
898 result.error_msg =
'Smooth path was rejected'
902 rclpy.spin_until_future_complete(self, self.
result_futureresult_future)
908 self, path: Path, smoother_id: str =
'',
909 max_duration: float = 2.0, check_for_collision: bool =
False) -> Path:
910 """Send a `SmoothPath` action request."""
912 rtn = self.
_smoothPathImpl_smoothPathImpl(path, smoother_id, max_duration, check_for_collision)
914 if self.
statusstatus == GoalStatus.STATUS_SUCCEEDED:
917 self.
setTaskErrorsetTaskError(rtn.error_code, rtn.error_msg)
918 self.
warnwarn(
'Getting path failed with'
919 f
' status code:{self.status}'
920 f
' error code:{rtn.error_code}'
921 f
' error msg:{rtn.error_msg}')
925 """Change the current static map in the map server."""
926 while not self.
change_maps_srvchange_maps_srv.wait_for_service(timeout_sec=1.0):
927 self.
infoinfo(
'change map service not available, waiting...')
928 req = LoadMap.Request()
929 req.map_url = map_filepath
931 rclpy.spin_until_future_complete(self, future)
933 future_result = future.result()
934 if future_result
is None:
935 self.
errorerror(
'Change map request failed!')
938 result = future_result.result
939 if result != LoadMap.Response().RESULT_SUCCESS:
940 if result == LoadMap.RESULT_MAP_DOES_NOT_EXIST:
941 reason =
'Map does not exist'
942 elif result == LoadMap.INVALID_MAP_DATA:
943 reason =
'Invalid map data'
944 elif result == LoadMap.INVALID_MAP_METADATA:
945 reason =
'Invalid map metadata'
946 elif result == LoadMap.UNDEFINED_FAILURE:
947 reason =
'Undefined failure'
951 self.
errorerror(f
'Change map request failed:{reason}!')
954 self.
infoinfo(
'Change map request was successful!')
958 """Clear all costmaps."""
964 """Clear local costmap."""
966 self.
infoinfo(
'Clear local costmaps service not available, waiting...')
967 req = ClearEntireCostmap.Request()
969 rclpy.spin_until_future_complete(self, future)
973 """Clear global costmap."""
975 self.
infoinfo(
'Clear global costmaps service not available, waiting...')
976 req = ClearEntireCostmap.Request()
978 rclpy.spin_until_future_complete(self, future)
982 """Clear the costmap except for a specified region."""
984 self.
infoinfo(
'ClearCostmapExceptRegion service not available, waiting...')
985 req = ClearCostmapExceptRegion.Request()
986 req.reset_distance = reset_distance
988 rclpy.spin_until_future_complete(self, future)
992 """Clear the costmap around the robot."""
994 self.
infoinfo(
'ClearCostmapAroundRobot service not available, waiting...')
995 req = ClearCostmapAroundRobot.Request()
996 req.reset_distance = reset_distance
998 rclpy.spin_until_future_complete(self, future)
1002 """Get the global costmap."""
1004 self.
infoinfo(
'Get global costmaps service not available, waiting...')
1005 req = GetCostmap.Request()
1007 rclpy.spin_until_future_complete(self, future)
1009 result = future.result()
1011 self.
errorerror(
'Get global costmap request failed!')
1017 """Get the local costmap."""
1019 self.
infoinfo(
'Get local costmaps service not available, waiting...')
1020 req = GetCostmap.Request()
1022 rclpy.spin_until_future_complete(self, future)
1024 result = future.result()
1027 self.
errorerror(
'Get local costmap request failed!')
1033 """Startup nav2 lifecycle system."""
1034 self.
infoinfo(
'Starting up lifecycle nodes based on lifecycle_manager.')
1035 for srv_name, srv_type
in self.get_service_names_and_types():
1036 if srv_type[0] ==
'nav2_msgs/srv/ManageLifecycleNodes':
1037 self.
infoinfo(f
'Starting up {srv_name}')
1038 mgr_client = self.create_client(ManageLifecycleNodes, srv_name)
1039 while not mgr_client.wait_for_service(timeout_sec=1.0):
1040 self.
infoinfo(f
'{srv_name} service not available, waiting...')
1041 req = ManageLifecycleNodes.Request()
1042 req.command = ManageLifecycleNodes.Request().STARTUP
1043 future = mgr_client.call_async(req)
1048 rclpy.spin_until_future_complete(self, future, timeout_sec=0.10)
1053 self.
infoinfo(
'Nav2 is ready for use!')
1057 """Shutdown nav2 lifecycle system."""
1058 self.
infoinfo(
'Shutting down lifecycle nodes based on lifecycle_manager.')
1059 for srv_name, srv_type
in self.get_service_names_and_types():
1060 if srv_type[0] ==
'nav2_msgs/srv/ManageLifecycleNodes':
1061 self.
infoinfo(f
'Shutting down {srv_name}')
1062 mgr_client = self.create_client(ManageLifecycleNodes, srv_name)
1063 while not mgr_client.wait_for_service(timeout_sec=1.0):
1064 self.
infoinfo(f
'{srv_name} service not available, waiting...')
1065 req = ManageLifecycleNodes.Request()
1066 req.command = ManageLifecycleNodes.Request().SHUTDOWN
1067 future = mgr_client.call_async(req)
1068 rclpy.spin_until_future_complete(self, future)
1072 def _waitForNodeToActivate(self, node_name: str) ->
None:
1074 self.
debugdebug(f
'Waiting for {node_name} to become active..')
1075 node_service = f
'{node_name}/get_state'
1076 state_client = self.create_client(GetState, node_service)
1077 while not state_client.wait_for_service(timeout_sec=1.0):
1078 self.
infoinfo(f
'{node_service} service not available, waiting...')
1080 req = GetState.Request()
1082 while state !=
'active':
1083 self.
debugdebug(f
'Getting {node_name} state...')
1084 future = state_client.call_async(req)
1085 rclpy.spin_until_future_complete(self, future)
1087 result = future.result()
1088 if result
is not None:
1089 state = result.current_state.label
1090 self.
debugdebug(f
'Result of get_state: {state}')
1094 def _waitForInitialPose(self) -> None:
1096 self.
infoinfo(
'Setting initial pose')
1098 self.
infoinfo(
'Waiting for amcl_pose to be received')
1099 rclpy.spin_once(self, timeout_sec=1.0)
1102 def _amclPoseCallback(self, msg: PoseWithCovarianceStamped) ->
None:
1103 self.
debugdebug(
'Received amcl pose')
1107 def _feedbackCallback(self, msg: NavigateToPose.Feedback) ->
None:
1108 self.
debugdebug(
'Received action feedback message')
1109 self.
feedbackfeedback = msg.feedback
1112 def _routeFeedbackCallback(self, msg: ComputeAndTrackRoute.Feedback) ->
None:
1113 self.
debugdebug(
'Received route action feedback message')
1114 self.route_feedback.append(msg.feedback)
1117 def _setInitialPose(self) -> None:
1118 msg = PoseWithCovarianceStamped()
1120 msg.header.frame_id = self.
initial_poseinitial_pose.header.frame_id
1121 msg.header.stamp = self.
initial_poseinitial_pose.header.stamp
1122 self.
infoinfo(
'Publishing Initial Pose')
1126 def info(self, msg: str) ->
None:
1127 self.get_logger().info(msg)
1130 def warn(self, msg: str) ->
None:
1131 self.get_logger().warn(msg)
1134 def error(self, msg: str) ->
None:
1135 self.get_logger().error(msg)
1138 def debug(self, msg: str) ->
None:
1139 self.get_logger().debug(msg)
OccupancyGrid getGlobalCostmap(self)
compute_path_to_pose_client
Optional[RunningTask] undockRobot(self, str dock_type='')
ComputePathToPose.Result _getPathImpl(self, PoseStamped start, PoseStamped goal, str planner_id='', bool use_start=False)
None lifecycleShutdown(self)
Path smoothPath(self, Path path, str smoother_id='', float max_duration=2.0, bool check_for_collision=False)
TaskResult getResult(self)
None clearGlobalCostmap(self)
None clearCostmapExceptRegion(self, float reset_distance)
Optional[RunningTask] getAndTrackRoute(self, Union[int, PoseStamped] start, Union[int, PoseStamped] goal, bool use_start=False)
None clearLocalCostmap(self)
Optional[RunningTask] followGpsWaypoints(self, list[GeoPose] gps_poses)
None clearAllCostmaps(self)
Path getPath(self, PoseStamped start, PoseStamped goal, str planner_id='', bool use_start=False)
ComputePathThroughPoses.Result _getPathThroughPosesImpl(self, PoseStamped start, Goals goals, str planner_id='', bool use_start=False)
Optional[RunningTask] dockRobotByID(self, str dock_id, bool nav_to_dock=True)
Optional[RunningTask] goToPose(self, PoseStamped pose, str behavior_tree='')
clear_costmap_except_region_srv
None _feedbackCallback(self, NavigateToPose.Feedback msg)
follow_gps_waypoints_client
clear_costmap_around_robot_srv
None clearCostmapAroundRobot(self, float reset_distance)
bool changeMap(self, str map_filepath)
Optional[RunningTask] goThroughPoses(self, Goals poses, str behavior_tree='')
None _amclPoseCallback(self, PoseWithCovarianceStamped msg)
Any getFeedback(self, RunningTask task=RunningTask.NONE)
bool isTaskComplete(self, RunningTask task=RunningTask.NONE)
None setInitialPose(self, PoseStamped initial_pose)
Path getPathThroughPoses(self, PoseStamped start, Goals goals, str planner_id='', bool use_start=False)
None _waitForNodeToActivate(self, str node_name)
SmoothPath.Result _smoothPathImpl(self, Path path, str smoother_id='', float max_duration=2.0, bool check_for_collision=False)
Optional[RunningTask] followWaypoints(self, list[PoseStamped] poses)
None lifecycleStartup(self)
compute_path_through_poses_client
None clearTaskError(self)
None _setInitialPose(self)
compute_and_track_route_client
Optional[list[Union[Path, Route]]] getRoute(self, Union[int, PoseStamped] start, Union[int, PoseStamped] goal, bool use_start=False)
None waitUntilNav2Active(self, str navigator='bt_navigator', str localizer='amcl')
None error(self, str msg)
None _routeFeedbackCallback(self, ComputeAndTrackRoute.Feedback msg)
OccupancyGrid getLocalCostmap(self)
ComputeRoute.Result _getRouteImpl(self, Union[int, PoseStamped] start, Union[int, PoseStamped] goal, bool use_start=False)
None setTaskError(self, int error_code, str error_msg)
None _waitForInitialPose(self)
None debug(self, str msg)