15 from collections
import defaultdict
18 from helper
import angle_difference, interpolate_yaws
22 from rtree
import index
24 from trajectory
import Path, Trajectory, TrajectoryParameters
26 from trajectory_generator
import TrajectoryGenerator
31 Handles all the logic for computing the minimal control set.
33 Computes the minimal control set for a vehicle given its parameters.
34 Includes handling the propagating and searching along wavefronts as
35 well as determining if a trajectory is part of the minimal set based
36 on previously added trajectories.
40 """An Enum used for determining the motion model to use."""
47 """An Enum used for determining how a trajectory should be flipped."""
54 """Init the lattice generator from the user supplied config."""
67 def _get_wave_front_points(self, pos: int) -> np.array:
69 Calculate the end points that lie on the wave front.
71 Uses the user supplied grid resolution to calculate the
72 valid end points that lie on a wave front at a discrete
73 interval away from the origin.
77 The number of discrete intervals of grid resolution
78 away from the origin to generate the wave points at
83 An array of coordinates
94 positions.append((max_point_coord, varying_point_coord))
97 positions.append((varying_point_coord, max_point_coord))
100 positions.append((max_point_coord, max_point_coord))
102 return np.array(positions)
104 def _get_heading_discretization(self, number_of_headings: int) -> list:
106 Calculate the heading discretization based on the number of headings.
108 Does not uniformly generate headings but instead generates a set of
109 discrete headings that is better suited for straight line trajectories.
112 number_of_headings: int
113 The number of headings to discretize a 360 degree turn into
118 A list of headings in radians
121 max_val = int(number_of_headings / 8)
128 for i
in range(-max_val, max_val + 1):
129 outer_edge_x.extend([i, i])
130 outer_edge_y.extend([-max_val, max_val])
132 if i != max_val
and i != -max_val:
133 outer_edge_x.extend([-max_val, max_val])
134 outer_edge_y.extend([i, i])
136 return sorted([np.arctan2(j, i)
for i, j
in zip(outer_edge_x, outer_edge_y)])
138 def _point_to_line_distance(self, p1: np.array, p2: np.array, q: np.array) -> float:
140 Return the shortest distance from a point to a line segment.
144 Start point of line segment
146 End point of line segment
148 Point to get distance away from line of
153 The shortest distance between q and line segment p1p2
157 l2 = np.inner(p1 - p2, p1 - p2)
160 return np.linalg.norm(p1 - q)
163 t = max(0, min(1, np.dot(q - p1, p2 - p1) / l2))
164 projected_point = p1 + t * (p2 - p1)
166 return np.linalg.norm(q - projected_point)
168 def _is_minimal_trajectory(
169 self, trajectory: Trajectory, prior_end_poses: index.Rtree
172 Determine whether a trajectory is a minimal trajectory.
174 Uses an RTree for speedup.
177 trajectory: Trajectory
178 The trajectory to check
179 prior_end_poses: RTree
180 An RTree holding the current minimal set of trajectories
185 True if the trajectory is a minimal trajectory otherwise false
189 for x1, y1, x2, y2, yaw
in zip(
190 trajectory.path.xs[:-1],
191 trajectory.path.ys[:-1],
192 trajectory.path.xs[1:],
193 trajectory.path.ys[1:],
194 trajectory.path.yaws[:-1],
197 p1 = np.array([x1, y1])
198 p2 = np.array([x2, y2])
211 for prior_end_pose
in prior_end_poses.intersection(
212 (left_bb, bottom_bb, right_bb, top_bb), objects=
'raw'
217 and angle_difference(yaw, prior_end_pose[-1])
224 def _compute_min_trajectory_length(self) -> float:
226 Compute the minimum trajectory length for the given parameters.
228 The minimum trajectory length is defined as the length needed
229 for the sharpest possible turn to move from 0 degrees to the next
230 discrete heading. Since the distance between headings is not uniform
231 we take the smallest possible difference.
236 The minimal length of a trajectory
243 for i
in range(len(self.
headingsheadings) - 1)
248 def _generate_minimal_spanning_set(self) -> dict:
250 Generate the minimal spanning set.
252 Iterates over all possible trajectories and keeps only those that
253 are part of the minimal set.
258 A dictionary where the key is the start_angle and the value is
259 a list of trajectories that begin at that angle
262 quadrant1_end_poses = defaultdict(list)
266 initial_headings = sorted(
267 filter(
lambda x: 0 <= x
and x <= np.pi / 2, self.
headingsheadings)
272 wave_front_start_pos = int(
276 for start_heading
in initial_headings:
277 iterations_without_trajectory = 0
279 prior_end_poses = index.Index()
281 wave_front_cur_pos = wave_front_start_pos
285 target_headings = sorted(
286 self.
headingsheadings, key=
lambda x: (abs(x - start_heading), -x)
288 target_headings = list(
289 filter(
lambda x: abs(start_heading - x) <= np.pi / 2, target_headings)
293 iterations_without_trajectory += 1
298 for target_point
in positions:
299 for target_heading
in target_headings:
310 if trajectory
is not None:
316 new_end_pose = np.array(
317 [target_point[0], target_point[1], target_heading]
320 quadrant1_end_poses[start_heading].append(
321 (target_point, target_heading)
331 prior_end_poses.insert(
333 (left_bb, bottom_bb, right_bb, top_bb),
337 iterations_without_trajectory = 0
339 wave_front_cur_pos += 1
345 def _flip_angle(self, angle: float, flip_type: Flip) -> float:
347 Return the appropriate flip of the angle in self.headings.
353 Whether to flip across X axis, Y axis, or both
358 The angle in self.headings that is the appropriate flip
361 angle_idx = self.
headingsheadings.index(angle)
363 if flip_type == self.
FlipFlip.X:
364 heading_idx = (self.
num_of_headingsnum_of_headings / 2 - 1) - angle_idx - 1
365 elif flip_type == self.
FlipFlip.Y:
367 elif flip_type == self.
FlipFlip.BOTH:
372 raise Exception(f
'Unsupported flip type: {flip_type}')
374 return self.
headingsheadings[int(heading_idx)]
376 def _create_complete_minimal_spanning_set(
377 self, single_quadrant_minimal_set: dict
380 Create the full minimal spanning set from a single quadrant set.
382 Exploits the symmetry between the quadrants to create the full set.
383 This is done by flipping every trajectory in the first quadrant across
384 either the X-axis, Y-axis, or both axes.
387 single_quadrant_minimal_set: dict
388 The minimal set for quadrant 1 (positive x and positive y)
393 The complete minimal spanning set containing the trajectories
397 all_trajectories = defaultdict(list)
399 for start_angle
in single_quadrant_minimal_set.keys():
401 for end_point, end_angle
in single_quadrant_minimal_set[start_angle]:
407 if start_angle == 0
and end_angle == 0:
408 unflipped_start_angle = 0.0
409 flipped_x_start_angle = np.pi
411 unflipped_end_angle = 0.0
412 flipped_x_end_angle = np.pi
415 unflipped_trajectory = (
418 unflipped_start_angle,
423 flipped_x_trajectory = (
426 flipped_x_start_angle,
433 unflipped_trajectory.parameters.start_angle
434 ].append(unflipped_trajectory)
437 flipped_x_trajectory.parameters.start_angle
438 ].append(flipped_x_trajectory)
440 elif abs(start_angle) == np.pi / 2
and abs(end_angle) == np.pi / 2:
441 unflipped_start_angle = np.pi / 2
442 flipped_y_start_angle = -np.pi / 2
444 unflipped_end_angle = np.pi / 2
445 flipped_y_end_angle = -np.pi / 2
448 unflipped_trajectory = (
451 unflipped_start_angle,
457 flipped_y_trajectory = (
460 flipped_y_start_angle,
467 unflipped_trajectory.parameters.start_angle
468 ].append(unflipped_trajectory)
470 flipped_y_trajectory.parameters.start_angle
471 ].append(flipped_y_trajectory)
474 unflipped_start_angle = start_angle
475 flipped_x_start_angle = self.
_flip_angle_flip_angle(start_angle, self.
FlipFlip.X)
476 flipped_y_start_angle = self.
_flip_angle_flip_angle(start_angle, self.
FlipFlip.Y)
477 flipped_xy_start_angle = self.
_flip_angle_flip_angle(
478 start_angle, self.
FlipFlip.BOTH
481 unflipped_end_angle = end_angle
482 flipped_x_end_angle = self.
_flip_angle_flip_angle(end_angle, self.
FlipFlip.X)
483 flipped_y_end_angle = self.
_flip_angle_flip_angle(end_angle, self.
FlipFlip.Y)
484 flipped_xy_end_angle = self.
_flip_angle_flip_angle(end_angle, self.
FlipFlip.BOTH)
487 unflipped_trajectory = (
490 unflipped_start_angle,
495 flipped_x_trajectory = (
498 flipped_x_start_angle,
503 flipped_y_trajectory = (
506 flipped_y_start_angle,
511 flipped_xy_trajectory = (
514 flipped_xy_start_angle,
515 flipped_xy_end_angle,
521 unflipped_trajectory.parameters.start_angle
522 ].append(unflipped_trajectory)
524 flipped_x_trajectory.parameters.start_angle
525 ].append(flipped_x_trajectory)
527 flipped_y_trajectory.parameters.start_angle
528 ].append(flipped_y_trajectory)
530 flipped_xy_trajectory.parameters.start_angle
531 ].append(flipped_xy_trajectory)
533 return all_trajectories
535 def _handle_motion_model(self, spanning_set: dict) -> dict:
537 Add the appropriate motions for the user supplied motion model.
539 Ackermann: No additional trajectories
541 Diff: In place turns to the right and left
543 Omni: Diff + Sliding movements to right and left
547 The minimal spanning set
552 The minimal spanning set with additional trajectories based
561 return diff_spanning_set
566 return omni_spanning_set
569 print(
'No handling implemented for Motion Model: ' + f
'{self.motion_model}')
570 raise NotImplementedError
572 def _add_in_place_turns(self, spanning_set: dict) -> dict:
574 Add in place turns to the spanning set.
576 In place turns are trajectories with only a rotational component and
577 only shift a single angular heading step
581 The minimal spanning set
586 The minimal spanning set containing additional in place turns
590 all_angles = sorted(spanning_set.keys())
592 for idx, start_angle
in enumerate(all_angles):
593 prev_angle_idx = idx - 1
if idx - 1 >= 0
else len(all_angles) - 1
594 next_angle_idx = idx + 1
if idx + 1 < len(all_angles)
else 0
596 prev_angle = all_angles[prev_angle_idx]
597 next_angle = all_angles[next_angle_idx]
599 left_turn_params = TrajectoryParameters.no_arc(
600 end_point=np.array([0, 0]),
601 start_angle=start_angle,
602 end_angle=next_angle,
604 right_turn_params = TrajectoryParameters.no_arc(
605 end_point=np.array([0, 0]),
606 start_angle=start_angle,
607 end_angle=prev_angle,
612 angle_dif = angle_difference(start_angle, next_angle)
613 steps = int(round(angle_dif / np.deg2rad(10))) + 1
615 position = np.full(steps, 0)
616 left_yaws = interpolate_yaws(start_angle, next_angle,
True, steps)
617 right_yaws = interpolate_yaws(start_angle, prev_angle,
False, steps)
619 left_turn_path =
Path(xs=position, ys=position, yaws=left_yaws)
620 right_turn_path =
Path(xs=position, ys=position, yaws=right_yaws)
622 left_turn =
Trajectory(parameters=left_turn_params, path=left_turn_path)
623 right_turn =
Trajectory(parameters=right_turn_params, path=right_turn_path)
625 spanning_set[start_angle].append(left_turn)
626 spanning_set[start_angle].append(right_turn)
630 def _add_horizontal_motions(self, spanning_set: dict) -> dict:
632 Add horizontal sliding motions to the spanning set.
634 The horizontal sliding motions are simply straight line trajectories
635 at 90 degrees to every start angle in the spanning set. The yaw of these
636 trajectories is the same as the start angle for which it is generated.
640 The minimal spanning set
645 The minimal spanning set containing additional sliding motions
653 for idx, angle
in enumerate(self.
headingsheadings):
657 left_angle_idx = int((idx + idx_offset) % self.
num_of_headingsnum_of_headings)
658 left_angle = self.
headingsheadings[left_angle_idx]
659 left_trajectories = spanning_set[left_angle]
660 left_straight_trajectory = next(
661 t
for t
in left_trajectories
if t.parameters.end_angle == left_angle
666 right_angle_idx = int((idx - idx_offset) % self.
num_of_headingsnum_of_headings)
667 right_angle = self.
headingsheadings[right_angle_idx]
668 right_trajectories = spanning_set[right_angle]
669 right_straight_trajectory = next(
670 t
for t
in right_trajectories
if t.parameters.end_angle == right_angle
674 len(left_straight_trajectory.path.xs), angle, dtype=np.float64
679 parmas_l = left_straight_trajectory.parameters
681 parmas_l.turning_radius,
688 parmas_l.arc_start_point,
689 parmas_l.arc_end_point,
694 params_r = right_straight_trajectory.parameters
696 params_r.turning_radius,
703 parmas_l.arc_start_point,
704 parmas_l.arc_end_point,
708 parameters=left_motion_parameters,
710 xs=left_straight_trajectory.path.xs,
711 ys=left_straight_trajectory.path.ys,
717 parameters=right_motion_parameters,
719 xs=right_straight_trajectory.path.xs,
720 ys=right_straight_trajectory.path.ys,
725 spanning_set[angle].append(left_motion)
726 spanning_set[angle].append(right_motion)
732 Run the lattice generator.
737 The minimal spanning set including additional motions for the
738 specified motion model
dict _handle_motion_model(self, dict spanning_set)
float _compute_min_trajectory_length(self)
dict _create_complete_minimal_spanning_set(self, dict single_quadrant_minimal_set)
dict _add_horizontal_motions(self, dict spanning_set)
dict _add_in_place_turns(self, dict spanning_set)
float _point_to_line_distance(self, np.array p1, np.array p2, np.array q)
np.array _get_wave_front_points(self, int pos)
bool _is_minimal_trajectory(self, Trajectory trajectory, index.Rtree prior_end_poses)
float _flip_angle(self, float angle, Flip flip_type)
dict _generate_minimal_spanning_set(self)
def __init__(self, dict config)
list _get_heading_discretization(self, int number_of_headings)