15 from collections
import defaultdict
18 from helper
import angle_difference, interpolate_yaws
22 from rtree
import index
24 from trajectory
import Path, Trajectory, TrajectoryParameters
26 from trajectory_generator
import TrajectoryGenerator
31 Handles all the logic for computing the minimal control set.
33 Computes the minimal control set for a vehicle given its parameters.
34 Includes handling the propogating and searching along wavefronts as
35 well as determining if a trajectory is part of the minimal set based
36 on previously added trajectories.
40 """An Enum used for determining the motion model to use."""
47 """An Enum used for determining how a trajectory should be flipped."""
54 """Init the lattice generator from the user supplied config."""
68 def _get_wave_front_points(self, pos: int) -> np.array:
70 Calculate the end points that lie on the wave front.
72 Uses the user supplied grid resolution to calculate the
73 valid end points that lie on a wave front at a discrete
74 interval away from the origin.
78 The number of discrete intervals of grid resolution
79 away from the origin to generate the wave points at
84 An array of coordinates
95 positions.append((max_point_coord, varying_point_coord))
98 positions.append((varying_point_coord, max_point_coord))
101 positions.append((max_point_coord, max_point_coord))
103 return np.array(positions)
105 def _get_heading_discretization(self, number_of_headings: int) -> list:
107 Calculate the heading discretization based on the number of headings.
109 Does not uniformly generate headings but instead generates a set of
110 discrete headings that is better suited for straight line trajectories.
113 number_of_headings: int
114 The number of headings to discretize a 360 degree turn into
119 A list of headings in radians
122 max_val = int(number_of_headings / 8)
129 for i
in range(-max_val, max_val + 1):
130 outer_edge_x.extend([i, i])
131 outer_edge_y.extend([-max_val, max_val])
133 if i != max_val
and i != -max_val:
134 outer_edge_x.extend([-max_val, max_val])
135 outer_edge_y.extend([i, i])
137 return sorted([np.arctan2(j, i)
for i, j
in zip(outer_edge_x, outer_edge_y)])
139 def _point_to_line_distance(self, p1: np.array, p2: np.array, q: np.array) -> float:
141 Return the shortest distance from a point to a line segment.
145 Start point of line segment
147 End point of line segment
149 Point to get distance away from line of
154 The shortest distance between q and line segment p1p2
158 l2 = np.inner(p1 - p2, p1 - p2)
161 return np.linalg.norm(p1 - q)
164 t = max(0, min(1, np.dot(q - p1, p2 - p1) / l2))
165 projected_point = p1 + t * (p2 - p1)
167 return np.linalg.norm(q - projected_point)
169 def _is_minimal_trajectory(
170 self, trajectory: Trajectory, prior_end_poses: index.Rtree
173 Determine wheter a trajectory is a minimal trajectory.
175 Uses an RTree for speedup.
178 trajectory: Trajectory
179 The trajectory to check
180 prior_end_poses: RTree
181 An RTree holding the current minimal set of trajectories
186 True if the trajectory is a minimal trajectory otherwise false
190 for x1, y1, x2, y2, yaw
in zip(
191 trajectory.path.xs[:-1],
192 trajectory.path.ys[:-1],
193 trajectory.path.xs[1:],
194 trajectory.path.ys[1:],
195 trajectory.path.yaws[:-1],
198 p1 = np.array([x1, y1])
199 p2 = np.array([x2, y2])
212 for prior_end_pose
in prior_end_poses.intersection(
213 (left_bb, bottom_bb, right_bb, top_bb), objects=
'raw'
218 and angle_difference(yaw, prior_end_pose[-1])
225 def _compute_min_trajectory_length(self) -> float:
227 Compute the minimum trajectory length for the given parameters.
229 The minimum trajectory length is defined as the length needed
230 for the sharpest possible turn to move from 0 degrees to the next
231 discrete heading. Since the distance between headings is not uniform
232 we take the smallest possible difference.
237 The minimal length of a trajectory
244 for i
in range(len(self.
headingsheadings) - 1)
249 def _generate_minimal_spanning_set(self) -> dict:
251 Generate the minimal spanning set.
253 Iteratves over all possible trajectories and keeps only those that
254 are part of the minimal set.
259 A dictionary where the key is the start_angle and the value is
260 a list of trajectories that begin at that angle
263 quadrant1_end_poses = defaultdict(list)
267 initial_headings = sorted(
268 filter(
lambda x: 0 <= x
and x <= np.pi / 2, self.
headingsheadings)
273 wave_front_start_pos = int(
277 for start_heading
in initial_headings:
278 iterations_without_trajectory = 0
280 prior_end_poses = index.Index()
282 wave_front_cur_pos = wave_front_start_pos
286 target_headings = sorted(
287 self.
headingsheadings, key=
lambda x: (abs(x - start_heading), -x)
289 target_headings = list(
290 filter(
lambda x: abs(start_heading - x) <= np.pi / 2, target_headings)
294 iterations_without_trajectory += 1
299 for target_point
in positions:
300 for target_heading
in target_headings:
311 if trajectory
is not None:
317 new_end_pose = np.array(
318 [target_point[0], target_point[1], target_heading]
321 quadrant1_end_poses[start_heading].append(
322 (target_point, target_heading)
332 prior_end_poses.insert(
334 (left_bb, bottom_bb, right_bb, top_bb),
338 iterations_without_trajectory = 0
340 wave_front_cur_pos += 1
346 def _flip_angle(self, angle: float, flip_type: Flip) -> float:
348 Return the the appropriate flip of the angle in self.headings.
354 Whether to flip acrpss X axis, Y axis, or both
359 The angle in self.heading that is the appropriate flip
362 angle_idx = self.
headingsheadings.index(angle)
364 if flip_type == self.
FlipFlip.X:
365 heading_idx = (self.
num_of_headingsnum_of_headings / 2 - 1) - angle_idx - 1
366 elif flip_type == self.
FlipFlip.Y:
368 elif flip_type == self.
FlipFlip.BOTH:
373 raise Exception(f
'Unsupported flip type: {flip_type}')
375 return self.
headingsheadings[int(heading_idx)]
377 def _create_complete_minimal_spanning_set(
378 self, single_quadrant_minimal_set: dict
381 Create the full minimal spanning set from a single quadrant set.
383 Exploits the symmetry between the quadrants to create the full set.
384 This is done by flipping every trajectory in the first quadrant across
385 either the X-axis, Y-axis, or both axes.
388 single_quadrant_minimal_set: dict
389 The minimal set for quadrant 1 (positive x and positive y)
394 The complete minimal spanning set containing the trajectories
398 all_trajectories = defaultdict(list)
400 for start_angle
in single_quadrant_minimal_set.keys():
402 for end_point, end_angle
in single_quadrant_minimal_set[start_angle]:
408 if start_angle == 0
and end_angle == 0:
409 unflipped_start_angle = 0.0
410 flipped_x_start_angle = np.pi
412 unflipped_end_angle = 0.0
413 flipped_x_end_angle = np.pi
416 unflipped_trajectory = (
419 unflipped_start_angle,
424 flipped_x_trajectory = (
427 flipped_x_start_angle,
434 unflipped_trajectory.parameters.start_angle
435 ].append(unflipped_trajectory)
438 flipped_x_trajectory.parameters.start_angle
439 ].append(flipped_x_trajectory)
441 elif abs(start_angle) == np.pi / 2
and abs(end_angle) == np.pi / 2:
442 unflipped_start_angle = np.pi / 2
443 flipped_y_start_angle = -np.pi / 2
445 unflipped_end_angle = np.pi / 2
446 flipped_y_end_angle = -np.pi / 2
449 unflipped_trajectory = (
452 unflipped_start_angle,
458 flipped_y_trajectory = (
461 flipped_y_start_angle,
468 unflipped_trajectory.parameters.start_angle
469 ].append(unflipped_trajectory)
471 flipped_y_trajectory.parameters.start_angle
472 ].append(flipped_y_trajectory)
475 unflipped_start_angle = start_angle
476 flipped_x_start_angle = self.
_flip_angle_flip_angle(start_angle, self.
FlipFlip.X)
477 flipped_y_start_angle = self.
_flip_angle_flip_angle(start_angle, self.
FlipFlip.Y)
478 flipped_xy_start_angle = self.
_flip_angle_flip_angle(
479 start_angle, self.
FlipFlip.BOTH
482 unflipped_end_angle = end_angle
483 flipped_x_end_angle = self.
_flip_angle_flip_angle(end_angle, self.
FlipFlip.X)
484 flipped_y_end_angle = self.
_flip_angle_flip_angle(end_angle, self.
FlipFlip.Y)
485 flipped_xy_end_angle = self.
_flip_angle_flip_angle(end_angle, self.
FlipFlip.BOTH)
488 unflipped_trajectory = (
491 unflipped_start_angle,
496 flipped_x_trajectory = (
499 flipped_x_start_angle,
504 flipped_y_trajectory = (
507 flipped_y_start_angle,
512 flipped_xy_trajectory = (
515 flipped_xy_start_angle,
516 flipped_xy_end_angle,
522 unflipped_trajectory.parameters.start_angle
523 ].append(unflipped_trajectory)
525 flipped_x_trajectory.parameters.start_angle
526 ].append(flipped_x_trajectory)
528 flipped_y_trajectory.parameters.start_angle
529 ].append(flipped_y_trajectory)
531 flipped_xy_trajectory.parameters.start_angle
532 ].append(flipped_xy_trajectory)
534 return all_trajectories
536 def _handle_motion_model(self, spanning_set: dict) -> dict:
538 Add the appropriate motions for the user supplied motion model.
540 Ackerman: No additional trajectories
542 Diff: In place turns to the right and left
544 Omni: Diff + Sliding movements to right and left
548 The minimal spanning set
553 The minimal spanning set with additional trajectories based
562 return diff_spanning_set
567 return omni_spanning_set
570 print(
'No handling implemented for Motion Model: ' +
571 f
'{self.motion_model}')
572 raise NotImplementedError
574 def _add_in_place_turns(self, spanning_set: dict) -> dict:
576 Add in place turns to the spanning set.
578 In place turns are trajectories with only a rotational component and
579 only shift a single angular heading step
583 The minimal spanning set
588 The minimal spanning set containing additional in place turns
592 all_angles = sorted(spanning_set.keys())
594 for idx, start_angle
in enumerate(all_angles):
595 prev_angle_idx = idx - 1
if idx - 1 >= 0
else len(all_angles) - 1
596 next_angle_idx = idx + 1
if idx + 1 < len(all_angles)
else 0
598 prev_angle = all_angles[prev_angle_idx]
599 next_angle = all_angles[next_angle_idx]
601 left_turn_params = TrajectoryParameters.no_arc(
602 end_point=np.array([0, 0]),
603 start_angle=start_angle,
604 end_angle=next_angle,
606 right_turn_params = TrajectoryParameters.no_arc(
607 end_point=np.array([0, 0]),
608 start_angle=start_angle,
609 end_angle=prev_angle,
614 angle_dif = angle_difference(start_angle, next_angle)
615 steps = int(round(angle_dif / np.deg2rad(10))) + 1
617 position = np.full(steps, 0)
618 left_yaws = interpolate_yaws(start_angle, next_angle,
True, steps)
619 right_yaws = interpolate_yaws(start_angle, prev_angle,
False, steps)
621 left_turn_path =
Path(xs=position, ys=position, yaws=left_yaws)
622 right_turn_path =
Path(xs=position, ys=position, yaws=right_yaws)
624 left_turn =
Trajectory(parameters=left_turn_params, path=left_turn_path)
625 right_turn =
Trajectory(parameters=right_turn_params, path=right_turn_path)
627 spanning_set[start_angle].append(left_turn)
628 spanning_set[start_angle].append(right_turn)
632 def _add_horizontal_motions(self, spanning_set: dict) -> dict:
634 Add horizontal sliding motions to the spanning set.
636 The horizontal sliding motions are simply straight line trajectories
637 at 90 degrees to every start angle in the spanning set. The yaw of these
638 trajectories is the same as the start angle for which it is generated.
642 The minimal spanning set
647 The minimal spanning set containing additional sliding motions
655 for idx, angle
in enumerate(self.
headingsheadings):
659 left_angle_idx = int((idx + idx_offset) % self.
num_of_headingsnum_of_headings)
660 left_angle = self.
headingsheadings[left_angle_idx]
661 left_trajectories = spanning_set[left_angle]
662 left_straight_trajectory = next(
663 t
for t
in left_trajectories
if t.parameters.end_angle == left_angle
668 right_angle_idx = int((idx - idx_offset) % self.
num_of_headingsnum_of_headings)
669 right_angle = self.
headingsheadings[right_angle_idx]
670 right_trajectories = spanning_set[right_angle]
671 right_straight_trajectory = next(
672 t
for t
in right_trajectories
if t.parameters.end_angle == right_angle
676 len(left_straight_trajectory.path.xs), angle, dtype=np.float64
681 parmas_l = left_straight_trajectory.parameters
683 parmas_l.turning_radius,
690 parmas_l.arc_start_point,
691 parmas_l.arc_end_point,
696 params_r = right_straight_trajectory.parameters
698 params_r.turning_radius,
705 parmas_l.arc_start_point,
706 parmas_l.arc_end_point,
710 parameters=left_motion_parameters,
712 xs=left_straight_trajectory.path.xs,
713 ys=left_straight_trajectory.path.ys,
719 parameters=right_motion_parameters,
721 xs=right_straight_trajectory.path.xs,
722 ys=right_straight_trajectory.path.ys,
727 spanning_set[angle].append(left_motion)
728 spanning_set[angle].append(right_motion)
734 Run the lattice generator.
739 The minimal spanning set including additional motions for the
740 specified motion model
dict _handle_motion_model(self, dict spanning_set)
float _compute_min_trajectory_length(self)
dict _create_complete_minimal_spanning_set(self, dict single_quadrant_minimal_set)
dict _add_horizontal_motions(self, dict spanning_set)
dict _add_in_place_turns(self, dict spanning_set)
float _point_to_line_distance(self, np.array p1, np.array p2, np.array q)
np.array _get_wave_front_points(self, int pos)
bool _is_minimal_trajectory(self, Trajectory trajectory, index.Rtree prior_end_poses)
float _flip_angle(self, float angle, Flip flip_type)
dict _generate_minimal_spanning_set(self)
def __init__(self, dict config)
list _get_heading_discretization(self, int number_of_headings)