Nav2 Navigation Stack - humble
ROS 2 Navigation Stack
rewritten_yaml.py
1 # Copyright (c) 2019 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 
15 from typing import Dict
16 from typing import List
17 from typing import Text
18 from typing import Optional
19 
20 import yaml
21 import tempfile
22 import launch
23 
24 
class DictItemReference:
    """Handle to a single entry of a dictionary, identified by its key."""

    def __init__(self, dictionary, key):
        """
        Remember which dictionary entry this reference points at.

        :param: dictionary the dictionary that owns the entry (kept by reference, not copied)
        :param: key the key of the entry inside ``dictionary``
        """
        self.dictionary = dictionary
        self.dictKey = key

    def key(self):
        """Return the key of the referenced entry."""
        return self.dictKey

    def setValue(self, value):
        """Store ``value`` under the referenced key, modifying the dictionary in place."""
        self.dictionary[self.dictKey] = value
35 
36 
class RewrittenYaml(launch.Substitution):
    """
    Substitution that modifies the given YAML file.

    Used in launch system. Performing the substitution loads the source YAML file, applies
    the configured value and key rewrites, writes the result to a new temporary file and
    returns the path of that file.
    """

    def __init__(self,
                 source_file: launch.SomeSubstitutionsType,
                 param_rewrites: Dict,
                 root_key: Optional[launch.SomeSubstitutionsType] = None,
                 key_rewrites: Optional[Dict] = None,
                 convert_types=False) -> None:
        """
        Construct the substitution.

        :param: source_file the original YAML file to modify
        :param: param_rewrites mappings to replace
        :param: root_key if provided, the contents are placed under this key
        :param: key_rewrites keys of mappings to replace
        :param: convert_types whether to attempt converting the string to a number or boolean
        """
        super().__init__()

        # import here to avoid loop
        from launch.utilities import normalize_to_list_of_substitutions

        self.__source_file = normalize_to_list_of_substitutions(source_file)
        self.__convert_types = convert_types
        self.__param_rewrites = {
            key: normalize_to_list_of_substitutions(value)
            for key, value in param_rewrites.items()
        }
        self.__key_rewrites = {}
        if key_rewrites is not None:
            self.__key_rewrites = {
                key: normalize_to_list_of_substitutions(value)
                for key, value in key_rewrites.items()
            }
        self.__root_key = None
        if root_key is not None:
            self.__root_key = normalize_to_list_of_substitutions(root_key)

    @property
    def name(self) -> List[launch.Substitution]:
        """Getter for name: the normalized substitutions of the source file."""
        return self.__source_file

    def describe(self) -> Text:
        """Return a description of this substitution as a string (currently always empty)."""
        return ''

    def perform(self, context: launch.LaunchContext) -> Text:
        """
        Rewrite the source YAML file and return the path of the rewritten copy.

        :param: context the launch context used to resolve all substitutions
        :return: file name of the newly written temporary YAML file
        """
        yaml_filename = launch.utilities.perform_substitutions(context, self.name)
        param_rewrites, keys_rewrites = self.resolve_rewrites(context)

        # Load before creating the output file so a failing load leaves no stray temp file,
        # and use a context manager so the source handle is always closed.
        with open(yaml_filename, 'r') as source:
            data = yaml.safe_load(source)

        self.substitute_params(data, param_rewrites)
        self.substitute_keys(data, keys_rewrites)
        if self.__root_key is not None:
            root_key = launch.utilities.perform_substitutions(context, self.__root_key)
            if root_key:
                data = {root_key: data}

        # delete=False: only the path is returned, so the file must outlive this call.
        with tempfile.NamedTemporaryFile(mode='w', delete=False) as rewritten_yaml:
            yaml.dump(data, rewritten_yaml)
        return rewritten_yaml.name

    def resolve_rewrites(self, context):
        """
        Resolve every configured rewrite substitution against ``context``.

        :return: tuple ``(resolved_params, resolved_keys)`` of dictionaries that map the
                 configured keys to the results of ``perform_substitutions``
        """
        resolved_params = {
            key: launch.utilities.perform_substitutions(context, substitutions)
            for key, substitutions in self.__param_rewrites.items()
        }
        resolved_keys = {
            key: launch.utilities.perform_substitutions(context, substitutions)
            for key, substitutions in self.__key_rewrites.items()
        }
        return resolved_params, resolved_keys

    def substitute_params(self, yaml, param_rewrites):
        """
        Replace values in ``yaml`` in place according to ``param_rewrites``.

        A rewrite entry is applied both to every dictionary key with that bare name, at any
        depth, and to the single value whose full dotted path equals the entry.
        """
        # substitute by bare key name, wherever it occurs
        for key in self.getYamlLeafKeys(yaml):
            if key.key() in param_rewrites:
                key.setValue(self.convert(param_rewrites[key.key()]))

        # substitute total path parameters
        for path in self.pathify(yaml):
            if path in param_rewrites:
                # this is an absolute path (ex. 'key.keyA.keyB.val')
                rewrite_val = self.convert(param_rewrites[path])
                self.updateYamlPathVals(yaml, path.split('.'), rewrite_val)

    def updateYamlPathVals(self, yaml, yaml_key_list, rewrite_val):
        """
        Set the value addressed by ``yaml_key_list`` inside ``yaml`` to ``rewrite_val``.

        :param: yaml the nested dict/list structure, modified in place
        :param: yaml_key_list path segments from the root to the value; it is not modified
        :param: rewrite_val the value to store
        :return: the same ``yaml`` object
        """
        if not yaml_key_list:
            return yaml

        # Walk by position, not by comparing key names with the last segment, so paths
        # that repeat a name (ex. 'a.b.a') reach the intended depth.
        *parent_keys, leaf_key = yaml_key_list
        node = yaml
        for key in parent_keys:
            if isinstance(node, list):
                node = node[int(key)]
            else:
                # create intermediate mappings that do not exist yet
                node = node.setdefault(key, {})

        if isinstance(node, list):
            # path segments are strings; list positions need an integer index
            node[int(leaf_key)] = rewrite_val
        else:
            node[leaf_key] = rewrite_val
        return yaml

    def substitute_keys(self, yaml, key_rewrites):
        """
        Rename dictionary keys in ``yaml`` in place according to ``key_rewrites``.

        Nested dictionaries are processed recursively; lists are not descended into.
        """
        if not key_rewrites or not isinstance(yaml, dict):
            return
        # iterate over a snapshot because entries are removed and added while renaming
        for key in list(yaml.keys()):
            val = yaml[key]
            if key in key_rewrites:
                # pop first so that rewriting a key to itself keeps the entry
                yaml[key_rewrites[key]] = yaml.pop(key)
            if isinstance(val, dict):
                self.substitute_keys(val, key_rewrites)

    def getYamlLeafKeys(self, yamlData):
        """
        Yield a DictItemReference for every key of every dictionary nested in ``yamlData``.

        The keys of a nested dictionary are yielded before the key that holds it. Values
        that are not dictionaries (including lists) yield nothing.
        """
        if not isinstance(yamlData, dict):
            return
        for key in yamlData:
            yield from self.getYamlLeafKeys(yamlData[key])
            yield DictItemReference(yamlData, key)

    def pathify(self, d, p=None, paths=None, joinchar='.'):
        """
        Flatten a nested dict/list structure into a mapping of joined paths to leaf values.

        :param: d the structure (or sub-structure) to flatten
        :param: p path of ``d`` so far; leave as None on the initial call
        :param: paths accumulator filled during recursion; ignored on the initial call
        :param: joinchar separator placed between path segments
        :return: the path dictionary on the initial call, None on recursive calls
        """
        if p is None:
            paths = {}
            self.pathify(d, "", paths, joinchar=joinchar)
            return paths

        # no separator in front of the first segment
        prefix = p if p == "" else p + joinchar
        if isinstance(d, dict):
            for k in d:
                self.pathify(d[k], str(prefix) + str(k), paths, joinchar=joinchar)
        elif isinstance(d, list):
            for idx, e in enumerate(d):
                self.pathify(e, prefix + str(idx), paths, joinchar=joinchar)
        else:
            paths[p] = d

    def convert(self, text_value):
        """
        Convert a resolved rewrite string to a typed value where possible.

        Numeric conversion is attempted only when type conversion was enabled in the
        constructor; 'true'/'false' (any letter case) always become booleans.
        """
        if self.__convert_types:
            # try converting to int or float
            try:
                return float(text_value) if '.' in text_value else int(text_value)
            except ValueError:
                pass

        # try converting to bool
        lowered = text_value.lower()
        if lowered == "true":
            return True
        if lowered == "false":
            return False

        # nothing else worked so fall through and return text
        return text_value
def substitute_params(self, yaml, param_rewrites)
def substitute_keys(self, yaml, key_rewrites)
def updateYamlPathVals(self, yaml, yaml_key_list, rewrite_val)
def pathify(self, d, p=None, paths=None, joinchar='.')
List[launch.Substitution] name(self)