# Nav2 Navigation Stack - rolling / main
# ROS 2 Navigation Stack
# File: rewritten_yaml.py
# Copyright (c) 2019 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
14 
15 from collections.abc import Generator
16 import tempfile
17 from typing import Optional, TypeAlias, Union
18 
19 import launch
20 import yaml
21 
22 YamlValue: TypeAlias = Union[str, int, float, bool]
23 
24 
26 
27  def __init__(self, dictionary: dict[str, YamlValue], key: str):
28  self.dictionarydictionary = dictionary
29  self.dictKeydictKey = key
30 
31  def key(self) -> str:
32  return self.dictKeydictKey
33 
34  def setValue(self, value: YamlValue) -> None:
35  self.dictionarydictionary[self.dictKeydictKey] = value
36 
37 
class RewrittenYaml(launch.Substitution):
    """
    Substitution that modifies the given YAML file.

    Used in launch system
    """

    def __init__(
        self,
        source_file: launch.SomeSubstitutionsType,
        param_rewrites: dict[str, launch.SomeSubstitutionsType],
        root_key: Optional[launch.SomeSubstitutionsType] = None,
        key_rewrites: Optional[dict[str, launch.SomeSubstitutionsType]] = None,
        value_rewrites: Optional[dict[str, launch.SomeSubstitutionsType]] = None,
        convert_types: bool = False,
    ) -> None:
        """
        Construct the substitution.

        :param: source_file the original YAML file to modify
        :param: param_rewrites mappings to replace
        :param: root_key if provided, the contents are placed under this key
        :param: key_rewrites keys of mappings to replace
        :param: value_rewrites values to replace
        :param: convert_types whether to attempt converting the string to a number or boolean
        """
        super().__init__()

        # import here to avoid loop
        from launch.utilities import normalize_to_list_of_substitutions

        self.__source_file: list[launch.Substitution] = \
            normalize_to_list_of_substitutions(source_file)
        # Every rewrite target is stored as a list of substitutions and is only
        # resolved to text in resolve_rewrites().
        self.__param_rewrites = {
            key: normalize_to_list_of_substitutions(target)
            for key, target in param_rewrites.items()
        }
        self.__key_rewrites = {
            key: normalize_to_list_of_substitutions(target)
            for key, target in (key_rewrites or {}).items()
        }
        self.__value_rewrites = {
            value: normalize_to_list_of_substitutions(target)
            for value, target in (value_rewrites or {}).items()
        }
        self.__convert_types = convert_types
        self.__root_key = None
        if root_key is not None:
            self.__root_key = normalize_to_list_of_substitutions(root_key)

    @property
    def name(self) -> list[launch.Substitution]:
        """Return the source file as a list of substitutions."""
        return self.__source_file

    def describe(self) -> str:
        """Return a description of this substitution as a string."""
        return ''

    def perform(self, context: launch.LaunchContext) -> str:
        """
        Apply all rewrites to the source YAML and write the result to a temporary file.

        :param: context the launch context used to resolve the substitutions
        :return: the path of the temporary file holding the rewritten YAML
        """
        yaml_filename = launch.utilities.perform_substitutions(context, self.name)
        param_rewrites, keys_rewrites, value_rewrites = self.resolve_rewrites(context)

        with open(yaml_filename, 'r') as yaml_file:
            data = yaml.safe_load(yaml_file)
        if data is None:
            # An empty document loads as None; continue with an empty mapping so
            # the rewrite steps below have a dictionary to work on.
            data = {}

        self.substitute_params(data, param_rewrites)
        self.add_params(data, param_rewrites)
        self.substitute_keys(data, keys_rewrites)
        self.substitute_values(data, value_rewrites)
        if self.__root_key is not None:
            root_key = launch.utilities.perform_substitutions(context, self.__root_key)
            if root_key:
                data = {root_key: data}

        # The output file is created only after loading and rewriting succeeded,
        # so a failure above leaves no stray temporary file behind.
        # delete=False: only the path is returned, so the file must outlive the handle.
        with tempfile.NamedTemporaryFile(mode='w', delete=False) as rewritten_yaml:
            yaml.dump(data, rewritten_yaml)
        return rewritten_yaml.name

    def resolve_rewrites(self, context: launch.LaunchContext) -> \
            tuple[dict[str, str], dict[str, str], dict[str, str]]:
        """
        Resolve the stored rewrite substitutions to plain strings.

        :param: context the launch context used to perform the substitutions
        :return: the resolved (param rewrites, key rewrites, value rewrites)
        """
        resolved_params = {
            key: launch.utilities.perform_substitutions(context, target)
            for key, target in self.__param_rewrites.items()
        }
        resolved_keys = {
            key: launch.utilities.perform_substitutions(context, target)
            for key, target in self.__key_rewrites.items()
        }
        resolved_values = {
            value: launch.utilities.perform_substitutions(context, target)
            for value, target in self.__value_rewrites.items()
        }
        return resolved_params, resolved_keys, resolved_values

    def substitute_params(self, yaml: dict[str, YamlValue],
                          param_rewrites: dict[str, str]) -> None:
        """
        Replace the values of existing parameters in place.

        A rewrite applies when its name equals a bare key anywhere in the nested
        dictionaries, or when it equals the full dotted path of an existing value.

        :param: yaml the loaded YAML content to modify
        :param: param_rewrites resolved mapping of key or dotted path to new value
        """
        # substitute leaf-only parameters
        for item in self.getYamlLeafKeys(yaml):
            if item.key() in param_rewrites:
                item.setValue(self.convert(param_rewrites[item.key()]))

        # substitute total path parameters
        for path in self.pathify(yaml):
            if path in param_rewrites:
                # this is an absolute path (ex. 'key.keyA.keyB.val')
                rewrite_val = self.convert(param_rewrites[path])
                self.updateYamlPathVals(yaml, path.split('.'), rewrite_val)

    def add_params(self, yaml: dict[str, YamlValue],
                   param_rewrites: dict[str, str]) -> None:
        """
        Add parameters whose full dotted path does not exist yet.

        Only paths that contain a 'ros__parameters' component are added.

        :param: yaml the loaded YAML content to modify
        :param: param_rewrites resolved mapping of key or dotted path to new value
        """
        # add new total path parameters
        yaml_paths = self.pathify(yaml)
        for path, raw_value in param_rewrites.items():
            if path in yaml_paths:
                continue
            yaml_keys = path.split('.')
            if 'ros__parameters' in yaml_keys:
                self.updateYamlPathVals(yaml, yaml_keys, self.convert(raw_value))

    def substitute_values(
            self, yaml: dict[str, YamlValue],
            value_rewrites: dict[str, str]) -> None:
        """
        Replace scalar values whose string form is a key of value_rewrites.

        Nested dictionaries are updated in place; lists are rebuilt with the
        processed elements.

        :param: yaml the loaded YAML content to modify
        :param: value_rewrites resolved mapping of old value (as text) to new value
        """

        def process_value(value: YamlValue) -> YamlValue:
            if isinstance(value, dict):
                # copy the items so that assigning while looping is safe
                for k, v in list(value.items()):
                    value[k] = process_value(v)
                return value
            if isinstance(value, list):
                return [process_value(v) for v in value]
            text = str(value)
            if text in value_rewrites:
                return self.convert(value_rewrites[text])
            return value

        for key, current in list(yaml.items()):
            yaml[key] = process_value(current)

    def updateYamlPathVals(
            self, yaml: dict[str, YamlValue],
            yaml_key_list: list[str], rewrite_val: YamlValue) -> dict[str, YamlValue]:
        """
        Set the value found at the given key path, creating missing dictionaries.

        Keys addressing a list are converted to integer indices. The key list
        itself is left unmodified.

        :param: yaml the container to modify in place
        :param: yaml_key_list the path to the value, one entry per nesting level
        :param: rewrite_val the value to store at the end of the path
        :return: the same container that was passed in
        """
        if not yaml_key_list:
            return yaml

        node = yaml
        # Walk by position rather than by comparing key names, so a path whose
        # last key also occurs earlier (ex. 'a.b.a') still reaches the end.
        for key in yaml_key_list[:-1]:
            if isinstance(node, list):
                node = node[int(key)]
            else:
                child = node.get(key)
                if child is None:
                    # missing (or empty) intermediate level: create it
                    child = node[key] = {}
                node = child

        last_key = yaml_key_list[-1]
        if isinstance(node, list):
            node[int(last_key)] = rewrite_val
        else:
            node[last_key] = rewrite_val
        return yaml

    def substitute_keys(
            self, yaml: dict[str, YamlValue], key_rewrites: dict[str, str]) -> None:
        """
        Rename keys in place, recursing into nested dictionaries.

        :param: yaml the dictionary whose keys are renamed
        :param: key_rewrites resolved mapping of old key to new key
        """
        if not key_rewrites:
            return
        # iterate over a snapshot because keys are removed and added in the loop
        for key in list(yaml.keys()):
            val = yaml[key]
            if key in key_rewrites:
                # pop before inserting so that a rewrite to the same name keeps
                # the entry instead of deleting it
                yaml[key_rewrites[key]] = yaml.pop(key)
            if isinstance(val, dict):
                self.substitute_keys(val, key_rewrites)

    def getYamlLeafKeys(self, yamlData: dict[str, YamlValue]) -> \
            Generator[DictItemReference, None, None]:
        """
        Yield a reference for every key of the nested dictionaries.

        The entries of a nested dictionary are yielded before the key that holds
        that dictionary. Lists are not descended into; anything that is not a
        dictionary yields nothing.

        :param: yamlData the dictionary to walk
        """
        if not isinstance(yamlData, dict):
            return

        for key in yamlData.keys():
            child = yamlData[key]

            if isinstance(child, dict):
                # Recursively process nested dictionaries
                yield from self.getYamlLeafKeys(child)

            yield DictItemReference(yamlData, key)

    def pathify(
            self, d: Union[dict[str, YamlValue], list[YamlValue], YamlValue],
            p: Optional[str] = None,
            paths: Optional[dict[str, YamlValue]] = None,
            joinchar: str = '.') -> dict[str, YamlValue]:
        """
        Flatten nested dictionaries and lists into a mapping of path to scalar value.

        Dictionary keys and list indices are joined with joinchar
        (ex. {'a': {'b': [5]}} gives {'a.b.0': 5}).

        :param: d the structure to flatten
        :param: p the path of d itself; None starts a new traversal with a fresh result
        :param: paths the result mapping filled during recursion
        :param: joinchar the separator placed between path components
        :return: the mapping of full path to value
        """
        if p is None:
            paths = {}
            self.pathify(d, '', paths, joinchar=joinchar)
            return paths

        assert paths is not None
        prefix = p + joinchar if p != '' else p
        if isinstance(d, dict):
            for k, v in d.items():
                self.pathify(v, prefix + str(k), paths, joinchar=joinchar)
        elif isinstance(d, list):
            for idx, e in enumerate(d):
                self.pathify(e, prefix + str(idx), paths, joinchar=joinchar)
        else:
            paths[p] = d
        return paths

    def convert(self, text_value: str) -> YamlValue:
        """
        Convert the text to a number or boolean when type conversion is enabled.

        :param: text_value the resolved text of a rewrite
        :return: the converted value, or the unchanged text when conversion is
            disabled or nothing matched
        """
        if self.__convert_types:
            # try converting to int or float
            try:
                return float(text_value) if '.' in text_value else int(text_value)
            except ValueError:
                pass

            # try converting to bool
            lowered = text_value.lower()
            if lowered == 'true':
                return True
            if lowered == 'false':
                return False

        # nothing else worked so fall through and return text
        return text_value
# Method signature index for RewrittenYaml (residue of the generated documentation page):
#   resolve_rewrites(self, context: launch.LaunchContext)
#       -> tuple[dict[str, str], dict[str, str], dict[str, str]]
#   getYamlLeafKeys(self, yamlData: dict[str, YamlValue])
#       -> Generator[DictItemReference, None, None]
#   name(self) -> list[launch.Substitution]
#   add_params(self, yaml: dict[str, YamlValue], param_rewrites: dict[str, str]) -> None
#   substitute_params(self, yaml: dict[str, YamlValue], param_rewrites: dict[str, str]) -> None
#   convert(self, text_value: str) -> YamlValue
#   substitute_keys(self, yaml: dict[str, YamlValue], key_rewrites: dict[str, str]) -> None
#   pathify(self, d: Union[dict[str, YamlValue], list[YamlValue], YamlValue],
#           p: Optional[str] = None, paths: Optional[dict[str, YamlValue]] = None,
#           joinchar: str = '.') -> dict[str, YamlValue]
#   updateYamlPathVals(self, yaml: dict[str, YamlValue], yaml_key_list: list[str],
#                      rewrite_val: YamlValue) -> dict[str, YamlValue]
#   substitute_values(self, yaml: dict[str, YamlValue], value_rewrites: dict[str, str]) -> None