20 from typing
import Any, Optional, Union
22 from action_msgs.msg
import GoalStatus
24 from geographic_msgs.msg
import GeoPose
26 from lifecycle_msgs.srv
import GetState
27 from nav2_msgs.action
import (AssistedTeleop, BackUp, ComputeAndTrackRoute,
28 ComputePathThroughPoses, ComputePathToPose, ComputeRoute, DockRobot,
29 DriveOnHeading, FollowGPSWaypoints, FollowPath, FollowWaypoints,
30 NavigateThroughPoses, NavigateToPose, SmoothPath, Spin, UndockRobot)
32 from nav2_msgs.srv
import (ClearCostmapAroundPose, ClearCostmapAroundRobot,
33 ClearCostmapExceptRegion, ClearEntireCostmap, GetCostmap, LoadMap,
37 from rclpy.action
import ActionClient
38 from rclpy.action.client
import ClientGoalHandle
39 from rclpy.client
import Client
40 from rclpy.duration
import Duration
as rclpyDuration
41 from rclpy.node
import Node
42 from rclpy.qos
import QoSDurabilityPolicy, QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy
43 from rclpy.task
import Future
44 from rclpy.type_support
import GetResultServiceResponse
61 NAVIGATE_THROUGH_POSES = 2
64 FOLLOW_GPS_WAYPOINTS = 5
71 COMPUTE_AND_TRACK_ROUTE = 12
76 def __init__(self, node_name: str =
'basic_navigator', namespace: str =
'') ->
None:
77 super().__init__(node_name=node_name, namespace=namespace)
81 self.
goal_handlegoal_handle: Optional[ClientGoalHandle[Any, Any, Any]] =
None
83 Optional[Future[GetResultServiceResponse[Any]]] =
None
85 self.
statusstatus: Optional[int] =
None
91 self.
route_goal_handleroute_goal_handle: Optional[ClientGoalHandle[Any, Any, Any]] =
None
93 Optional[Future[GetResultServiceResponse[Any]]] =
None
94 self.route_feedback: list[Any] = []
100 amcl_pose_qos = QoSProfile(
101 durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
102 reliability=QoSReliabilityPolicy.RELIABLE,
103 history=QoSHistoryPolicy.KEEP_LAST,
108 self.nav_through_poses_client: ActionClient[
109 NavigateThroughPoses.Goal,
110 NavigateThroughPoses.Result,
111 NavigateThroughPoses.Feedback
113 self, NavigateThroughPoses,
'navigate_through_poses')
114 self.nav_to_pose_client: ActionClient[
116 NavigateToPose.Result,
117 NavigateToPose.Feedback
118 ] = ActionClient(self, NavigateToPose,
'navigate_to_pose')
119 self.follow_waypoints_client: ActionClient[
120 FollowWaypoints.Goal,
121 FollowWaypoints.Result,
122 FollowWaypoints.Feedback
124 self, FollowWaypoints,
'follow_waypoints'
126 self.follow_gps_waypoints_client: ActionClient[
127 FollowGPSWaypoints.Goal,
128 FollowGPSWaypoints.Result,
129 FollowGPSWaypoints.Feedback
131 self, FollowGPSWaypoints,
'follow_gps_waypoints'
133 self.follow_path_client: ActionClient[
137 ] = ActionClient(self, FollowPath,
'follow_path')
138 self.compute_path_to_pose_client: ActionClient[
139 ComputePathToPose.Goal,
140 ComputePathToPose.Result,
141 ComputePathToPose.Feedback
143 self, ComputePathToPose,
'compute_path_to_pose'
145 self.compute_path_through_poses_client: ActionClient[
146 ComputePathThroughPoses.Goal,
147 ComputePathThroughPoses.Result,
148 ComputePathThroughPoses.Feedback
150 self, ComputePathThroughPoses,
'compute_path_through_poses'
152 self.smoother_client: ActionClient[
156 ] = ActionClient(self, SmoothPath,
'smooth_path')
157 self.compute_route_client: ActionClient[
160 ComputeRoute.Feedback
161 ] = ActionClient(self, ComputeRoute,
'compute_route')
162 self.compute_and_track_route_client: ActionClient[
163 ComputeAndTrackRoute.Goal,
164 ComputeAndTrackRoute.Result,
165 ComputeAndTrackRoute.Feedback
166 ] = ActionClient(self, ComputeAndTrackRoute,
'compute_and_track_route')
167 self.spin_client: ActionClient[
171 ] = ActionClient(self, Spin,
'spin')
173 self.backup_client: ActionClient[
177 ] = ActionClient(self, BackUp,
'backup')
178 self.drive_on_heading_client: ActionClient[
180 DriveOnHeading.Result,
181 DriveOnHeading.Feedback
183 self, DriveOnHeading,
'drive_on_heading'
185 self.assisted_teleop_client: ActionClient[
187 AssistedTeleop.Result,
188 AssistedTeleop.Feedback
190 self, AssistedTeleop,
'assisted_teleop'
192 self.docking_client: ActionClient[
196 ] = ActionClient(self, DockRobot,
'dock_robot')
197 self.undocking_client: ActionClient[
201 ] = ActionClient(self, UndockRobot,
'undock_robot')
204 PoseWithCovarianceStamped,
210 PoseWithCovarianceStamped,
'initialpose', 10
212 self.change_maps_srv: Client[LoadMap.Request, LoadMap.Response] = \
213 self.create_client(LoadMap,
'map_server/load_map')
214 self.clear_costmap_global_srv: Client[
215 ClearEntireCostmap.Request, ClearEntireCostmap.Response] = \
217 ClearEntireCostmap,
'global_costmap/clear_entirely_global_costmap'
219 self.clear_costmap_local_srv: Client[
220 ClearEntireCostmap.Request, ClearEntireCostmap.Response] = \
222 ClearEntireCostmap,
'local_costmap/clear_entirely_local_costmap'
224 self.clear_costmap_except_region_srv: Client[
225 ClearCostmapExceptRegion.Request, ClearCostmapExceptRegion.Response] = \
227 ClearCostmapExceptRegion,
'local_costmap/clear_costmap_except_region'
229 self.clear_costmap_around_robot_srv: Client[
230 ClearCostmapAroundRobot.Request, ClearCostmapAroundRobot.Response] = \
232 ClearCostmapAroundRobot,
'local_costmap/clear_costmap_around_robot'
234 self.clear_local_costmap_around_pose_srv: Client[
235 ClearCostmapAroundPose.Request, ClearCostmapAroundPose.Response] = \
237 ClearCostmapAroundPose,
'local_costmap/clear_costmap_around_pose'
239 self.clear_global_costmap_around_pose_srv: Client[
240 ClearCostmapAroundPose.Request, ClearCostmapAroundPose.Response] = \
242 ClearCostmapAroundPose,
'global_costmap/clear_costmap_around_pose'
244 self.get_costmap_global_srv: Client[
245 GetCostmap.Request, GetCostmap.Response] = \
247 GetCostmap,
'global_costmap/get_costmap'
249 self.get_costmap_local_srv: Client[
250 GetCostmap.Request, GetCostmap.Response] = \
252 GetCostmap,
'local_costmap/get_costmap'
255 def destroyNode(self) -> None:
258 def destroy_node(self) -> None:
259 self.nav_through_poses_client.destroy()
260 self.nav_to_pose_client.destroy()
261 self.follow_waypoints_client.destroy()
262 self.follow_path_client.destroy()
263 self.compute_path_to_pose_client.destroy()
264 self.compute_path_through_poses_client.destroy()
265 self.compute_and_track_route_client.destroy()
266 self.compute_route_client.destroy()
267 self.smoother_client.destroy()
268 self.spin_client.destroy()
269 self.backup_client.destroy()
270 self.drive_on_heading_client.destroy()
271 self.assisted_teleop_client.destroy()
272 self.follow_gps_waypoints_client.destroy()
273 self.docking_client.destroy()
274 self.undocking_client.destroy()
275 super().destroy_node()
278 """Set the initial pose to the localization system."""
283 def goThroughPoses(self, poses: Goals, behavior_tree: str =
'') -> Optional[RunningTask]:
284 """Send a `NavThroughPoses` action request."""
286 self.
debugdebug(
"Waiting for 'NavigateThroughPoses' action server")
287 while not self.nav_through_poses_client.wait_for_server(timeout_sec=1.0):
288 self.
infoinfo(
"'NavigateThroughPoses' action server not available, waiting...")
290 goal_msg = NavigateThroughPoses.Goal()
291 goal_msg.poses = poses
292 goal_msg.behavior_tree = behavior_tree
294 self.
infoinfo(f
'Navigating with {len(goal_msg.poses)} goals....')
295 send_goal_future = self.nav_through_poses_client.send_goal_async(
298 rclpy.spin_until_future_complete(self, send_goal_future)
299 self.
goal_handlegoal_handle = send_goal_future.result()
302 msg = f
'NavigateThroughPoses request with {len(poses)} was rejected!'
303 self.
setTaskErrorsetTaskError(NavigateThroughPoses.UNKNOWN, msg)
308 return RunningTask.NAVIGATE_THROUGH_POSES
310 def goToPose(self, pose: PoseStamped, behavior_tree: str =
'') -> Optional[RunningTask]:
311 """Send a `NavToPose` action request."""
313 self.
debugdebug(
"Waiting for 'NavigateToPose' action server")
314 while not self.nav_to_pose_client.wait_for_server(timeout_sec=1.0):
315 self.
infoinfo(
"'NavigateToPose' action server not available, waiting...")
317 goal_msg = NavigateToPose.Goal()
319 goal_msg.behavior_tree = behavior_tree
322 'Navigating to goal: '
323 + str(pose.pose.position.x)
325 + str(pose.pose.position.y)
328 send_goal_future = self.nav_to_pose_client.send_goal_async(
331 rclpy.spin_until_future_complete(self, send_goal_future)
332 self.
goal_handlegoal_handle = send_goal_future.result()
336 'NavigateToPose goal to '
337 + str(pose.pose.position.x)
339 + str(pose.pose.position.y)
342 self.
setTaskErrorsetTaskError(NavigateToPose.UNKNOWN, msg)
347 return RunningTask.NAVIGATE_TO_POSE
350 """Send a `FollowWaypoints` action request."""
352 self.
debugdebug(
"Waiting for 'FollowWaypoints' action server")
353 while not self.follow_waypoints_client.wait_for_server(timeout_sec=1.0):
354 self.
infoinfo(
"'FollowWaypoints' action server not available, waiting...")
356 goal_msg = FollowWaypoints.Goal()
357 goal_msg.poses = poses
359 self.
infoinfo(f
'Following {len(goal_msg.poses)} goals....')
360 send_goal_future = self.follow_waypoints_client.send_goal_async(
363 rclpy.spin_until_future_complete(self, send_goal_future)
364 self.
goal_handlegoal_handle = send_goal_future.result()
367 msg = f
'Following {len(poses)} waypoints request was rejected!'
368 self.
setTaskErrorsetTaskError(FollowWaypoints.UNKNOWN, msg)
373 return RunningTask.FOLLOW_WAYPOINTS
376 """Send a `FollowGPSWaypoints` action request."""
378 self.
debugdebug(
"Waiting for 'FollowWaypoints' action server")
379 while not self.follow_gps_waypoints_client.wait_for_server(timeout_sec=1.0):
380 self.
infoinfo(
"'FollowWaypoints' action server not available, waiting...")
382 goal_msg = FollowGPSWaypoints.Goal()
383 goal_msg.gps_poses = gps_poses
385 self.
infoinfo(f
'Following {len(goal_msg.gps_poses)} gps goals....')
386 send_goal_future = self.follow_gps_waypoints_client.send_goal_async(
389 rclpy.spin_until_future_complete(self, send_goal_future)
390 self.
goal_handlegoal_handle = send_goal_future.result()
393 msg = f
'Following {len(gps_poses)} gps waypoints request was rejected!'
394 self.
setTaskErrorsetTaskError(FollowGPSWaypoints.UNKNOWN, msg)
399 return RunningTask.FOLLOW_GPS_WAYPOINTS
402 self, spin_dist: float = 1.57, time_allowance: int = 10,
403 disable_collision_checks: bool =
False) -> Optional[RunningTask]:
405 self.
debugdebug(
"Waiting for 'Spin' action server")
406 while not self.spin_client.wait_for_server(timeout_sec=1.0):
407 self.
infoinfo(
"'Spin' action server not available, waiting...")
408 goal_msg = Spin.Goal()
409 goal_msg.target_yaw = spin_dist
410 goal_msg.time_allowance = Duration(sec=time_allowance)
411 goal_msg.disable_collision_checks = disable_collision_checks
413 self.
infoinfo(f
'Spinning to angle {goal_msg.target_yaw}....')
414 send_goal_future = self.spin_client.send_goal_async(
417 rclpy.spin_until_future_complete(self, send_goal_future)
418 self.
goal_handlegoal_handle = send_goal_future.result()
421 msg =
'Spin request was rejected!'
427 return RunningTask.SPIN
430 self, backup_dist: float = 0.15, backup_speed: float = 0.025,
431 time_allowance: int = 10,
432 disable_collision_checks: bool =
False) -> Optional[RunningTask]:
434 self.
debugdebug(
"Waiting for 'Backup' action server")
435 while not self.backup_client.wait_for_server(timeout_sec=1.0):
436 self.
infoinfo(
"'Backup' action server not available, waiting...")
437 goal_msg = BackUp.Goal()
438 goal_msg.target = Point(x=float(backup_dist))
439 goal_msg.speed = backup_speed
440 goal_msg.time_allowance = Duration(sec=time_allowance)
441 goal_msg.disable_collision_checks = disable_collision_checks
443 self.
infoinfo(f
'Backing up {goal_msg.target.x} m at {goal_msg.speed} m/s....')
444 send_goal_future = self.backup_client.send_goal_async(
447 rclpy.spin_until_future_complete(self, send_goal_future)
448 self.
goal_handlegoal_handle = send_goal_future.result()
451 msg =
'Backup request was rejected!'
457 return RunningTask.BACKUP
460 self, dist: float = 0.15, speed: float = 0.025,
461 time_allowance: int = 10,
462 disable_collision_checks: bool =
False) -> Optional[RunningTask]:
464 self.
debugdebug(
"Waiting for 'DriveOnHeading' action server")
465 while not self.backup_client.wait_for_server(timeout_sec=1.0):
466 self.
infoinfo(
"'DriveOnHeading' action server not available, waiting...")
467 goal_msg = DriveOnHeading.Goal()
468 goal_msg.target = Point(x=float(dist))
469 goal_msg.speed = speed
470 goal_msg.time_allowance = Duration(sec=time_allowance)
471 goal_msg.disable_collision_checks = disable_collision_checks
473 self.
infoinfo(f
'Drive {goal_msg.target.x} m on heading at {goal_msg.speed} m/s....')
474 send_goal_future = self.drive_on_heading_client.send_goal_async(
477 rclpy.spin_until_future_complete(self, send_goal_future)
478 self.
goal_handlegoal_handle = send_goal_future.result()
481 msg =
'Drive On Heading request was rejected!'
482 self.
setTaskErrorsetTaskError(DriveOnHeading.UNKNOWN, msg)
487 return RunningTask.DRIVE_ON_HEADING
489 def assistedTeleop(self, time_allowance: int = 30) -> Optional[RunningTask]:
492 self.
debugdebug(
"Wanting for 'assisted_teleop' action server")
494 while not self.assisted_teleop_client.wait_for_server(timeout_sec=1.0):
495 self.
infoinfo(
"'assisted_teleop' action server not available, waiting...")
496 goal_msg = AssistedTeleop.Goal()
497 goal_msg.time_allowance = Duration(sec=time_allowance)
499 self.
infoinfo(
"Running 'assisted_teleop'....")
500 send_goal_future = self.assisted_teleop_client.send_goal_async(
503 rclpy.spin_until_future_complete(self, send_goal_future)
504 self.
goal_handlegoal_handle = send_goal_future.result()
507 msg =
'Assisted Teleop request was rejected!'
508 self.
setTaskErrorsetTaskError(AssistedTeleop.UNKNOWN, msg)
513 return RunningTask.ASSISTED_TELEOP
515 def followPath(self, path: Path, controller_id: str =
'',
516 goal_checker_id: str =
'') -> Optional[RunningTask]:
518 """Send a `FollowPath` action request."""
519 self.
debugdebug(
"Waiting for 'FollowPath' action server")
520 while not self.follow_path_client.wait_for_server(timeout_sec=1.0):
521 self.
infoinfo(
"'FollowPath' action server not available, waiting...")
523 goal_msg = FollowPath.Goal()
525 goal_msg.controller_id = controller_id
526 goal_msg.goal_checker_id = goal_checker_id
528 self.
infoinfo(
'Executing path...')
529 send_goal_future = self.follow_path_client.send_goal_async(
532 rclpy.spin_until_future_complete(self, send_goal_future)
533 self.
goal_handlegoal_handle = send_goal_future.result()
536 msg =
'FollowPath goal was rejected!'
542 return RunningTask.FOLLOW_PATH
544 def dockRobotByPose(self, dock_pose: PoseStamped,
545 dock_type: str =
'', nav_to_dock: bool =
True) -> Optional[RunningTask]:
547 """Send a `DockRobot` action request."""
548 self.
infoinfo(
"Waiting for 'DockRobot' action server")
549 while not self.docking_client.wait_for_server(timeout_sec=1.0):
550 self.
infoinfo(
'"DockRobot" action server not available, waiting...')
552 goal_msg = DockRobot.Goal()
553 goal_msg.use_dock_id =
False
554 goal_msg.dock_pose = dock_pose
555 goal_msg.dock_type = dock_type
556 goal_msg.navigate_to_staging_pose = nav_to_dock
558 self.
infoinfo(
'Docking at pose: ' + str(dock_pose) +
'...')
559 send_goal_future = self.docking_client.send_goal_async(goal_msg,
561 rclpy.spin_until_future_complete(self, send_goal_future)
562 self.
goal_handlegoal_handle = send_goal_future.result()
565 msg =
'DockRobot request was rejected!'
571 return RunningTask.DOCK_ROBOT
573 def dockRobotByID(self, dock_id: str, nav_to_dock: bool =
True) -> Optional[RunningTask]:
574 """Send a `DockRobot` action request."""
576 self.
infoinfo(
"Waiting for 'DockRobot' action server")
577 while not self.docking_client.wait_for_server(timeout_sec=1.0):
578 self.
infoinfo(
'"DockRobot" action server not available, waiting...')
580 goal_msg = DockRobot.Goal()
581 goal_msg.use_dock_id =
True
582 goal_msg.dock_id = dock_id
583 goal_msg.navigate_to_staging_pose = nav_to_dock
585 self.
infoinfo(
'Docking at dock ID: ' + str(dock_id) +
'...')
586 send_goal_future = self.docking_client.send_goal_async(goal_msg,
588 rclpy.spin_until_future_complete(self, send_goal_future)
589 self.
goal_handlegoal_handle = send_goal_future.result()
592 msg =
'DockRobot request was rejected!'
598 return RunningTask.DOCK_ROBOT
600 def undockRobot(self, dock_type: str =
'') -> Optional[RunningTask]:
601 """Send a `UndockRobot` action request."""
603 self.
infoinfo(
"Waiting for 'UndockRobot' action server")
604 while not self.undocking_client.wait_for_server(timeout_sec=1.0):
605 self.
infoinfo(
'"UndockRobot" action server not available, waiting...')
607 goal_msg = UndockRobot.Goal()
608 goal_msg.dock_type = dock_type
610 self.
infoinfo(
'Undocking from dock of type: ' + str(dock_type) +
'...')
611 send_goal_future = self.undocking_client.send_goal_async(goal_msg,
613 rclpy.spin_until_future_complete(self, send_goal_future)
614 self.
goal_handlegoal_handle = send_goal_future.result()
617 msg =
'UndockRobot request was rejected!'
623 return RunningTask.UNDOCK_ROBOT
626 """Cancel pending task request of any type."""
627 self.
infoinfo(
'Canceling current task.')
630 future = self.
goal_handlegoal_handle.cancel_goal_async()
631 rclpy.spin_until_future_complete(self, future)
633 self.
errorerror(
'Cancel task failed, goal handle is None')
634 self.
setTaskErrorsetTaskError(0,
'Cancel task failed, goal handle is None')
639 rclpy.spin_until_future_complete(self, future)
641 self.
errorerror(
'Cancel route task failed, goal handle is None')
642 self.
setTaskErrorsetTaskError(0,
'Cancel route task failed, goal handle is None')
648 """Check if the task request of any type is complete yet."""
651 self.
errorerror(
'Task is None, cannot check for completion')
655 if task != RunningTask.COMPUTE_AND_TRACK_ROUTE:
659 if not result_future:
664 rclpy.spin_until_future_complete(self, result_future, timeout_sec=0.10)
665 result_response = result_future.result()
668 self.
statusstatus = result_response.status
669 if self.
statusstatus != GoalStatus.STATUS_SUCCEEDED:
670 result = result_response.result
671 if result
is not None:
672 self.
setTaskErrorsetTaskError(result.error_code, result.error_msg)
674 'Task with failed with'
675 f
' status code:{self.status}'
676 f
' error code:{result.error_code}'
677 f
' error msg:{result.error_msg}')
681 self.
debugdebug(
'Task failed with no result received')
687 self.
debugdebug(
'Task succeeded!')
690 def getFeedback(self, task: RunningTask = RunningTask.NONE) -> Any:
691 """Get the pending action feedback message."""
692 if task != RunningTask.COMPUTE_AND_TRACK_ROUTE:
694 if len(self.route_feedback) > 0:
695 return self.route_feedback.pop(0)
699 """Get the pending action result message."""
700 if self.
statusstatus == GoalStatus.STATUS_SUCCEEDED:
701 return TaskResult.SUCCEEDED
702 elif self.
statusstatus == GoalStatus.STATUS_ABORTED:
703 return TaskResult.FAILED
704 elif self.
statusstatus == GoalStatus.STATUS_CANCELED:
705 return TaskResult.CANCELED
707 return TaskResult.UNKNOWN
709 def clearTaskError(self) -> None:
713 def setTaskError(self, error_code: int, error_msg: str) ->
None:
717 def getTaskError(self) -> tuple[int, str]:
721 localizer: str =
'amcl') ->
None:
722 """Block until the full navigation system is up and running."""
723 if localizer !=
'robot_localization':
725 if localizer ==
'amcl':
728 self.
infoinfo(
'Nav2 is ready for use!')
732 self, start: PoseStamped, goal: PoseStamped,
733 planner_id: str =
'', use_start: bool =
False
734 ) -> ComputePathToPose.Result:
736 Send a `ComputePathToPose` action request.
738 Internal implementation to get the full result, not just the path.
740 self.
debugdebug(
"Waiting for 'ComputePathToPose' action server")
741 while not self.compute_path_to_pose_client.wait_for_server(timeout_sec=1.0):
742 self.
infoinfo(
"'ComputePathToPose' action server not available, waiting...")
744 goal_msg = ComputePathToPose.Goal()
745 goal_msg.start = start
747 goal_msg.planner_id = planner_id
748 goal_msg.use_start = use_start
750 self.
infoinfo(
'Getting path...')
751 send_goal_future = self.compute_path_to_pose_client.send_goal_async(goal_msg)
752 rclpy.spin_until_future_complete(self, send_goal_future)
753 self.
goal_handlegoal_handle = send_goal_future.result()
756 self.
errorerror(
'Get path was rejected!')
757 self.
statusstatus = GoalStatus.UNKNOWN
758 result = ComputePathToPose.Result()
759 result.error_code = ComputePathToPose.UNKNOWN
760 result.error_msg =
'Get path was rejected'
764 rclpy.spin_until_future_complete(self, self.
result_futureresult_future)
770 self, start: PoseStamped, goal: PoseStamped,
771 planner_id: str =
'', use_start: bool =
False) -> Path:
772 """Send a `ComputePathToPose` action request."""
774 rtn = self.
_getPathImpl_getPathImpl(start, goal, planner_id, use_start)
776 if self.
statusstatus == GoalStatus.STATUS_SUCCEEDED:
779 self.
setTaskErrorsetTaskError(rtn.error_code, rtn.error_msg)
780 self.
warnwarn(
'Getting path failed with'
781 f
' status code:{self.status}'
782 f
' error code:{rtn.error_code}'
783 f
' error msg:{rtn.error_msg}')
786 def _getPathThroughPosesImpl(
787 self, start: PoseStamped, goals: Goals,
788 planner_id: str =
'', use_start: bool =
False
789 ) -> ComputePathThroughPoses.Result:
791 Send a `ComputePathThroughPoses` action request.
793 Internal implementation to get the full result, not just the path.
795 self.
debugdebug(
"Waiting for 'ComputePathThroughPoses' action server")
796 while not self.compute_path_through_poses_client.wait_for_server(
800 "'ComputePathThroughPoses' action server not available, waiting..."
803 goal_msg = ComputePathThroughPoses.Goal()
804 goal_msg.start = start
805 goal_msg.goals.header.frame_id =
'map'
806 goal_msg.goals.header.stamp = self.get_clock().now().to_msg()
807 goal_msg.goals.goals = goals
808 goal_msg.planner_id = planner_id
809 goal_msg.use_start = use_start
811 self.
infoinfo(
'Getting path...')
812 send_goal_future = self.compute_path_through_poses_client.send_goal_async(
815 rclpy.spin_until_future_complete(self, send_goal_future)
816 self.
goal_handlegoal_handle = send_goal_future.result()
819 self.
errorerror(
'Get path was rejected!')
820 result = ComputePathThroughPoses.Result()
821 result.error_code = ComputePathThroughPoses.UNKNOWN
822 result.error_msg =
'Get path was rejected!'
826 rclpy.spin_until_future_complete(self, self.
result_futureresult_future)
832 self, start: PoseStamped, goals: Goals,
833 planner_id: str =
'', use_start: bool =
False) -> Path:
834 """Send a `ComputePathThroughPoses` action request."""
838 if self.
statusstatus == GoalStatus.STATUS_SUCCEEDED:
841 self.
setTaskErrorsetTaskError(rtn.error_code, rtn.error_msg)
842 self.
warnwarn(
'Getting path failed with'
843 f
' status code:{self.status}'
844 f
' error code:{rtn.error_code}'
845 f
' error msg:{rtn.error_msg}')
849 self, start: Union[int, PoseStamped],
850 goal: Union[int, PoseStamped], use_start: bool =
False
851 ) -> ComputeRoute.Result:
853 Send a `ComputeRoute` action request.
855 Internal implementation to get the full result, not just the sparse route and dense path.
857 self.
debugdebug(
"Waiting for 'ComputeRoute' action server")
858 while not self.compute_route_client.wait_for_server(timeout_sec=1.0):
859 self.
infoinfo(
"'ComputeRoute' action server not available, waiting...")
861 goal_msg = ComputeRoute.Goal()
862 goal_msg.use_start = use_start
865 if isinstance(start, int)
and isinstance(goal, int):
866 goal_msg.start_id = start
867 goal_msg.goal_id = goal
868 goal_msg.use_poses =
False
869 elif isinstance(start, PoseStamped)
and isinstance(goal, PoseStamped):
870 goal_msg.start = start
872 goal_msg.use_poses =
True
874 self.
errorerror(
'Invalid start and goal types. Must be PoseStamped for pose or int for ID')
875 result = ComputeRoute.Result()
876 result.error_code = ComputeRoute.UNKNOWN
877 result.error_msg =
'Request type fields were invalid!'
880 self.
infoinfo(
'Getting route...')
881 send_goal_future = self.compute_route_client.send_goal_async(goal_msg)
882 rclpy.spin_until_future_complete(self, send_goal_future)
883 self.
goal_handlegoal_handle = send_goal_future.result()
886 self.
errorerror(
'Get route was rejected!')
887 result = ComputeRoute.Result()
888 result.error_code = ComputeRoute.UNKNOWN
889 result.error_msg =
'Get route was rejected!'
893 rclpy.spin_until_future_complete(self, self.
result_futureresult_future)
899 self, start: Union[int, PoseStamped],
900 goal: Union[int, PoseStamped],
901 use_start: bool =
False) -> Optional[list[Union[Path, Route]]]:
902 """Send a `ComputeRoute` action request."""
904 rtn = self.
_getRouteImpl_getRouteImpl(start, goal, use_start=
False)
906 if self.
statusstatus != GoalStatus.STATUS_SUCCEEDED:
907 self.
setTaskErrorsetTaskError(rtn.error_code, rtn.error_msg)
909 'Getting route failed with'
910 f
' status code:{self.status}'
911 f
' error code:{rtn.error_code}'
912 f
' error msg:{rtn.error_msg}')
915 return [rtn.path, rtn.route]
918 self, start: Union[int, PoseStamped],
919 goal: Union[int, PoseStamped], use_start: bool =
False
920 ) -> Optional[RunningTask]:
921 """Send a `ComputeAndTrackRoute` action request."""
923 self.
debugdebug(
"Waiting for 'ComputeAndTrackRoute' action server")
924 while not self.compute_and_track_route_client.wait_for_server(timeout_sec=1.0):
925 self.
infoinfo(
"'ComputeAndTrackRoute' action server not available, waiting...")
927 goal_msg = ComputeAndTrackRoute.Goal()
928 goal_msg.use_start = use_start
931 if isinstance(start, int)
and isinstance(goal, int):
932 goal_msg.start_id = start
933 goal_msg.goal_id = goal
934 goal_msg.use_poses =
False
935 elif isinstance(start, PoseStamped)
and isinstance(goal, PoseStamped):
936 goal_msg.start = start
938 goal_msg.use_poses =
True
940 self.
setTaskErrorsetTaskError(ComputeAndTrackRoute.UNKNOWN,
'Request type fields were invalid!')
941 self.
errorerror(
'Invalid start and goal types. Must be PoseStamped for pose or int for ID')
944 self.
infoinfo(
'Computing and tracking route...')
945 send_goal_future = self.compute_and_track_route_client.send_goal_async(goal_msg,
947 rclpy.spin_until_future_complete(self, send_goal_future)
951 msg =
'Compute and track route was rejected!'
952 self.
setTaskErrorsetTaskError(ComputeAndTrackRoute.UNKNOWN, msg)
957 return RunningTask.COMPUTE_AND_TRACK_ROUTE
960 self, path: Path, smoother_id: str =
'',
961 max_duration: float = 2.0, check_for_collision: bool =
False
962 ) -> SmoothPath.Result:
964 Send a `SmoothPath` action request.
966 Internal implementation to get the full result, not just the path.
968 self.
debugdebug(
"Waiting for 'SmoothPath' action server")
969 while not self.smoother_client.wait_for_server(timeout_sec=1.0):
970 self.
infoinfo(
"'SmoothPath' action server not available, waiting...")
972 goal_msg = SmoothPath.Goal()
974 goal_msg.max_smoothing_duration = rclpyDuration(seconds=max_duration).to_msg()
975 goal_msg.smoother_id = smoother_id
976 goal_msg.check_for_collisions = check_for_collision
978 self.
infoinfo(
'Smoothing path...')
979 send_goal_future = self.smoother_client.send_goal_async(goal_msg)
980 rclpy.spin_until_future_complete(self, send_goal_future)
981 self.
goal_handlegoal_handle = send_goal_future.result()
984 self.
errorerror(
'Smooth path was rejected!')
985 result = SmoothPath.Result()
986 result.error_code = SmoothPath.UNKNOWN
987 result.error_msg =
'Smooth path was rejected'
991 rclpy.spin_until_future_complete(self, self.
result_futureresult_future)
997 self, path: Path, smoother_id: str =
'',
998 max_duration: float = 2.0, check_for_collision: bool =
False) -> Path:
999 """Send a `SmoothPath` action request."""
1001 rtn = self.
_smoothPathImpl_smoothPathImpl(path, smoother_id, max_duration, check_for_collision)
1003 if self.
statusstatus == GoalStatus.STATUS_SUCCEEDED:
1006 self.
setTaskErrorsetTaskError(rtn.error_code, rtn.error_msg)
1007 self.
warnwarn(
'Getting path failed with'
1008 f
' status code:{self.status}'
1009 f
' error code:{rtn.error_code}'
1010 f
' error msg:{rtn.error_msg}')
1014 """Change the current static map in the map server."""
1015 while not self.change_maps_srv.wait_for_service(timeout_sec=1.0):
1016 self.
infoinfo(
'change map service not available, waiting...')
1017 req = LoadMap.Request()
1018 req.map_url = map_filepath
1019 future = self.change_maps_srv.call_async(req)
1020 rclpy.spin_until_future_complete(self, future)
1022 future_result = future.result()
1023 if future_result
is None:
1024 self.
errorerror(
'Change map request failed!')
1027 result = future_result.result
1028 if result != LoadMap.Response().RESULT_SUCCESS:
1029 if result == LoadMap.RESULT_MAP_DOES_NOT_EXIST:
1030 reason =
'Map does not exist'
1031 elif result == LoadMap.INVALID_MAP_DATA:
1032 reason =
'Invalid map data'
1033 elif result == LoadMap.INVALID_MAP_METADATA:
1034 reason =
'Invalid map metadata'
1035 elif result == LoadMap.UNDEFINED_FAILURE:
1036 reason =
'Undefined failure'
1040 self.
errorerror(f
'Change map request failed:{reason}!')
1043 self.
infoinfo(
'Change map request was successful!')
1047 """Clear all costmaps."""
1053 """Clear local costmap."""
1054 while not self.clear_costmap_local_srv.wait_for_service(timeout_sec=1.0):
1055 self.
infoinfo(
'Clear local costmaps service not available, waiting...')
1056 req = ClearEntireCostmap.Request()
1057 future = self.clear_costmap_local_srv.call_async(req)
1058 rclpy.spin_until_future_complete(self, future)
1062 """Clear global costmap."""
1063 while not self.clear_costmap_global_srv.wait_for_service(timeout_sec=1.0):
1064 self.
infoinfo(
'Clear global costmaps service not available, waiting...')
1065 req = ClearEntireCostmap.Request()
1066 future = self.clear_costmap_global_srv.call_async(req)
1067 rclpy.spin_until_future_complete(self, future)
1071 """Clear the costmap except for a specified region."""
1072 while not self.clear_costmap_except_region_srv.wait_for_service(timeout_sec=1.0):
1073 self.
infoinfo(
'ClearCostmapExceptRegion service not available, waiting...')
1074 req = ClearCostmapExceptRegion.Request()
1075 req.reset_distance = reset_distance
1076 future = self.clear_costmap_except_region_srv.call_async(req)
1077 rclpy.spin_until_future_complete(self, future)
1081 """Clear the costmap around the robot."""
1082 while not self.clear_costmap_around_robot_srv.wait_for_service(timeout_sec=1.0):
1083 self.
infoinfo(
'ClearCostmapAroundRobot service not available, waiting...')
1084 req = ClearCostmapAroundRobot.Request()
1085 req.reset_distance = reset_distance
1086 future = self.clear_costmap_around_robot_srv.call_async(req)
1087 rclpy.spin_until_future_complete(self, future)
1091 """Clear the costmap around a given pose."""
1092 while not self.clear_local_costmap_around_pose_srv.wait_for_service(timeout_sec=1.0):
1093 self.
infoinfo(
'ClearLocalCostmapAroundPose service not available, waiting...')
1094 req = ClearCostmapAroundPose.Request()
1096 req.reset_distance = reset_distance
1097 future = self.clear_local_costmap_around_pose_srv.call_async(req)
1098 rclpy.spin_until_future_complete(self, future)
1102 """Clear the global costmap around a given pose."""
1103 while not self.clear_global_costmap_around_pose_srv.wait_for_service(timeout_sec=1.0):
1104 self.
infoinfo(
'ClearGlobalCostmapAroundPose service not available, waiting...')
1105 req = ClearCostmapAroundPose.Request()
1107 req.reset_distance = reset_distance
1108 future = self.clear_global_costmap_around_pose_srv.call_async(req)
1109 rclpy.spin_until_future_complete(self, future)
1113 """Get the global costmap."""
1114 while not self.get_costmap_global_srv.wait_for_service(timeout_sec=1.0):
1115 self.
infoinfo(
'Get global costmaps service not available, waiting...')
1116 req = GetCostmap.Request()
1117 future = self.get_costmap_global_srv.call_async(req)
1118 rclpy.spin_until_future_complete(self, future)
1120 result = future.result()
1122 self.
errorerror(
'Get global costmap request failed!')
1128 """Get the local costmap."""
1129 while not self.get_costmap_local_srv.wait_for_service(timeout_sec=1.0):
1130 self.
infoinfo(
'Get local costmaps service not available, waiting...')
1131 req = GetCostmap.Request()
1132 future = self.get_costmap_local_srv.call_async(req)
1133 rclpy.spin_until_future_complete(self, future)
1135 result = future.result()
1138 self.
errorerror(
'Get local costmap request failed!')
1144 """Startup nav2 lifecycle system."""
1145 self.
infoinfo(
'Starting up lifecycle nodes based on lifecycle_manager.')
1146 for srv_name, srv_type
in self.get_service_names_and_types():
1147 if srv_type[0] ==
'nav2_msgs/srv/ManageLifecycleNodes':
1148 self.
infoinfo(f
'Starting up {srv_name}')
1149 mgr_client: Client[ManageLifecycleNodes.Request, ManageLifecycleNodes.Response] = \
1150 self.create_client(ManageLifecycleNodes, srv_name)
1151 while not mgr_client.wait_for_service(timeout_sec=1.0):
1152 self.
infoinfo(f
'{srv_name} service not available, waiting...')
1153 req = ManageLifecycleNodes.Request()
1154 req.command = ManageLifecycleNodes.Request().STARTUP
1155 future = mgr_client.call_async(req)
1160 rclpy.spin_until_future_complete(self, future, timeout_sec=0.10)
1165 self.
infoinfo(
'Nav2 is ready for use!')
1169 """Shutdown nav2 lifecycle system."""
1170 self.
infoinfo(
'Shutting down lifecycle nodes based on lifecycle_manager.')
1171 for srv_name, srv_type
in self.get_service_names_and_types():
1172 if srv_type[0] ==
'nav2_msgs/srv/ManageLifecycleNodes':
1173 self.
infoinfo(f
'Shutting down {srv_name}')
1174 mgr_client: Client[ManageLifecycleNodes.Request, ManageLifecycleNodes.Response] = \
1175 self.create_client(ManageLifecycleNodes, srv_name)
1176 while not mgr_client.wait_for_service(timeout_sec=1.0):
1177 self.
infoinfo(f
'{srv_name} service not available, waiting...')
1178 req = ManageLifecycleNodes.Request()
1179 req.command = ManageLifecycleNodes.Request().SHUTDOWN
1180 future = mgr_client.call_async(req)
1181 rclpy.spin_until_future_complete(self, future)
1185 def _waitForNodeToActivate(self, node_name: str) ->
None:
1187 self.
debugdebug(f
'Waiting for {node_name} to become active..')
1188 node_service = f
'{node_name}/get_state'
1189 state_client: Client[GetState.Request, GetState.Response] = \
1190 self.create_client(GetState, node_service)
1191 while not state_client.wait_for_service(timeout_sec=1.0):
1192 self.
infoinfo(f
'{node_service} service not available, waiting...')
1194 req = GetState.Request()
1196 while state !=
'active':
1197 self.
debugdebug(f
'Getting {node_name} state...')
1198 future = state_client.call_async(req)
1199 rclpy.spin_until_future_complete(self, future)
1201 result = future.result()
1202 if result
is not None:
1203 state = result.current_state.label
1204 self.
debugdebug(f
'Result of get_state: {state}')
1208 def _waitForInitialPose(self) -> None:
1210 self.
infoinfo(
'Setting initial pose')
1212 self.
infoinfo(
'Waiting for amcl_pose to be received')
1213 rclpy.spin_once(self, timeout_sec=1.0)
1216 def _amclPoseCallback(self, msg: PoseWithCovarianceStamped) ->
None:
1217 self.
debugdebug(
'Received amcl pose')
1221 def _feedbackCallback(self, msg: NavigateToPose.Feedback) ->
None:
1222 self.
debugdebug(
'Received action feedback message')
1223 self.
feedbackfeedback = msg.feedback
1226 def _routeFeedbackCallback(self, msg: ComputeAndTrackRoute.Feedback) ->
None:
1227 self.
debugdebug(
'Received route action feedback message')
1228 self.route_feedback.append(msg.feedback)
1231 def _setInitialPose(self) -> None:
1232 msg = PoseWithCovarianceStamped()
1234 msg.header.frame_id = self.
initial_poseinitial_pose.header.frame_id
1235 msg.header.stamp = self.
initial_poseinitial_pose.header.stamp
1236 self.
infoinfo(
'Publishing Initial Pose')
1240 def info(self, msg: str) ->
None:
1241 self.get_logger().info(msg)
1244 def warn(self, msg: str) ->
None:
1245 self.get_logger().warning(msg)
1248 def error(self, msg: str) ->
None:
1249 self.get_logger().error(msg)
1252 def debug(self, msg: str) ->
None:
1253 self.get_logger().debug(msg)
OccupancyGrid getGlobalCostmap(self)
Optional[RunningTask] undockRobot(self, str dock_type='')
ComputePathToPose.Result _getPathImpl(self, PoseStamped start, PoseStamped goal, str planner_id='', bool use_start=False)
None lifecycleShutdown(self)
Path smoothPath(self, Path path, str smoother_id='', float max_duration=2.0, bool check_for_collision=False)
TaskResult getResult(self)
None clearGlobalCostmap(self)
None clearCostmapExceptRegion(self, float reset_distance)
Optional[RunningTask] getAndTrackRoute(self, Union[int, PoseStamped] start, Union[int, PoseStamped] goal, bool use_start=False)
None clearLocalCostmap(self)
Optional[RunningTask] followGpsWaypoints(self, list[GeoPose] gps_poses)
None clearAllCostmaps(self)
Path getPath(self, PoseStamped start, PoseStamped goal, str planner_id='', bool use_start=False)
ComputePathThroughPoses.Result _getPathThroughPosesImpl(self, PoseStamped start, Goals goals, str planner_id='', bool use_start=False)
Optional[RunningTask] dockRobotByID(self, str dock_id, bool nav_to_dock=True)
Optional[RunningTask] goToPose(self, PoseStamped pose, str behavior_tree='')
None _feedbackCallback(self, NavigateToPose.Feedback msg)
None clearCostmapAroundRobot(self, float reset_distance)
bool changeMap(self, str map_filepath)
Optional[RunningTask] goThroughPoses(self, Goals poses, str behavior_tree='')
None _amclPoseCallback(self, PoseWithCovarianceStamped msg)
Any getFeedback(self, RunningTask task=RunningTask.NONE)
bool isTaskComplete(self, RunningTask task=RunningTask.NONE)
None setInitialPose(self, PoseStamped initial_pose)
Path getPathThroughPoses(self, PoseStamped start, Goals goals, str planner_id='', bool use_start=False)
None _waitForNodeToActivate(self, str node_name)
SmoothPath.Result _smoothPathImpl(self, Path path, str smoother_id='', float max_duration=2.0, bool check_for_collision=False)
Optional[RunningTask] followWaypoints(self, list[PoseStamped] poses)
None clearGlobalCostmapAroundPose(self, PoseStamped pose, float reset_distance)
None lifecycleStartup(self)
None clearLocalCostmapAroundPose(self, PoseStamped pose, float reset_distance)
None clearTaskError(self)
None _setInitialPose(self)
Optional[list[Union[Path, Route]]] getRoute(self, Union[int, PoseStamped] start, Union[int, PoseStamped] goal, bool use_start=False)
None waitUntilNav2Active(self, str navigator='bt_navigator', str localizer='amcl')
None error(self, str msg)
None _routeFeedbackCallback(self, ComputeAndTrackRoute.Feedback msg)
OccupancyGrid getLocalCostmap(self)
ComputeRoute.Result _getRouteImpl(self, Union[int, PoseStamped] start, Union[int, PoseStamped] goal, bool use_start=False)
None setTaskError(self, int error_code, str error_msg)
None _waitForInitialPose(self)
None debug(self, str msg)