Nav2 Navigation Stack - jazzy  jazzy
ROS 2 Navigation Stack
rewritten_yaml.py
1 # Copyright (c) 2019 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 
15 import tempfile
16 from typing import Dict, List, Optional, Text
17 
18 import launch
19 import yaml
20 
21 
23 
24  def __init__(self, dictionary, key):
25  self.dictionarydictionary = dictionary
26  self.dictKeydictKey = key
27 
28  def key(self):
29  return self.dictKeydictKey
30 
31  def setValue(self, value):
32  self.dictionarydictionary[self.dictKeydictKey] = value
33 
34 
35 class RewrittenYaml(launch.Substitution):
36  """
37  Substitution that modifies the given YAML file.
38 
39  Used in launch system
40  """
41 
42  def __init__(
43  self,
44  source_file: launch.SomeSubstitutionsType,
45  param_rewrites: Dict,
46  root_key: Optional[launch.SomeSubstitutionsType] = None,
47  key_rewrites: Optional[Dict] = None,
48  convert_types=False,
49  ) -> None:
50  super().__init__()
51  """
52  Construct the substitution
53 
54  :param: source_file the original YAML file to modify
55  :param: param_rewrites mappings to replace
56  :param: root_key if provided, the contents are placed under this key
57  :param: key_rewrites keys of mappings to replace
58  :param: convert_types whether to attempt converting the string to a number or boolean
59  """
60 
61  from launch.utilities import (
62  normalize_to_list_of_substitutions,
63  ) # import here to avoid loop
64 
65  self.__source_file__source_file = normalize_to_list_of_substitutions(source_file)
66  self.__param_rewrites__param_rewrites = {}
67  self.__key_rewrites__key_rewrites = {}
68  self.__convert_types__convert_types = convert_types
69  self.__root_key__root_key = None
70  for key in param_rewrites:
71  self.__param_rewrites__param_rewrites[key] = normalize_to_list_of_substitutions(
72  param_rewrites[key]
73  )
74  if key_rewrites is not None:
75  for key in key_rewrites:
76  self.__key_rewrites__key_rewrites[key] = normalize_to_list_of_substitutions(
77  key_rewrites[key]
78  )
79  if root_key is not None:
80  self.__root_key__root_key = normalize_to_list_of_substitutions(root_key)
81 
82  @property
83  def name(self) -> List[launch.Substitution]:
84  """Getter for name."""
85  return self.__source_file__source_file
86 
87  def describe(self) -> Text:
88  """Return a description of this substitution as a string."""
89  return ''
90 
91  def perform(self, context: launch.LaunchContext) -> Text:
92  yaml_filename = launch.utilities.perform_substitutions(context, self.namename)
93  rewritten_yaml = tempfile.NamedTemporaryFile(mode='w', delete=False)
94  param_rewrites, keys_rewrites = self.resolve_rewritesresolve_rewrites(context)
95  data = yaml.safe_load(open(yaml_filename, 'r'))
96  self.substitute_paramssubstitute_params(data, param_rewrites)
97  self.add_paramsadd_params(data, param_rewrites)
98  self.substitute_keyssubstitute_keys(data, keys_rewrites)
99  if self.__root_key__root_key is not None:
100  root_key = launch.utilities.perform_substitutions(context, self.__root_key__root_key)
101  if root_key:
102  data = {root_key: data}
103  yaml.dump(data, rewritten_yaml)
104  rewritten_yaml.close()
105  return rewritten_yaml.name
106 
107  def resolve_rewrites(self, context):
108  resolved_params = {}
109  for key in self.__param_rewrites__param_rewrites:
110  resolved_params[key] = launch.utilities.perform_substitutions(
111  context, self.__param_rewrites__param_rewrites[key]
112  )
113  resolved_keys = {}
114  for key in self.__key_rewrites__key_rewrites:
115  resolved_keys[key] = launch.utilities.perform_substitutions(
116  context, self.__key_rewrites__key_rewrites[key]
117  )
118  return resolved_params, resolved_keys
119 
120  def substitute_params(self, yaml, param_rewrites):
121  # substitute leaf-only parameters
122  for key in self.getYamlLeafKeysgetYamlLeafKeys(yaml):
123  if key.key() in param_rewrites:
124  raw_value = param_rewrites[key.key()]
125  key.setValue(self.convertconvert(raw_value))
126 
127  # substitute total path parameters
128  yaml_paths = self.pathifypathify(yaml)
129  for path in yaml_paths:
130  if path in param_rewrites:
131  # this is an absolute path (ex. 'key.keyA.keyB.val')
132  rewrite_val = self.convertconvert(param_rewrites[path])
133  yaml_keys = path.split('.')
134  yaml = self.updateYamlPathValsupdateYamlPathVals(yaml, yaml_keys, rewrite_val)
135 
136  def add_params(self, yaml, param_rewrites):
137  # add new total path parameters
138  yaml_paths = self.pathifypathify(yaml)
139  for path in param_rewrites:
140  if not path in yaml_paths: # noqa: E713
141  new_val = self.convertconvert(param_rewrites[path])
142  yaml_keys = path.split('.')
143  if 'ros__parameters' in yaml_keys:
144  yaml = self.updateYamlPathValsupdateYamlPathVals(yaml, yaml_keys, new_val)
145 
146  def updateYamlPathVals(self, yaml, yaml_key_list, rewrite_val):
147  for key in yaml_key_list:
148  if key == yaml_key_list[-1]:
149  yaml[key] = rewrite_val
150  break
151  key = yaml_key_list.pop(0)
152  if isinstance(yaml, list):
153  yaml[int(key)] = self.updateYamlPathValsupdateYamlPathVals(
154  yaml[int(key)], yaml_key_list, rewrite_val
155  )
156  else:
157  yaml[key] = self.updateYamlPathValsupdateYamlPathVals(
158  yaml.get(key, {}), yaml_key_list, rewrite_val
159  )
160  return yaml
161 
162  def substitute_keys(self, yaml, key_rewrites):
163  if len(key_rewrites) != 0:
164  for key in list(yaml.keys()):
165  val = yaml[key]
166  if key in key_rewrites:
167  new_key = key_rewrites[key]
168  yaml[new_key] = yaml[key]
169  del yaml[key]
170  if isinstance(val, dict):
171  self.substitute_keyssubstitute_keys(val, key_rewrites)
172 
173  def getYamlLeafKeys(self, yamlData):
174  try:
175  for key in yamlData.keys():
176  for k in self.getYamlLeafKeysgetYamlLeafKeys(yamlData[key]):
177  yield k
178  yield DictItemReference(yamlData, key)
179  except AttributeError:
180  return
181 
182  def pathify(self, d, p=None, paths=None, joinchar='.'):
183  if p is None:
184  paths = {}
185  self.pathifypathify(d, '', paths, joinchar=joinchar)
186  return paths
187  pn = p
188  if p != '':
189  pn += joinchar
190  if isinstance(d, dict):
191  for k in d:
192  v = d[k]
193  self.pathifypathify(v, str(pn) + str(k), paths, joinchar=joinchar)
194  elif isinstance(d, list):
195  for idx, e in enumerate(d):
196  self.pathifypathify(e, pn + str(idx), paths, joinchar=joinchar)
197  else:
198  paths[p] = d
199 
200  def convert(self, text_value):
201  if self.__convert_types__convert_types:
202  # try converting to int or float
203  try:
204  return float(text_value) if '.' in text_value else int(text_value)
205  except ValueError:
206  pass
207 
208  # try converting to bool
209  if text_value.lower() == 'true':
210  return True
211  if text_value.lower() == 'false':
212  return False
213 
214  # nothing else worked so fall through and return text
215  return text_value
def substitute_params(self, yaml, param_rewrites)
def add_params(self, yaml, param_rewrites)
def substitute_keys(self, yaml, key_rewrites)
def updateYamlPathVals(self, yaml, yaml_key_list, rewrite_val)
def pathify(self, d, p=None, paths=None, joinchar='.')
List[launch.Substitution] name(self)