15 from collections
import defaultdict
17 from typing
import Any, Dict, List, Tuple, TypedDict
19 from nav2_smac_planner.lattice_primitives.helper
import angle_difference, interpolate_yaws
20 from nav2_smac_planner.lattice_primitives.trajectory
import (AnyFloat, FloatNDArray, Path,
21 Trajectory, TrajectoryParameters)
22 from nav2_smac_planner.lattice_primitives.trajectory_generator
import TrajectoryGenerator
24 from rtree
import index
28 grid_resolution: float
30 stopping_threshold: int
37 Handles all the logic for computing the minimal control set.
39 Computes the minimal control set for a vehicle given its parameters.
40 Includes handling the propagating and searching along wavefronts as
41 well as determining if a trajectory is part of the minimal set based
42 on previously added trajectories.
46 """An Enum used for determining the motion model to use."""
53 """An Enum used for determining how a trajectory should be flipped."""
60 """Init the lattice generator from the user supplied config."""
73 def _get_wave_front_points(self, pos: int) -> FloatNDArray:
75 Calculate the end points that lie on the wave front.
77 Uses the user supplied grid resolution to calculate the
78 valid end points that lie on a wave front at a discrete
79 interval away from the origin.
83 The number of discrete intervals of grid resolution
84 away from the origin to generate the wave points at
89 An array of coordinates
100 positions.append((max_point_coord, varying_point_coord))
103 positions.append((varying_point_coord, max_point_coord))
106 positions.append((max_point_coord, max_point_coord))
108 return np.array(positions)
110 def _get_heading_discretization(self, number_of_headings: int) -> List[int]:
112 Calculate the heading discretization based on the number of headings.
114 Does not uniformly generate headings but instead generates a set of
115 discrete headings that is better suited for straight line trajectories.
118 number_of_headings: int
119 The number of headings to discretize a 360 degree turn into
124 A list of headings in radians
127 max_val = int(number_of_headings / 8)
134 for i
in range(-max_val, max_val + 1):
135 outer_edge_x.extend([i, i])
136 outer_edge_y.extend([-max_val, max_val])
138 if i != max_val
and i != -max_val:
139 outer_edge_x.extend([-max_val, max_val])
140 outer_edge_y.extend([i, i])
142 return sorted([np.arctan2(j, i)
for i, j
in zip(outer_edge_x, outer_edge_y)])
144 def _point_to_line_distance(self, p1: FloatNDArray, p2: FloatNDArray,
145 q: FloatNDArray) -> AnyFloat:
147 Return the shortest distance from a point to a line segment.
151 Start point of line segment
153 End point of line segment
155 Point to get distance away from line of
160 The shortest distance between q and line segment p1p2
164 l2 = np.inner(p1 - p2, p1 - p2)
167 return np.linalg.norm(p1 - q)
170 t = max(0, min(1, np.dot(q - p1, p2 - p1) / l2))
171 projected_point = p1 + t * (p2 - p1)
173 return np.linalg.norm(q - projected_point)
175 def _is_minimal_trajectory(
176 self, trajectory: Trajectory, prior_end_poses: index.Rtree
179 Determine whether a trajectory is a minimal trajectory.
181 Uses an RTree for speedup.
184 trajectory: Trajectory
185 The trajectory to check
186 prior_end_poses: RTree
187 An RTree holding the current minimal set of trajectories
192 True if the trajectory is a minimal trajectory otherwise false
196 for x1, y1, x2, y2, yaw
in zip(
197 trajectory.path.xs[:-1],
198 trajectory.path.ys[:-1],
199 trajectory.path.xs[1:],
200 trajectory.path.ys[1:],
201 trajectory.path.yaws[:-1],
204 p1 = np.array([x1, y1])
205 p2 = np.array([x2, y2])
218 for prior_end_pose
in prior_end_poses.intersection(
219 (left_bb, bottom_bb, right_bb, top_bb), objects=
'raw'
224 and angle_difference(yaw, prior_end_pose[-1])
231 def _compute_min_trajectory_length(self) -> float:
233 Compute the minimum trajectory length for the given parameters.
235 The minimum trajectory length is defined as the length needed
236 for the sharpest possible turn to move from 0 degrees to the next
237 discrete heading. Since the distance between headings is not uniform
238 we take the smallest possible difference.
243 The minimal length of a trajectory
250 for i
in range(len(self.
headingsheadings) - 1)
255 def _generate_minimal_spanning_set(self) -> Dict[float, List[Trajectory]]:
257 Generate the minimal spanning set.
259 Iteratves over all possible trajectories and keeps only those that
260 are part of the minimal set.
265 A dictionary where the key is the start_angle and the value is
266 a list of trajectories that begin at that angle
269 quadrant1_end_poses: Dict[int, List[Tuple[Any, int]]] = defaultdict(list)
273 initial_headings = sorted(
274 filter(
lambda x: 0 <= x
and x <= np.pi / 2, self.
headingsheadings)
279 wave_front_start_pos = int(
283 for start_heading
in initial_headings:
284 iterations_without_trajectory = 0
286 prior_end_poses = index.Index()
288 wave_front_cur_pos = wave_front_start_pos
292 target_headings = sorted(
293 self.
headingsheadings, key=
lambda x: (abs(x - start_heading), -x)
295 target_headings = list(
296 filter(
lambda x: abs(start_heading - x) <= np.pi / 2, target_headings)
300 iterations_without_trajectory += 1
305 for target_point
in positions:
306 for target_heading
in target_headings:
317 if trajectory
is not None:
323 new_end_pose = np.array(
324 [target_point[0], target_point[1], target_heading]
327 quadrant1_end_poses[start_heading].append(
328 (target_point, target_heading)
338 prior_end_poses.insert(
340 (left_bb, bottom_bb, right_bb, top_bb),
344 iterations_without_trajectory = 0
346 wave_front_cur_pos += 1
352 def _flip_angle(self, angle: int, flip_type: Flip) -> float:
354 Return the the appropriate flip of the angle in self.headings.
360 Whether to flip acrpss X axis, Y axis, or both
365 The angle in self.heading that is the appropriate flip
368 angle_idx = self.
headingsheadings.index(angle)
370 if flip_type == self.
FlipFlip.X:
371 heading_idx = (self.
num_of_headingsnum_of_headings / 2 - 1) - angle_idx - 1
372 elif flip_type == self.
FlipFlip.Y:
374 elif flip_type == self.
FlipFlip.BOTH:
379 raise Exception(f
'Unsupported flip type: {flip_type}')
381 return self.
headingsheadings[int(heading_idx)]
383 def _create_complete_minimal_spanning_set(
384 self, single_quadrant_minimal_set: Dict[int, List[Tuple[Any, int]]]
385 ) -> Dict[float, List[Trajectory]]:
387 Create the full minimal spanning set from a single quadrant set.
389 Exploits the symmetry between the quadrants to create the full set.
390 This is done by flipping every trajectory in the first quadrant across
391 either the X-axis, Y-axis, or both axes.
394 single_quadrant_minimal_set: dict
395 The minimal set for quadrant 1 (positive x and positive y)
400 The complete minimal spanning set containing the trajectories
404 all_trajectories: Dict[float, List[Trajectory]] = defaultdict(list)
406 for start_angle
in single_quadrant_minimal_set.keys():
408 for end_point, end_angle
in single_quadrant_minimal_set[start_angle]:
414 if start_angle == 0
and end_angle == 0:
415 unflipped_start_angle = 0.0
416 flipped_x_start_angle = np.pi
418 unflipped_end_angle = 0.0
419 flipped_x_end_angle = np.pi
422 unflipped_trajectory = (
425 unflipped_start_angle,
430 flipped_x_trajectory = (
433 flipped_x_start_angle,
439 if unflipped_trajectory
is None or flipped_x_trajectory
is None:
440 raise ValueError(
'No trajectory was found')
443 unflipped_trajectory.parameters.start_angle
444 ].append(unflipped_trajectory)
447 flipped_x_trajectory.parameters.start_angle
448 ].append(flipped_x_trajectory)
450 elif abs(start_angle) == np.pi / 2
and abs(end_angle) == np.pi / 2:
451 unflipped_start_angle = np.pi / 2
452 flipped_y_start_angle = -np.pi / 2
454 unflipped_end_angle = np.pi / 2
455 flipped_y_end_angle = -np.pi / 2
458 unflipped_trajectory = (
461 unflipped_start_angle,
467 flipped_y_trajectory = (
470 flipped_y_start_angle,
476 if unflipped_trajectory
is None or flipped_y_trajectory
is None:
477 raise ValueError(
'No trajectory was found')
480 unflipped_trajectory.parameters.start_angle
481 ].append(unflipped_trajectory)
483 flipped_y_trajectory.parameters.start_angle
484 ].append(flipped_y_trajectory)
487 unflipped_start_angle = start_angle
488 flipped_x_start_angle = self.
_flip_angle_flip_angle(start_angle, self.
FlipFlip.X)
489 flipped_y_start_angle = self.
_flip_angle_flip_angle(start_angle, self.
FlipFlip.Y)
490 flipped_xy_start_angle = self.
_flip_angle_flip_angle(
491 start_angle, self.
FlipFlip.BOTH
494 unflipped_end_angle = end_angle
495 flipped_x_end_angle = self.
_flip_angle_flip_angle(end_angle, self.
FlipFlip.X)
496 flipped_y_end_angle = self.
_flip_angle_flip_angle(end_angle, self.
FlipFlip.Y)
497 flipped_xy_end_angle = self.
_flip_angle_flip_angle(end_angle, self.
FlipFlip.BOTH)
500 unflipped_trajectory = (
503 unflipped_start_angle,
508 flipped_x_trajectory = (
511 flipped_x_start_angle,
516 flipped_y_trajectory = (
519 flipped_y_start_angle,
524 flipped_xy_trajectory = (
527 flipped_xy_start_angle,
528 flipped_xy_end_angle,
533 if (unflipped_trajectory
is None or flipped_y_trajectory
is None or
534 flipped_x_trajectory
is None or flipped_xy_trajectory
is None):
535 raise ValueError(
'No trajectory was found')
538 unflipped_trajectory.parameters.start_angle
539 ].append(unflipped_trajectory)
541 flipped_x_trajectory.parameters.start_angle
542 ].append(flipped_x_trajectory)
544 flipped_y_trajectory.parameters.start_angle
545 ].append(flipped_y_trajectory)
547 flipped_xy_trajectory.parameters.start_angle
548 ].append(flipped_xy_trajectory)
550 return all_trajectories
552 def _handle_motion_model(self, spanning_set: Dict[float, List[Trajectory]]
553 ) -> Dict[float, List[Trajectory]]:
555 Add the appropriate motions for the user supplied motion model.
557 Ackerman: No additional trajectories
559 Diff: In place turns to the right and left
561 Omni: Diff + Sliding movements to right and left
565 The minimal spanning set
570 The minimal spanning set with additional trajectories based
579 return diff_spanning_set
584 return omni_spanning_set
587 print(
'No handling implemented for Motion Model: ' + f
'{self.motion_model}')
588 raise NotImplementedError
590 def _add_in_place_turns(self, spanning_set: Dict[float, List[Trajectory]]
591 ) -> Dict[float, List[Trajectory]]:
593 Add in place turns to the spanning set.
595 In place turns are trajectories with only a rotational component and
596 only shift a single angular heading step
600 The minimal spanning set
605 The minimal spanning set containing additional in place turns
609 all_angles = sorted(spanning_set.keys())
611 for idx, start_angle
in enumerate(all_angles):
612 prev_angle_idx = idx - 1
if idx - 1 >= 0
else len(all_angles) - 1
613 next_angle_idx = idx + 1
if idx + 1 < len(all_angles)
else 0
615 prev_angle = all_angles[prev_angle_idx]
616 next_angle = all_angles[next_angle_idx]
618 left_turn_params = TrajectoryParameters.no_arc(
619 end_point=np.array([0, 0]),
620 start_angle=start_angle,
621 end_angle=next_angle,
623 right_turn_params = TrajectoryParameters.no_arc(
624 end_point=np.array([0, 0]),
625 start_angle=start_angle,
626 end_angle=prev_angle,
631 angle_dif = angle_difference(start_angle, next_angle)
632 steps = int(round(angle_dif / np.deg2rad(10))) + 1
634 position = np.full(steps, 0)
635 left_yaws = interpolate_yaws(start_angle, next_angle,
True, steps)
636 right_yaws = interpolate_yaws(start_angle, prev_angle,
False, steps)
638 left_turn_path = Path(xs=position, ys=position, yaws=left_yaws)
639 right_turn_path = Path(xs=position, ys=position, yaws=right_yaws)
641 left_turn = Trajectory(parameters=left_turn_params, path=left_turn_path)
642 right_turn = Trajectory(parameters=right_turn_params, path=right_turn_path)
644 spanning_set[start_angle].append(left_turn)
645 spanning_set[start_angle].append(right_turn)
649 def _add_horizontal_motions(self, spanning_set: Dict[float, List[Trajectory]]
650 ) -> Dict[float, List[Trajectory]]:
652 Add horizontal sliding motions to the spanning set.
654 The horizontal sliding motions are simply straight line trajectories
655 at 90 degrees to every start angle in the spanning set. The yaw of these
656 trajectories is the same as the start angle for which it is generated.
660 The minimal spanning set
665 The minimal spanning set containing additional sliding motions
673 for idx, angle
in enumerate(self.
headingsheadings):
677 left_angle_idx = int((idx + idx_offset) % self.
num_of_headingsnum_of_headings)
678 left_angle = self.
headingsheadings[left_angle_idx]
679 left_trajectories = spanning_set[left_angle]
680 left_straight_trajectory = next(
681 t
for t
in left_trajectories
if t.parameters.end_angle == left_angle
686 right_angle_idx = int((idx - idx_offset) % self.
num_of_headingsnum_of_headings)
687 right_angle = self.
headingsheadings[right_angle_idx]
688 right_trajectories = spanning_set[right_angle]
689 right_straight_trajectory = next(
690 t
for t
in right_trajectories
if t.parameters.end_angle == right_angle
694 len(left_straight_trajectory.path.xs), angle, dtype=np.float64
699 parmas_l = left_straight_trajectory.parameters
700 left_motion_parameters = TrajectoryParameters(
701 parmas_l.turning_radius,
708 parmas_l.arc_start_point,
709 parmas_l.arc_end_point,
714 params_r = right_straight_trajectory.parameters
715 right_motion_parameters = TrajectoryParameters(
716 params_r.turning_radius,
723 parmas_l.arc_start_point,
724 parmas_l.arc_end_point,
727 left_motion = Trajectory(
728 parameters=left_motion_parameters,
730 xs=left_straight_trajectory.path.xs,
731 ys=left_straight_trajectory.path.ys,
736 right_motion = Trajectory(
737 parameters=right_motion_parameters,
739 xs=right_straight_trajectory.path.xs,
740 ys=right_straight_trajectory.path.ys,
745 spanning_set[angle].append(left_motion)
746 spanning_set[angle].append(right_motion)
750 def run(self) -> Dict[float, List[Trajectory]]:
752 Run the lattice generator.
757 The minimal spanning set including additional motions for the
758 specified motion model
Dict[float, List[Trajectory]] _add_horizontal_motions(self, Dict[float, List[Trajectory]] spanning_set)
bool _is_minimal_trajectory(self, Trajectory trajectory, index.Rtree prior_end_poses)
float _flip_angle(self, int angle, Flip flip_type)
FloatNDArray _get_wave_front_points(self, int pos)
List[int] _get_heading_discretization(self, int number_of_headings)
Dict[float, List[Trajectory]] _add_in_place_turns(self, Dict[float, List[Trajectory]] spanning_set)
float _compute_min_trajectory_length(self)
Dict[float, List[Trajectory]] _generate_minimal_spanning_set(self)
def __init__(self, ConfigDict config)
Dict[float, List[Trajectory]] _create_complete_minimal_spanning_set(self, Dict[int, List[Tuple[Any, int]]] single_quadrant_minimal_set)
AnyFloat _point_to_line_distance(self, FloatNDArray p1, FloatNDArray p2, FloatNDArray q)
Dict[float, List[Trajectory]] _handle_motion_model(self, Dict[float, List[Trajectory]] spanning_set)
Dict[float, List[Trajectory]] run(self)