843 lines
38 KiB
Python
843 lines
38 KiB
Python
# Copyright 2024 Volvo Car Corporation
|
|
# Licensed under Apache 2.0.
|
|
|
|
"""Module for handling ZoneController composition yaml generation."""
|
|
|
|
import re
|
|
from pathlib import Path
|
|
from ruamel.yaml import YAML, scalarstring
|
|
|
|
from powertrain_build.problem_logger import ProblemLogger
|
|
from powertrain_build.types import a2l_range
|
|
from powertrain_build.zone_controller.calibration import ZoneControllerCalibration as ZCC
|
|
|
|
|
|
class CompositionYaml(ProblemLogger):
|
|
"""Class for handling ZoneController composition yaml generation."""
|
|
|
|
def __init__(self, build_cfg, signal_if, unit_cfg, zc_core, zc_dids, zc_nvm, a2l_axis_data, enums):
|
|
"""Init.
|
|
|
|
Args:
|
|
build_cfg (BuildProjConfig): Object with build configuration settings.
|
|
signal_if (SignalInterfaces): Class holding signal interface information.
|
|
unit_cfg (UnitConfig): Object with unit configurations.
|
|
zc_core (ZCCore): Object with zone controller diagnositic event information.
|
|
zc_dids (ZCDIDs): Object with zone controller diagnostic DID information.
|
|
zc_nvm (ZCNVMDef): Object with NVM definition information.
|
|
a2l_axis_data (dict): Dict with characteristic axis data from A2L file.
|
|
enums (dict): Dict with enum data.
|
|
"""
|
|
self.tl_to_autosar_base_types = {
|
|
"Bool": "boolean",
|
|
"Float32": "float32",
|
|
"Int16": "sint16",
|
|
"Int32": "sint32",
|
|
"Int8": "sint8",
|
|
"UInt16": "uint16",
|
|
"UInt32": "uint32",
|
|
"UInt8": "uint8",
|
|
}
|
|
self.build_cfg = build_cfg
|
|
self.unit_src_dirs = build_cfg.get_unit_src_dirs()
|
|
self.composition_spec = signal_if.composition_spec
|
|
self.external_io = signal_if.get_external_io()
|
|
self.unit_cfg = unit_cfg
|
|
self.zc_core = zc_core
|
|
self.zc_dids = zc_dids
|
|
self.zc_nvm = zc_nvm
|
|
self.enums = enums
|
|
self.a2l_axis_data = a2l_axis_data
|
|
base_data_types = self.get_base_data_types() # Might not be necessary in the long run
|
|
self.data_types = {
|
|
**base_data_types,
|
|
**self.composition_spec.get("data_types", {}),
|
|
}
|
|
self.port_interfaces = self.composition_spec.get("port_interfaces", {})
|
|
calibration_variables, measurable_variables = self._get_variables()
|
|
self.calibration_init_values = self.get_init_values(calibration_variables)
|
|
self.cal_class_info = self._get_class_info(calibration_variables)
|
|
self.meas_class_info = self._get_class_info(measurable_variables)
|
|
self.include_calibration_interface_files = False
|
|
if self.build_cfg.get_code_generation_config(item="generateCalibrationInterfaceFiles"):
|
|
self.include_calibration_interface_files = True
|
|
if self.build_cfg.get_code_generation_config(item="useCalibrationRteMacroExpansion"):
|
|
self.include_calibration_interface_files = False
|
|
if self.include_calibration_interface_files:
|
|
trigger_read_rte_cdata_signal_name = self._get_calibration_trigger_signal_name(calibration_variables)
|
|
self.cal_class_info["tl"]["class_info"].update({
|
|
trigger_read_rte_cdata_signal_name: {
|
|
"type": ZCC.trigger_read_rte_cdata_signal['data_type'],
|
|
"width": 1,
|
|
}
|
|
})
|
|
self.cal_class_info["autosar"]["class_info"].update(
|
|
{
|
|
trigger_read_rte_cdata_signal_name: {
|
|
"type": ZCC.trigger_read_rte_cdata_signal["data_type"],
|
|
"access": "READ-WRITE",
|
|
"init": 0,
|
|
}
|
|
}
|
|
)
|
|
|
|
@staticmethod
|
|
def _cast_init_value(value_str):
|
|
"""Cast initialization value to correct type.
|
|
|
|
Args:
|
|
value_str (str): String representation of the value.
|
|
Returns:
|
|
(int/float): Value casted to correct type.
|
|
"""
|
|
if value_str.endswith("F"):
|
|
return float(value_str[:-1])
|
|
return int(value_str)
|
|
|
|
def _prepare_for_xml(self, signal_name, string):
|
|
"""Prepare a string for XML serialization.
|
|
|
|
Args:
|
|
signal_name (str): The name of the signal.
|
|
string (str): The string to prepare.
|
|
Returns:
|
|
xml_string (str): The prepared string.
|
|
"""
|
|
illegal_xml_characters = {
|
|
"&": "&", # needs to be first in list
|
|
"<": "<",
|
|
">": ">",
|
|
"'": "'",
|
|
'"': """
|
|
}
|
|
xml_string_tmp = "".join(string.splitlines())
|
|
for char, replacement in illegal_xml_characters.items():
|
|
xml_string_tmp = xml_string_tmp.replace(char, replacement)
|
|
if len(xml_string_tmp) > 255:
|
|
self.warning(f"Converted description for {signal_name} exceeds 255 characters and will be truncated.")
|
|
for replacement in illegal_xml_characters.values():
|
|
found = xml_string_tmp.find(replacement, 255-len(replacement), 255+len(replacement))
|
|
if found < 255 and found + len(replacement) > 255:
|
|
xml_string_tmp = xml_string_tmp[:found]
|
|
xml_string_tmp = xml_string_tmp[:255] # Since "found" is always < 255 this is safe
|
|
xml_string = scalarstring.DoubleQuotedScalarString(xml_string_tmp)
|
|
return xml_string
|
|
|
|
def get_base_data_types(self):
|
|
"""Create base data types in expected Autosar/yaml2arxml format."""
|
|
base_data_types = {
|
|
"Bool": {"type": "ENUMERATION", "enums": {"False": 0, "True": 1}},
|
|
"Float32": {
|
|
"type": "FLOAT",
|
|
"limits": {"lower": -3.4e38, "upper": 3.4e38},
|
|
},
|
|
}
|
|
int_data_types = [data_type for data_type in self.tl_to_autosar_base_types if "Int" in data_type]
|
|
for data_type in int_data_types:
|
|
lower, upper = a2l_range(data_type)
|
|
base_data_types[data_type] = {
|
|
"type": "INTEGER",
|
|
"limits": {"lower": lower, "upper": upper},
|
|
}
|
|
return base_data_types
|
|
|
|
def generate_yaml(self):
|
|
"""Generates a yaml from project/model information."""
|
|
composition_name = self.build_cfg.get_composition_config("compositionName")
|
|
composition_ending = self.build_cfg.get_composition_config("compositionEnding")
|
|
all_info = self.gather_yaml_info()
|
|
|
|
output_directory = self.build_cfg.get_src_code_dst_dir()
|
|
self.info(
|
|
"Writing Yaml into %s/%s.%s",
|
|
output_directory,
|
|
composition_name,
|
|
composition_ending,
|
|
)
|
|
Path(output_directory).mkdir(parents=True, exist_ok=True)
|
|
|
|
with open(
|
|
f"{output_directory}/{composition_name}.{composition_ending}",
|
|
"w",
|
|
encoding="utf-8",
|
|
) as file:
|
|
yaml = YAML()
|
|
yaml.width = 1000 # We don't want to wrap lines in the yaml file
|
|
|
|
def modify_float_representation(s):
|
|
return re.sub(r"(\s-?\d+)e(?=\+|-\d)", r"\1.e", s)
|
|
|
|
yaml.dump(all_info, file, transform=modify_float_representation)
|
|
|
|
def gather_yaml_info(self):
|
|
"""Creates dict with relevant project/model information.
|
|
|
|
Returns:
|
|
all_info (dict): Dict to be written to yaml.
|
|
"""
|
|
software_components, pt_build_data_types = self._get_software_components()
|
|
|
|
all_info = {
|
|
"ExternalFiles": {
|
|
"Composition": self.build_cfg.get_composition_config("compositionArxml"),
|
|
"GenerateExternalImplementationTypes": self.build_cfg.get_composition_config(
|
|
"generateExternalImplementationType"
|
|
),
|
|
},
|
|
"SoftwareComponents": software_components,
|
|
"DataTypes": {**self.data_types, **pt_build_data_types},
|
|
"PortInterfaces": self.port_interfaces,
|
|
}
|
|
|
|
return all_info
|
|
|
|
def get_init_values(self, calibration_variables):
|
|
"""Get initialization values for calibration variables.
|
|
|
|
Args:
|
|
calibration_variables (dict): Dict of existing calibration variables.
|
|
Returns:
|
|
init_values (dict): Dictionary with initialization values for calibration variables.
|
|
"""
|
|
value_extraction_regexes = [
|
|
(
|
|
re.compile(r"^\s*CVC_CAL[A-Z_]*\s+\w+\s+(?P<name>\w+)\s*=\s*(?P<value>[-\d\.e]+F?)\s*;"),
|
|
lambda regex_match, _: self._cast_init_value(regex_match.group("value")),
|
|
),
|
|
(
|
|
re.compile(r"^\s*CVC_CAL[A-Z_]*\s+\w+\s+(?P<name>\w+)\[(?P<size>[\d]+)\]\s*=\s*"),
|
|
self._get_array_init_values,
|
|
),
|
|
(
|
|
re.compile(r"^\s*CVC_CAL[A-Z_]*\s+\w+\s+(?P<name>\w+)\[(?P<rows>[\d]+)\]\[(?P<cols>[\d]+)\]\s*=\s*"),
|
|
self._get_matrix_init_values,
|
|
),
|
|
(
|
|
re.compile(r"^\s*CVC_CAL[A-Z_]*\s+\w+\s+(?P<name>\w+)\s*=\s*(?P<enum>[a-zA-Z_$][\w_]*?)*;"),
|
|
lambda regex_match, _: regex_match.group("enum"),
|
|
),
|
|
]
|
|
|
|
init_values = {}
|
|
calibration_definitions = self._get_all_calibration_definitions()
|
|
calibration_definitions.reverse() # Reverse to pop from the end for performance
|
|
while calibration_definitions:
|
|
line = calibration_definitions.pop()
|
|
for regex, extraction_function in value_extraction_regexes:
|
|
regex_match = regex.match(line)
|
|
if regex_match is not None and regex_match.group("name") in calibration_variables:
|
|
if regex_match.group("name") in init_values:
|
|
self.critical("Variable definition for %s already found.", regex_match.group("name"))
|
|
init_values[regex_match.group("name")] = extraction_function(regex_match, calibration_definitions)
|
|
|
|
missing_init_values = set(calibration_variables) - set(init_values.keys())
|
|
if missing_init_values:
|
|
self.critical("Missing init values for calibration variables:\n%s", "\n".join(missing_init_values))
|
|
|
|
return init_values
|
|
|
|
def _get_all_calibration_definitions(self):
|
|
"""Get all calibration definitions from the source files.
|
|
|
|
Returns:
|
|
(iter): Iterator with calibration definitions.
|
|
"""
|
|
calibration_definitions = []
|
|
end_of_definitions_regex = re.compile(r"^void\s*RESTART_.*")
|
|
c_files = [Path(src_dir, unit.split("__")[0] + ".c").resolve() for unit, src_dir in self.unit_src_dirs.items()]
|
|
for c_file in c_files:
|
|
read_lines = ""
|
|
with c_file.open(mode="r", encoding="latin-1") as file_handle:
|
|
for line in file_handle:
|
|
if end_of_definitions_regex.match(line):
|
|
break
|
|
read_lines += line
|
|
calibration_definitions.extend(re.sub(r"/\*.*?\*/", "", read_lines, flags=re.S).splitlines())
|
|
return calibration_definitions
|
|
|
|
def _get_array_init_values(self, array_regex_match, definitions_list):
|
|
"""Get initialization values for an array.
|
|
|
|
NOTES:
|
|
Modifies the argument definitions_list by popping elements.
|
|
Popping from the end since list is reversed.
|
|
|
|
Args:
|
|
array_regex_match (re.Match): Match object with array definition.
|
|
definitions_list (list): List (reversed) with lines to parse.
|
|
Returns:
|
|
(list): List of initialization values for the array.
|
|
"""
|
|
array_init_values_str = ""
|
|
line = definitions_list.pop() # Skip array definition line
|
|
while "};" not in line:
|
|
array_init_values_str += line.strip()
|
|
line = definitions_list.pop()
|
|
array_init_values_str += line.strip()
|
|
array_init_values = re.findall(r"([-\d\.e]+F?),?", array_init_values_str)
|
|
|
|
if int(array_regex_match.group("size")) != len(array_init_values):
|
|
self.critical("Could not parse init values for array definition %s.", array_regex_match.group("name"))
|
|
|
|
return [self._cast_init_value(value) for value in array_init_values]
|
|
|
|
def _get_matrix_init_values(self, matrix_regex_match, definitions_list):
|
|
"""Get initialization values for a matrix.
|
|
|
|
NOTES:
|
|
Modifies the argument definitions_list by popping elements.
|
|
Popping from the end since list is reversed.
|
|
|
|
Args:
|
|
matrix_regex_match (re.Match): Match object with matrix definition.
|
|
definitions_list (list): List (reversed) with lines to parse.
|
|
Returns:
|
|
(list(list)): List of initialization values for the matrix.
|
|
"""
|
|
matrix_init_values = []
|
|
matrix_init_values_str = ""
|
|
line = definitions_list.pop() # Skip matrix definition line
|
|
while "};" not in line:
|
|
matrix_init_values_str += line.strip()
|
|
if "}" in line:
|
|
matrix_init_values.append(re.findall(r"([-\d\.e]+F?),?", matrix_init_values_str))
|
|
matrix_init_values_str = ""
|
|
line = definitions_list.pop()
|
|
|
|
row_check = int(matrix_regex_match.group("rows")) != len(matrix_init_values)
|
|
col_check = any(int(matrix_regex_match.group("cols")) != len(row) for row in matrix_init_values)
|
|
if row_check or col_check:
|
|
self.critical("Could not parse init values for matrix definition %s.", matrix_regex_match.group("name"))
|
|
|
|
return [[self._cast_init_value(value) for value in row] for row in matrix_init_values]
|
|
|
|
def _get_calibration_trigger_signal_name(self, calibration_variables):
|
|
"""Get the variable of the calibration trigger.
|
|
|
|
Make sure it is not present already.
|
|
|
|
Args:
|
|
calibration_variables (dict): Dict of existing calibration variables.
|
|
Returns:
|
|
trigger_signal (str): Name of variable for triggering calibration.
|
|
"""
|
|
software_component_name = self.build_cfg.get_composition_config("softwareComponentName")
|
|
trigger_signal = ZCC.trigger_read_rte_cdata_signal["name_template"].format(swc_name=software_component_name)
|
|
|
|
if trigger_signal in calibration_variables:
|
|
self.critical("Signal %s already defined in project.", trigger_signal)
|
|
|
|
return trigger_signal
|
|
|
|
def _edit_event_dict(self, event_dict):
|
|
"""Edit event dictionary to use double quoted strings on certain element values.
|
|
|
|
The elements that need to be double quoted are JumpDown, JumpUp and EnaDEMInd.
|
|
The values are either "on" or "off", which yaml loader still interprets as boolean.
|
|
Hence the need for double quotes.
|
|
For some reason, "true" is not interpreted as boolean though.
|
|
|
|
Args:
|
|
event_dict (dict): Dict with diagnostic event data.
|
|
Returns:
|
|
event_dict (dict): Dict with diagnostic event data,
|
|
updated with double quoted strings on certain element values."""
|
|
for event_data in event_dict.values():
|
|
for key, value in event_data.items():
|
|
if key in ["JumpDown", "JumpUp", "EnaDEMInd"]:
|
|
event_data[key] = scalarstring.DoubleQuotedScalarString(value)
|
|
return event_dict
|
|
|
|
def _get_diagnostic_info(self):
|
|
"""Get diagnostic information from composition spec.
|
|
|
|
NOTE: This function sets the valid_dids property of the ZCDIDs object.
|
|
|
|
Returns:
|
|
diag_dict (dict): Dict containing diagnostic information.
|
|
"""
|
|
diag_dict = {}
|
|
diagnostics = self.composition_spec.get("diagnostics", {})
|
|
|
|
if self.build_cfg.get_composition_config("includeDiagnostics") == "manual":
|
|
dids = diagnostics.get("dids", {})
|
|
events = self._edit_event_dict(diagnostics.get("events", {}))
|
|
elif self.build_cfg.get_composition_config("includeDiagnostics") == "manual_dids":
|
|
dids = diagnostics.get("dids", {})
|
|
events_tmp = self._edit_event_dict(diagnostics.get("events", {}))
|
|
events = self.zc_core.get_diagnostic_trouble_codes(events_tmp)
|
|
elif self.build_cfg.get_composition_config("includeDiagnostics") == "manual_dtcs":
|
|
self.zc_dids.valid_dids = diagnostics.get("dids", {})
|
|
dids = self.zc_dids.valid_dids
|
|
events = self._edit_event_dict(diagnostics.get("events", {}))
|
|
else:
|
|
self.zc_dids.valid_dids = diagnostics.get("dids", {})
|
|
dids = self.zc_dids.valid_dids
|
|
events_tmp = self._edit_event_dict(diagnostics.get("events", {}))
|
|
events = self.zc_core.get_diagnostic_trouble_codes(events_tmp)
|
|
rids = diagnostics.get("rids", {})
|
|
|
|
if dids:
|
|
diag_dict["dids"] = dids
|
|
if events:
|
|
diag_dict["events"] = events
|
|
if rids:
|
|
diag_dict["rids"] = rids
|
|
self.warning("Will not generate code for RIDs, add manually.")
|
|
return diag_dict
|
|
|
|
def _get_nvm_info(self):
|
|
"""Creates a dict with NVM information.
|
|
|
|
NVM dicts also needs to be added to the "static" field in the generated yaml.
|
|
|
|
NOTE: This function sets the valid_nvm_definitions property of the ZCNVMDef object.
|
|
|
|
Returns:
|
|
nvm_dict (dict): Dict containing NVM information.
|
|
data_types (dict): Dict containing data types for NVM information.
|
|
static_variables (dict): Dict containing "static" variables to add to the "static" field.
|
|
"""
|
|
data_types = {}
|
|
static_variables = {}
|
|
yaml_nvm_definitions = self.composition_spec.get("nv-needs", {})
|
|
self.zc_nvm.valid_nvm_definitions = yaml_nvm_definitions
|
|
nvm_dict = self.zc_nvm.valid_nvm_definitions
|
|
|
|
for nvm_name, nvm_data in nvm_dict.items():
|
|
init = []
|
|
nr_of_unused_signals = self.zc_nvm.project_nvm_definitions[nvm_name]["size"]
|
|
data_type_name = f"dt_{nvm_name}"
|
|
data_types[data_type_name] = {"type": "RECORD", "elements": {}}
|
|
for signal in self.zc_nvm.project_nvm_definitions[nvm_name]["signals"]:
|
|
element_name = f'{self.zc_nvm.struct_member_prefix}{signal["name"]}'
|
|
nr_of_unused_signals -= signal["x_size"] * signal["y_size"]
|
|
size = max(signal["x_size"], 1) * max(signal["y_size"], 1)
|
|
if size > 1:
|
|
x_data_type_name = f"dt_{signal['name']}_x"
|
|
y_data_type_name = f"dt_{signal['name']}_y"
|
|
if signal["x_size"] > 1 and signal["y_size"] == 1:
|
|
init.append([0] * signal["x_size"])
|
|
data_types[data_type_name]["elements"][element_name] = x_data_type_name
|
|
data_types[x_data_type_name] = {
|
|
"type": "ARRAY",
|
|
"size": signal["x_size"],
|
|
"element": signal["type"],
|
|
}
|
|
elif signal["x_size"] > 1 and signal["y_size"] > 1:
|
|
init.append([[0] * signal["y_size"]] * signal["x_size"])
|
|
data_types[data_type_name]["elements"][element_name] = y_data_type_name
|
|
data_types[y_data_type_name] = {
|
|
"type": "ARRAY",
|
|
"size": signal["y_size"],
|
|
"element": x_data_type_name,
|
|
}
|
|
data_types[x_data_type_name] = {
|
|
"type": "ARRAY",
|
|
"size": signal["x_size"],
|
|
"element": signal["type"],
|
|
}
|
|
else:
|
|
self.critical("NVM signal size incorrect. x_size should not be 1 if y_size > 1.")
|
|
else:
|
|
init.append(0)
|
|
data_types[data_type_name]["elements"][element_name] = signal["type"]
|
|
|
|
nvm_data.update({
|
|
"datatype": data_type_name,
|
|
"init": init
|
|
})
|
|
static_variables[nvm_name.lower()] = {
|
|
"type": data_type_name,
|
|
"access": "READ-ONLY",
|
|
"init": init,
|
|
}
|
|
|
|
if nr_of_unused_signals > 0:
|
|
# Mimics how we generate the unused member of the structs in nvm_def.py
|
|
nvm_data["init"].append([0] * nr_of_unused_signals)
|
|
data_types[data_type_name]["elements"]["unused"] = f"{data_type_name}_Unused"
|
|
data_types[f"{data_type_name}_Unused"] = {
|
|
"type": "ARRAY",
|
|
"size": nr_of_unused_signals,
|
|
"element": self.zc_nvm.project_nvm_definitions[nvm_name]["default_datatype"],
|
|
}
|
|
|
|
return nvm_dict, data_types, static_variables
|
|
|
|
def _get_ports_info(self):
|
|
"""Creates a dict containing port information.
|
|
|
|
Returns:
|
|
ports (dict): Dict containing port information.
|
|
"""
|
|
ports = self.composition_spec.get("ports", {})
|
|
for call, call_data in self.composition_spec.get("calls", {}).items():
|
|
if call in ports:
|
|
continue
|
|
ports[call] = {
|
|
"interface": call_data.get("interface", call),
|
|
"direction": call_data["direction"],
|
|
}
|
|
return ports
|
|
|
|
def _get_runnable_calls_info(self):
|
|
"""Creates a dict containing desired calls for the SWC.
|
|
|
|
Returns:
|
|
call_dict(dict): Dict containing runnable calls information.
|
|
"""
|
|
call_dict = {}
|
|
for call, call_data in self.composition_spec.get("calls", {}).items():
|
|
call_dict[call] = {"operation": call_data["operation"]}
|
|
if "timeout" in call_data:
|
|
call_dict[call]["timeout"] = call_data["timeout"]
|
|
return call_dict
|
|
|
|
def _get_runnable_info(self):
|
|
"""Creates a dict containing runnables information.
|
|
|
|
Returns:
|
|
dict: Dict containing runnables information.
|
|
"""
|
|
autosar_prefix = "AR_"
|
|
swc_prefix = self.build_cfg.get_scheduler_prefix()
|
|
custom_step_function = self.build_cfg.get_composition_config("customYamlStepFunctionName")
|
|
custom_init_function = self.build_cfg.get_composition_config("customYamlInitFunctionName")
|
|
standard_init_function = autosar_prefix + swc_prefix + "VcExtINI"
|
|
init_function = custom_init_function if custom_init_function is not None else standard_init_function
|
|
calibration_variables = list(
|
|
self.cal_class_info["autosar"]["class_info"].keys()
|
|
) + list(
|
|
self.composition_spec.get("shared", {}).keys()
|
|
)
|
|
swc_content = {init_function: {"type": "INIT", "accesses": calibration_variables}}
|
|
|
|
if self.include_calibration_interface_files:
|
|
swc_name = self.build_cfg.get_composition_config("softwareComponentName")
|
|
cal_init_function = autosar_prefix + ZCC.calibration_function_init_template.format(swc_name=swc_name)
|
|
cal_step_function = autosar_prefix + ZCC.calibration_function_step_template.format(swc_name=swc_name)
|
|
swc_content[cal_init_function] = {"type": "INIT", "accesses": calibration_variables}
|
|
swc_content[cal_step_function] = {"type": "PERIODIC", "period": 0.1, "accesses": calibration_variables}
|
|
|
|
call_dict = self._get_runnable_calls_info()
|
|
mode_switch_points_dict = self.composition_spec.get("mode_switch_points", {})
|
|
reads = []
|
|
writes = []
|
|
for port, port_data in self._get_ports_info().items():
|
|
if port_data["direction"] in ["IN", "CLIENT"]:
|
|
reads.append(port)
|
|
else:
|
|
writes.append(port)
|
|
runnables = self.build_cfg.get_units_raster_cfg()["SampleTimes"]
|
|
|
|
if len(runnables) == 1 and custom_step_function is not None:
|
|
swc_content[custom_step_function] = {
|
|
"type": "PERIODIC",
|
|
"period": list(runnables.values())[0],
|
|
"accesses": calibration_variables,
|
|
}
|
|
if call_dict:
|
|
swc_content[custom_step_function]["calls"] = call_dict
|
|
if mode_switch_points_dict:
|
|
swc_content[custom_step_function]["mode_switch_points"] = mode_switch_points_dict
|
|
if reads:
|
|
swc_content[custom_step_function]["reads"] = reads
|
|
if writes:
|
|
swc_content[custom_step_function]["writes"] = writes
|
|
return swc_content
|
|
|
|
if custom_step_function is not None:
|
|
self.warning(
|
|
"Custom step function specified, but multiple runnables defined. Ignoring custom step function."
|
|
)
|
|
|
|
for runnable, period in runnables.items():
|
|
key = autosar_prefix + swc_prefix + runnable
|
|
swc_content[key] = {
|
|
"period": period,
|
|
"type": "PERIODIC",
|
|
"accesses": calibration_variables,
|
|
}
|
|
if call_dict:
|
|
swc_content[key]["calls"] = call_dict
|
|
if mode_switch_points_dict:
|
|
swc_content[key]["mode_switch_points"] = mode_switch_points_dict
|
|
if reads:
|
|
swc_content[key]["reads"] = reads
|
|
if writes:
|
|
swc_content[key]["writes"] = writes
|
|
|
|
return swc_content
|
|
|
|
def _get_software_components(self):
|
|
"""Creates a dict with swc information and referred data types.
|
|
|
|
Returns:
|
|
swcs (dict): SWC information.
|
|
data_types (dict): Data types information.
|
|
"""
|
|
software_component_name = self.build_cfg.get_composition_config("softwareComponentName")
|
|
software_component_template = self.build_cfg.get_composition_config("softwareComponentTemplate")
|
|
swcs = {software_component_name: {}}
|
|
swcs[software_component_name]["type"] = "SWC" # Other types than swc??
|
|
if software_component_template is not None:
|
|
swcs[software_component_name]["template"] = software_component_template
|
|
swcs[software_component_name]["asil"] = self.build_cfg.get_composition_config("asil")
|
|
swcs[software_component_name]["secure"] = self.build_cfg.get_composition_config("secure")
|
|
swcs[software_component_name]["runnables"] = self._get_runnable_info()
|
|
if self.build_cfg.get_composition_config("includeShared") is True:
|
|
swcs[software_component_name]["shared"] = self.cal_class_info["autosar"]["class_info"]
|
|
for variable_name, variable_info in self.composition_spec.get("shared", {}).items():
|
|
if variable_name in swcs[software_component_name]["shared"]:
|
|
self.critical("Shared variable %s already defined in project.", variable_name)
|
|
else:
|
|
swcs[software_component_name]["shared"][variable_name] = variable_info
|
|
elif self.build_cfg.get_composition_config("includeShared") == "manual":
|
|
swcs[software_component_name]["shared"] = {}
|
|
for variable_name, variable_info in self.composition_spec.get("shared", {}).items():
|
|
swcs[software_component_name]["shared"][variable_name] = variable_info
|
|
if self.build_cfg.get_composition_config("includeStatic") is True:
|
|
swcs[software_component_name]["static"] = self.meas_class_info["autosar"]["class_info"]
|
|
for variable_name, variable_info in self.composition_spec.get("static", {}).items():
|
|
if variable_name in swcs[software_component_name]["static"]:
|
|
self.critical("Static variable %s already defined in project.", variable_name)
|
|
else:
|
|
swcs[software_component_name]["static"][variable_name] = variable_info
|
|
elif self.build_cfg.get_composition_config("includeStatic") == "manual":
|
|
swcs[software_component_name]["static"] = {}
|
|
for variable_name, variable_info in self.composition_spec.get("static", {}).items():
|
|
swcs[software_component_name]["static"][variable_name] = variable_info
|
|
swcs[software_component_name]["ports"] = self._get_ports_info()
|
|
if self.composition_spec.get("io") is not None:
|
|
swcs[software_component_name]["io"] = self.composition_spec["io"]
|
|
diagnostic_info = self._get_diagnostic_info()
|
|
if self.build_cfg.get_composition_config("includeDiagnostics") is not False:
|
|
swcs[software_component_name]["diagnostics"] = diagnostic_info
|
|
nvm_info, nvm_data_types_tmp, static_variables = self._get_nvm_info()
|
|
if self.build_cfg.get_composition_config("includeNvm"):
|
|
swcs[software_component_name]["nv-needs"] = nvm_info
|
|
nvm_data_types = nvm_data_types_tmp
|
|
if self.build_cfg.get_composition_config("includeStatic") is True:
|
|
swcs[software_component_name]["static"].update(static_variables)
|
|
else:
|
|
nvm_data_types = {}
|
|
|
|
data_types = {
|
|
**self.cal_class_info["autosar"]["data_types"],
|
|
**self.meas_class_info["autosar"]["data_types"],
|
|
**nvm_data_types,
|
|
}
|
|
|
|
return swcs, data_types
|
|
|
|
def _get_variables(self):
|
|
"""Get calibration and measurable variables from the unit configuration.
|
|
|
|
Returns:
|
|
calibration_variables (dict): Dict with calibration variables.
|
|
measurable_variables (dict): Dict with measurable variables.
|
|
"""
|
|
calibration_variables = {}
|
|
measurable_variables = {}
|
|
config = self.unit_cfg.get_per_cfg_unit_cfg()
|
|
valid_configs = ["outports", "local_vars", "calib_consts"]
|
|
for valid_config in valid_configs:
|
|
for signal_name, unit_info in config.get(valid_config, {}).items():
|
|
if len(unit_info) > 1:
|
|
self.critical("Multiple definitions for %s in config json files.", signal_name)
|
|
for info in unit_info.values():
|
|
if "CVC_CAL" in info["class"]:
|
|
calibration_variables[signal_name] = info
|
|
elif "CVC_DISP" in info["class"]:
|
|
measurable_variables[signal_name] = info
|
|
# External inports should also be considered as measurable variables
|
|
for io_type in self.external_io:
|
|
for signal_name in io_type.get("input", {}).keys():
|
|
for signal_data in config["inports"][signal_name].values():
|
|
measurable_variables[signal_name] = signal_data
|
|
continue # Inports can appear in several units, pick first one
|
|
return calibration_variables, measurable_variables
|
|
|
|
def _get_class_info(self, variable_dict):
|
|
"""Creates a dict with parameter information and referred data types.
|
|
|
|
Args:
|
|
variable_dict (dict): Dictionary with variables and data.
|
|
Returns:
|
|
(dict): Dictionary with variables and data types (Autosar and TL).
|
|
"""
|
|
autosar_class_info = {}
|
|
autosar_data_types = {}
|
|
tl_class_info = {}
|
|
for signal_name, info in variable_dict.items():
|
|
(
|
|
autosar_class_info,
|
|
autosar_data_types,
|
|
) = self._add_autosar_data_types(autosar_class_info, autosar_data_types, signal_name, info)
|
|
if signal_name in autosar_class_info:
|
|
tl_class_info[signal_name] = {
|
|
"type": info["type"],
|
|
"autosar_type": autosar_class_info[signal_name]["type"].split("/")[-1],
|
|
"width": info["width"],
|
|
}
|
|
return {
|
|
"autosar": {
|
|
"class_info": autosar_class_info,
|
|
"data_types": autosar_data_types,
|
|
},
|
|
"tl": {"class_info": tl_class_info, "data_types": {}},
|
|
}
|
|
|
|
def _add_autosar_data_types(self, class_info, data_types, signal_name, info):
|
|
"""Process a variable for inclusion in composition, adding it's data type to
|
|
data_types and the variable to class_info.
|
|
|
|
Args:
|
|
class_info (dict): Dictionary with variables.
|
|
data_types (dict): Dictionary with data types.
|
|
signal_name (string): Name of signal to process.
|
|
info (dict): signal data.
|
|
Returns:
|
|
|
|
class_info (dict): Updated dictionary with variables.
|
|
data_types (dict): Updated dictionary with data types.
|
|
"""
|
|
if info["type"] in self.enums.keys():
|
|
return class_info, data_types
|
|
|
|
includeSharedSwAddrMethod = self.build_cfg.get_composition_config("includeSharedSwAddrMethod")
|
|
isDisp = "CVC_DISP" in info["class"]
|
|
if "Bool" in info["type"]:
|
|
upper = 1
|
|
lower = 0
|
|
else:
|
|
base_type_lower = self.data_types[info["type"]]["limits"]["lower"]
|
|
base_type_upper = self.data_types[info["type"]]["limits"]["upper"]
|
|
lower = info["min"] if info["min"] != "-" else base_type_lower
|
|
upper = info["max"] if info["max"] != "-" else base_type_upper
|
|
|
|
if not isinstance(info["width"], list):
|
|
class_info[signal_name] = {
|
|
"type": info["type"],
|
|
"access": "READ-ONLY" if isDisp or info["class"] == "CVC_EXT" else "READ-WRITE",
|
|
"init": self.calibration_init_values.get(signal_name, max(min(0, upper), lower)),
|
|
}
|
|
if info["description"]:
|
|
class_info[signal_name]["longname"] = self._prepare_for_xml(signal_name, info["description"])
|
|
if includeSharedSwAddrMethod and not isDisp:
|
|
class_info[signal_name]["swAddrMethod"] = f"VCC_CALIBRATION_{signal_name.split('_')[0]}"
|
|
return class_info, data_types
|
|
|
|
if isinstance(lower, list) or isinstance(upper, list):
|
|
if info["width"][0] > 1:
|
|
self.critical(
|
|
"%s is a multidimentional array of elements with different constraints, not supported.", signal_name
|
|
)
|
|
init = []
|
|
for idx in range(info["width"][1]):
|
|
lower_val = lower[idx] if isinstance(lower, list) else lower
|
|
lower_val = lower_val if lower_val != "-" else base_type_lower
|
|
upper_val = upper[idx] if isinstance(upper, list) else upper
|
|
upper_val = upper_val if upper_val != "-" else base_type_upper
|
|
init.append(max(min(0, upper_val), lower_val))
|
|
else:
|
|
init = max(min(0, upper), lower)
|
|
if info["width"][0] > 1:
|
|
init = [[init] * info["width"][1] for _ in range(info["width"][0])]
|
|
else:
|
|
init = [init] * info["width"][1]
|
|
|
|
init = self.calibration_init_values.get(signal_name, init)
|
|
|
|
new_data_type = {}
|
|
new_data_type_name = f"dt_{signal_name}"
|
|
if signal_name.startswith("t"):
|
|
if signal_name.endswith("_x"):
|
|
new_data_type_data = {
|
|
"type": "COM_AXIS",
|
|
"axis-index": 1,
|
|
"size": info["width"][1],
|
|
"limits": {"lower": lower, "upper": upper},
|
|
"swrecordlayout": {
|
|
"name": f"Distr_{signal_name}",
|
|
"type": "INDEX_INCR",
|
|
"basetype": self.tl_to_autosar_base_types[info["type"]],
|
|
"label": "X",
|
|
},
|
|
}
|
|
else:
|
|
axis = self.a2l_axis_data.get(signal_name, {}).get("axes", [signal_name + "_x"])[0]
|
|
new_data_type_data = {
|
|
"type": "CURVE",
|
|
"axis": f"dt_{axis}",
|
|
"limits": {"lower": lower, "upper": upper},
|
|
"swrecordlayout": {
|
|
"name": f"Curve_{signal_name}",
|
|
"type": "COLUMN_DIR",
|
|
"basetype": self.tl_to_autosar_base_types[info["type"]],
|
|
"label": "Val",
|
|
},
|
|
}
|
|
if self.build_cfg.get_composition_config("scaleMapsAndCurves") and "int" in info["type"].lower():
|
|
new_data_type_data["slope"] = info["lsb"]
|
|
new_data_type_data["bias"] = info["offset"]
|
|
elif signal_name.startswith("m"):
|
|
new_data_type_data = {
|
|
"type": "COM_AXIS",
|
|
"size": info["width"][1],
|
|
"limits": {"lower": lower, "upper": upper},
|
|
"swrecordlayout": {
|
|
"name": f"Distr_{signal_name}",
|
|
"type": "INDEX_INCR",
|
|
"basetype": self.tl_to_autosar_base_types[info["type"]],
|
|
},
|
|
}
|
|
if signal_name.endswith("_r"):
|
|
new_data_type_data["axis-index"] = 1
|
|
new_data_type_data["swrecordlayout"]["label"] = "X"
|
|
elif signal_name.endswith("_c"):
|
|
new_data_type_data["axis-index"] = 2
|
|
new_data_type_data["swrecordlayout"]["label"] = "Y"
|
|
else:
|
|
default_names = [signal_name + "_r", signal_name + "_c"]
|
|
axis_r, axis_c = self.a2l_axis_data.get(signal_name, {}).get("axes", default_names)
|
|
new_data_type_data = {
|
|
"type": "MAP",
|
|
"x-axis": f"dt_{axis_r}",
|
|
"y-axis": f"dt_{axis_c}",
|
|
"limits": {"lower": lower, "upper": upper},
|
|
"swrecordlayout": {
|
|
"name": f"Map_{signal_name}",
|
|
"type": "COLUMN_DIR",
|
|
"basetype": self.tl_to_autosar_base_types[info["type"]],
|
|
"label": "Val",
|
|
},
|
|
}
|
|
if self.build_cfg.get_composition_config("scaleMapsAndCurves") and "int" in info["type"].lower():
|
|
new_data_type_data["slope"] = info["lsb"]
|
|
new_data_type_data["bias"] = info["offset"]
|
|
elif info["width"][0] == 1:
|
|
new_data_type_name = f"dt_{signal_name}_{info['width'][1]}"
|
|
new_data_type_data = {
|
|
"type": "ARRAY",
|
|
"size": info["width"][1],
|
|
"element": info["type"],
|
|
}
|
|
else:
|
|
self.critical("Signal config error for %s.", signal_name)
|
|
return class_info, data_types
|
|
|
|
new_data_type[new_data_type_name] = new_data_type_data
|
|
class_info[signal_name] = {
|
|
"type": new_data_type_name,
|
|
"access": "READ-ONLY" if isDisp else "READ-WRITE",
|
|
"init": init,
|
|
}
|
|
if info["description"]:
|
|
class_info[signal_name]["longname"] = self._prepare_for_xml(signal_name, info["description"])
|
|
if includeSharedSwAddrMethod and not isDisp:
|
|
class_info[signal_name]["swAddrMethod"] = f"VCC_CALIBRATION_{signal_name.split('_')[0]}"
|
|
data_types = {**data_types, **new_data_type}
|
|
return class_info, data_types
|