15 from collections.abc
import Generator
17 from typing
import Optional, TypeAlias, Union
22 YamlValue: TypeAlias = Union[str, int, float, bool]
27 def __init__(self, dictionary: dict[str, YamlValue], key: str):
34 def setValue(self, value: YamlValue) ->
None:
40 Substitution that modifies the given YAML file.
47 source_file: launch.SomeSubstitutionsType,
48 param_rewrites: dict[str, launch.SomeSubstitutionsType],
49 root_key: Optional[launch.SomeSubstitutionsType] =
None,
50 key_rewrites: Optional[dict[str, launch.SomeSubstitutionsType]] =
None,
51 value_rewrites: Optional[dict[str, launch.SomeSubstitutionsType]] =
None,
52 convert_types: bool =
False,
56 Construct the substitution
58 :param: source_file the original YAML file to modify
59 :param: param_rewrites mappings to replace
60 :param: root_key if provided, the contents are placed under this key
61 :param: key_rewrites keys of mappings to replace
62 :param: value_rewrites values to replace
63 :param: convert_types whether to attempt converting the string to a number or boolean
67 from launch.utilities
import normalize_to_list_of_substitutions
69 self.__source_file: list[launch.Substitution] = \
70 normalize_to_list_of_substitutions(source_file)
76 for key
in param_rewrites:
77 self.
__param_rewrites__param_rewrites[key] = normalize_to_list_of_substitutions(
80 if key_rewrites
is not None:
81 for key
in key_rewrites:
82 self.
__key_rewrites__key_rewrites[key] = normalize_to_list_of_substitutions(
85 if value_rewrites
is not None:
86 for value
in value_rewrites:
87 self.
__value_rewrites__value_rewrites[value] = normalize_to_list_of_substitutions(
90 if root_key
is not None:
91 self.
__root_key__root_key = normalize_to_list_of_substitutions(root_key)
94 def name(self) -> list[launch.Substitution]:
95 """Getter for name."""
96 return self.__source_file
99 """Return a description of this substitution as a string."""
102 def perform(self, context: launch.LaunchContext) -> str:
103 yaml_filename = launch.utilities.perform_substitutions(context, self.
namename)
104 rewritten_yaml = tempfile.NamedTemporaryFile(mode=
'w', delete=
False)
105 param_rewrites, keys_rewrites, value_rewrites = self.
resolve_rewritesresolve_rewrites(context)
107 with open(yaml_filename,
'r')
as yaml_file:
108 data = yaml.safe_load(yaml_file)
111 self.
add_paramsadd_params(data, param_rewrites)
115 root_key = launch.utilities.perform_substitutions(context, self.
__root_key__root_key)
117 data = {root_key: data}
118 yaml.dump(data, rewritten_yaml)
119 rewritten_yaml.close()
120 return rewritten_yaml.name
122 def resolve_rewrites(self, context: launch.LaunchContext) -> \
123 tuple[dict[str, str], dict[str, str], dict[str, str]]:
126 resolved_params[key] = launch.utilities.perform_substitutions(
131 resolved_keys[key] = launch.utilities.perform_substitutions(
136 resolved_values[value] = launch.utilities.perform_substitutions(
139 return resolved_params, resolved_keys, resolved_values
141 def substitute_params(self, yaml: dict[str, YamlValue],
142 param_rewrites: dict[str, str]) ->
None:
145 if key.key()
in param_rewrites:
146 raw_value = param_rewrites[key.key()]
147 key.setValue(self.
convertconvert(raw_value))
150 yaml_paths = self.
pathifypathify(yaml)
151 for path
in yaml_paths:
152 if path
in param_rewrites:
154 rewrite_val = self.
convertconvert(param_rewrites[path])
155 yaml_keys = path.split(
'.')
158 def add_params(self, yaml: dict[str, YamlValue],
159 param_rewrites: dict[str, str]) ->
None:
161 yaml_paths = self.
pathifypathify(yaml)
162 for path
in param_rewrites:
163 if not path
in yaml_paths:
164 new_val = self.
convertconvert(param_rewrites[path])
165 yaml_keys = path.split(
'.')
166 if 'ros__parameters' in yaml_keys:
169 def substitute_values(
170 self, yaml: dict[str, YamlValue],
171 value_rewrites: dict[str, str]) ->
None:
173 def process_value(value: YamlValue) -> YamlValue:
174 if isinstance(value, dict):
175 for k, v
in list(value.items()):
176 value[k] = process_value(v)
178 elif isinstance(value, list):
179 return [process_value(v)
for v
in value]
180 elif str(value)
in value_rewrites:
181 return self.
convertconvert(value_rewrites[str(value)])
184 for key
in list(yaml.keys()):
185 yaml[key] = process_value(yaml[key])
187 def updateYamlPathVals(
188 self, yaml: dict[str, YamlValue],
189 yaml_key_list: list[str], rewrite_val: YamlValue) -> dict[str, YamlValue]:
191 for key
in yaml_key_list:
192 if key == yaml_key_list[-1]:
193 yaml[key] = rewrite_val
195 key = yaml_key_list.pop(0)
196 if isinstance(yaml, list):
198 yaml[int(key)], yaml_key_list, rewrite_val
209 self, yaml: dict[str, YamlValue], key_rewrites: dict[str, str]) ->
None:
210 if len(key_rewrites) != 0:
211 for key
in list(yaml.keys()):
213 if key
in key_rewrites:
214 new_key = key_rewrites[key]
215 yaml[new_key] = yaml[key]
217 if isinstance(val, dict):
220 def getYamlLeafKeys(self, yamlData: dict[str, YamlValue]) -> \
221 Generator[DictItemReference,
None,
None]:
222 if not isinstance(yamlData, dict):
225 for key
in yamlData.keys():
226 child = yamlData[key]
228 if isinstance(child, dict):
235 self, d: Union[dict[str, YamlValue], list[YamlValue], YamlValue],
236 p: Optional[str] =
None,
237 paths: Optional[dict[str, YamlValue]] =
None,
238 joinchar: str =
'.') -> dict[str, YamlValue]:
241 self.
pathifypathify(d,
'', paths, joinchar=joinchar)
244 assert paths
is not None
248 if isinstance(d, dict):
251 self.
pathifypathify(v, str(pn) + str(k), paths, joinchar=joinchar)
252 elif isinstance(d, list):
253 for idx, e
in enumerate(d):
254 self.
pathifypathify(e, pn + str(idx), paths, joinchar=joinchar)
259 def convert(self, text_value: str) -> YamlValue:
263 return float(text_value)
if '.' in text_value
else int(text_value)
268 if text_value.lower() ==
'true':
270 if text_value.lower() ==
'false':
\ tuple[dict[str, str], dict[str, str], dict[str, str]] resolve_rewrites(self, launch.LaunchContext context)
\ Generator[DictItemReference, None, None] getYamlLeafKeys(self, dict[str, YamlValue] yamlData)
list[launch.Substitution] name(self)
None add_params(self, dict[str, YamlValue] yaml, dict[str, str] param_rewrites)
None substitute_params(self, dict[str, YamlValue] yaml, dict[str, str] param_rewrites)
YamlValue convert(self, str text_value)
None substitute_keys(self, dict[str, YamlValue] yaml, dict[str, str] key_rewrites)
dict[str, YamlValue] pathify(self, Union[dict[str, YamlValue], list[YamlValue], YamlValue] d, Optional[str] p=None, Optional[dict[str, YamlValue]] paths=None, str joinchar='.')
dict[str, YamlValue] updateYamlPathVals(self, dict[str, YamlValue] yaml, list[str] yaml_key_list, YamlValue rewrite_val)
None substitute_values(self, dict[str, YamlValue] yaml, dict[str, str] value_rewrites)