15 from typing
import Dict
16 from typing
import List
17 from typing
import Text
18 from typing
import Optional
26 def __init__(self, dictionary, key):
33 def setValue(self, value):
39 Substitution that modifies the given YAML file.
45 source_file: launch.SomeSubstitutionsType,
47 root_key: Optional[launch.SomeSubstitutionsType] =
None,
48 key_rewrites: Optional[Dict] =
None,
49 convert_types =
False) ->
None:
52 Construct the substitution
54 :param: source_file the original YAML file to modify
55 :param: param_rewrites mappings to replace
56 :param: root_key if provided, the contents are placed under this key
57 :param: key_rewrites keys of mappings to replace
58 :param: convert_types whether to attempt converting the string to a number or boolean
61 from launch.utilities
import normalize_to_list_of_substitutions
62 self.
__source_file__source_file = normalize_to_list_of_substitutions(source_file)
67 for key
in param_rewrites:
68 self.
__param_rewrites__param_rewrites[key] = normalize_to_list_of_substitutions(param_rewrites[key])
69 if key_rewrites
is not None:
70 for key
in key_rewrites:
71 self.
__key_rewrites__key_rewrites[key] = normalize_to_list_of_substitutions(key_rewrites[key])
72 if root_key
is not None:
73 self.
__root_key__root_key = normalize_to_list_of_substitutions(root_key)
76 def name(self) -> List[launch.Substitution]:
77 """Getter for name."""
81 """Return a description of this substitution as a string."""
84 def perform(self, context: launch.LaunchContext) -> Text:
85 yaml_filename = launch.utilities.perform_substitutions(context, self.
namename)
86 rewritten_yaml = tempfile.NamedTemporaryFile(mode=
'w', delete=
False)
87 param_rewrites, keys_rewrites = self.
resolve_rewritesresolve_rewrites(context)
88 data = yaml.safe_load(open(yaml_filename,
'r'))
92 root_key = launch.utilities.perform_substitutions(context, self.
__root_key__root_key)
94 data = {root_key: data}
95 yaml.dump(data, rewritten_yaml)
96 rewritten_yaml.close()
97 return rewritten_yaml.name
99 def resolve_rewrites(self, context):
102 resolved_params[key] = launch.utilities.perform_substitutions(context, self.
__param_rewrites__param_rewrites[key])
105 resolved_keys[key] = launch.utilities.perform_substitutions(context, self.
__key_rewrites__key_rewrites[key])
106 return resolved_params, resolved_keys
108 def substitute_params(self, yaml, param_rewrites):
111 if key.key()
in param_rewrites:
112 raw_value = param_rewrites[key.key()]
113 key.setValue(self.
convertconvert(raw_value))
116 yaml_paths = self.
pathifypathify(yaml)
117 for path
in yaml_paths:
118 if path
in param_rewrites:
120 rewrite_val = self.
convertconvert(param_rewrites[path])
121 yaml_keys = path.split(
'.')
125 def updateYamlPathVals(self, yaml, yaml_key_list, rewrite_val):
126 for key
in yaml_key_list:
127 if key == yaml_key_list[-1]:
128 yaml[key] = rewrite_val
130 key = yaml_key_list.pop(0)
131 if isinstance(yaml, list):
132 yaml[int(key)] = self.
updateYamlPathValsupdateYamlPathVals(yaml[int(key)], yaml_key_list, rewrite_val)
134 yaml[key] = self.
updateYamlPathValsupdateYamlPathVals(yaml.get(key, {}), yaml_key_list, rewrite_val)
137 def substitute_keys(self, yaml, key_rewrites):
138 if len(key_rewrites) != 0:
139 for key
in list(yaml.keys()):
141 if key
in key_rewrites:
142 new_key = key_rewrites[key]
143 yaml[new_key] = yaml[key]
145 if isinstance(val, dict):
148 def getYamlLeafKeys(self, yamlData):
150 for key
in yamlData.keys():
154 except AttributeError:
157 def pathify(self, d, p=None, paths=None, joinchar='.'):
160 self.
pathifypathify(d,
"", paths, joinchar=joinchar)
165 if isinstance(d, dict):
168 self.
pathifypathify(v, str(pn) + str(k), paths, joinchar=joinchar)
169 elif isinstance(d, list):
170 for idx, e
in enumerate(d):
171 self.
pathifypathify(e, pn + str(idx), paths, joinchar=joinchar)
175 def convert(self, text_value):
179 return float(text_value)
if '.' in text_value
else int(text_value)
184 if text_value.lower() ==
"true":
186 if text_value.lower() ==
"false":
def substitute_params(self, yaml, param_rewrites)
def substitute_keys(self, yaml, key_rewrites)
def updateYamlPathVals(self, yaml, yaml_key_list, rewrite_val)
def pathify(self, d, p=None, paths=None, joinchar='.')
def convert(self, text_value)
List[launch.Substitution] name(self)
def getYamlLeafKeys(self, yamlData)
def resolve_rewrites(self, context)