import enum
import xml.etree.ElementTree as ET
from abc import ABC, abstractmethod
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union
import ingenialogger
from ingenialink.exceptions import ILDictionaryParseError
from ingenialink.ethernet.register import EthernetRegister
from ingenialink.canopen.register import CanopenRegister
from ingenialink.register import REG_ACCESS, REG_ADDRESS_TYPE, REG_DTYPE, Register, RegCyclicType
from ingenialink.ethercat.register import EthercatRegister
logger = ingenialogger.get_logger(__name__)
# Dictionary constants guide:
# Each constant has this structure: DICT_ORIGIN_END
# ORIGIN: The start point of the path
# END: The end point of the path
# ORIGIN: LABELS
DICT_LABELS = "./Labels"
DICT_LABELS_LABEL = f"{DICT_LABELS}/Label"
[docs]class Interface(enum.Enum):
"""Connection Interfaces"""
CAN = enum.auto()
"""CANopen"""
ETH = enum.auto()
"""Ethernet"""
ECAT = enum.auto()
"""EtherCAT"""
EoE = enum.auto()
"""Ethernet over EtherCAT"""
VIRTUAL = enum.auto()
"""Virtual Drive"""
[docs]class SubnodeType(enum.Enum):
"""Subnode types"""
COMMUNICATION = enum.auto()
"""Communication"""
MOTION = enum.auto()
"""Motion"""
SAFETY = enum.auto()
"""Safety"""
[docs]@dataclass
class DictionarySafetyPDO:
"""Safety PDOs dictionary descriptor"""
[docs] @dataclass
class PDORegister:
"""PDO register descriptor"""
register: Optional[CanopenRegister]
size: int
index: int
entries: List[PDORegister]
[docs]class DictionaryCategories:
"""Contains all categories from a Dictionary.
Args:
list_xdf_categories: List of Elements from xdf file
"""
def __init__(self, list_xdf_categories: List[ET.Element]) -> None:
self._list_xdf_categories = list_xdf_categories
self._cat_ids: List[str] = []
self._categories: Dict[str, Dict[str, str]] = {}
self.load_cat_ids()
[docs] def load_cat_ids(self) -> None:
"""Load category IDs from dictionary."""
for element in self._list_xdf_categories:
self._cat_ids.append(element.attrib["id"])
cat_element = element.find(DICT_LABELS_LABEL)
if cat_element is None:
logger.warning(
f"The element of the category {element.attrib['id']} could not be load"
)
continue
cat_id = cat_element.text
if cat_id is None:
logger.warning(f"The ID of the category {element.attrib['id']} could not be load")
continue
self._categories[element.attrib["id"]] = {"en_US": cat_id}
@property
def category_ids(self) -> List[str]:
"""Category IDs."""
return self._cat_ids
[docs] def labels(self, cat_id: str) -> Dict[str, str]:
"""Obtain labels for a certain category ID.
Args:
cat_id: Category ID
Returns:
Labels dictionary.
"""
return self._categories[cat_id]
[docs]class DictionaryErrors:
"""Errors for the dictionary.
Args:
list_xdf_errors: List of Elements from xdf file
"""
def __init__(self, list_xdf_errors: List[ET.Element]) -> None:
self._list_xdf_errors = list_xdf_errors
self._errors: Dict[int, List[Optional[str]]] = {}
self.load_errors()
[docs] def load_errors(self) -> None:
"""Load errors from dictionary."""
for element in self._list_xdf_errors:
label = element.find(DICT_LABELS_LABEL)
if label is None:
logger.warning(f"Could not load label of error {element.attrib['id']}")
continue
self._errors[int(element.attrib["id"], 16)] = [
element.attrib["id"],
element.attrib["affected_module"],
element.attrib["error_type"].capitalize(),
label.text,
]
@property
def errors(self) -> Dict[int, List[Optional[str]]]:
"""Get the errors dictionary.
Returns:
Errors dictionary.
"""
return self._errors
[docs]class Dictionary(ABC):
"""Ingenia dictionary Abstract Base Class.
Args:
dictionary_path: Dictionary file path.
interface: communication interface.
Raises:
ILDictionaryParseError: If the dictionary could not be created.
"""
dtype_xdf_options = {
"float": REG_DTYPE.FLOAT,
"s8": REG_DTYPE.S8,
"u8": REG_DTYPE.U8,
"s16": REG_DTYPE.S16,
"u16": REG_DTYPE.U16,
"s32": REG_DTYPE.S32,
"u32": REG_DTYPE.U32,
"s64": REG_DTYPE.S64,
"u64": REG_DTYPE.U64,
"str": REG_DTYPE.STR,
"bool": REG_DTYPE.BOOL,
}
access_xdf_options = {"r": REG_ACCESS.RO, "w": REG_ACCESS.WO, "rw": REG_ACCESS.RW}
address_type_xdf_options = {
"NVM": REG_ADDRESS_TYPE.NVM,
"NVM_NONE": REG_ADDRESS_TYPE.NVM_NONE,
"NVM_CFG": REG_ADDRESS_TYPE.NVM_CFG,
"NVM_LOCK": REG_ADDRESS_TYPE.NVM_LOCK,
"NVM_HW": REG_ADDRESS_TYPE.NVM_HW,
}
subnode_xdf_options = {
"Communication": SubnodeType.COMMUNICATION,
"Motion": SubnodeType.MOTION,
"Safety": SubnodeType.SAFETY,
}
version: str
"""Version of the dictionary."""
firmware_version: Optional[str]
"""Firmware version declared in the dictionary."""
product_code: int
"""Product code declared in the dictionary."""
part_number: Optional[str]
"""Part number declared in the dictionary."""
revision_number: int
"""Revision number declared in the dictionary."""
interface: Interface
"""Interface declared in the dictionary."""
subnodes: Dict[int, SubnodeType]
"""Number of subnodes in the dictionary."""
categories: DictionaryCategories
"""Instance of all the categories in the dictionary."""
errors: DictionaryErrors
"""Instance of all the errors in the dictionary."""
image: Optional[str] = None
"""Drive's encoded image."""
is_safe: bool = False
"""True if has SafetyPDOs element, else False"""
_registers: Dict[int, Dict[str, Register]]
"""Instance of all the registers in the dictionary"""
registers_group: Dict[int, Dict[str, List[Register]]]
"""Registers group by subnode and UID"""
safety_rpdos: Dict[str, DictionarySafetyPDO]
"""Safety RPDOs by UID"""
safety_tpdos: Dict[str, DictionarySafetyPDO]
"""Safety TPDOs by UID"""
def __init__(self, dictionary_path: str, interface: Interface) -> None:
self.registers_group = {}
self.safety_rpdos = {}
self.safety_tpdos = {}
self._registers = {}
self.subnodes = {}
self.path = dictionary_path
"""Path of the dictionary."""
self.interface = interface
try:
self.read_dictionary()
except KeyError as e:
raise ILDictionaryParseError("The dictionary is not well-formed.") from e
def __add__(self, other_dict: "Dictionary") -> "Dictionary":
"""Merge two dictionary instances.
It can only be used for merging COM-KIT and CORE dictionaries.
"""
if not isinstance(other_dict, type(self)):
raise TypeError(
f"Cannot merge dictionaries. Expected type: {type(self)}, got: {type(other_dict)}"
)
if not other_dict.is_coco_dictionary and not self.is_coco_dictionary:
raise ValueError(
"Cannot merge dictionaries. One of the dictionaries must be a COM-KIT dictionary."
)
self._merge_registers(other_dict)
self._merge_errors(other_dict)
self._merge_attributes(other_dict)
self._set_image(other_dict)
return self
[docs] def registers(self, subnode: int) -> Dict[str, Register]:
"""Gets the register dictionary to the targeted subnode.
Args:
subnode: Identifier for the subnode.
Returns:
Dictionary of all the registers for a subnode.
"""
return self._registers[subnode]
[docs] @abstractmethod
def read_dictionary(self) -> None:
"""Reads the dictionary file and initializes all its components."""
pass
[docs] def child_registers(self, uid: str, subnode: int) -> List[Register]:
"""Return group registers by an UID
Args:
uid: registers group UID
subnode: registers group subnode
Returns:
All registers in the group
Raises:
KeyError: Registers group does not exist
"""
if subnode in self.registers_group and uid in self.registers_group[subnode]:
return self.registers_group[subnode][uid]
raise KeyError(f"Registers group {uid} in subnode {subnode} not exist")
[docs] def get_safety_rpdo(self, uid: str) -> DictionarySafetyPDO:
"""Get Safe RPDO by uid
Args:
uid: Safe RPDO uid
Returns:
PDO object description
Raises:
NotImplementedError: Device is not safe
KeyError: Safe RPDO not exist
"""
if not self.is_safe:
raise NotImplementedError("Safe PDOs are not implemented for this device")
if uid in self.safety_rpdos:
return self.safety_rpdos[uid]
raise KeyError(f"Safe RPDO {uid} not exist")
[docs] def get_safety_tpdo(self, uid: str) -> DictionarySafetyPDO:
"""Get Safe TPDO by uid
Args:
uid: Safe TPDO uid
Returns:
PDO object description
Raises:
NotImplementedError: Device is not safe
KeyError: Safe TPDO not exist
"""
if not self.is_safe:
raise NotImplementedError("Safe PDOs are not implemented for this device")
if uid in self.safety_tpdos:
return self.safety_tpdos[uid]
raise KeyError(f"Safe TPDO {uid} not exist")
def _merge_registers(self, other_dict: "Dictionary") -> None:
"""Add the registers from another dictionary to the dictionary instance.
Args:
other_dict: The other dictionary instance.
"""
for subnode, registers in other_dict._registers.items():
self._registers[subnode].update(registers)
def _merge_errors(self, other_dict: "Dictionary") -> None:
"""Add the errors from another dictionary to the dictionary instance.
Args:
other_dict: The other dictionary instance.
"""
self.errors.errors.update(other_dict.errors.errors)
def _set_image(self, other_dict: "Dictionary") -> None:
"""Set the image attribute.
Choose the image from the dictionary that has one.
Args:
other_dict: The other dictionary instance.
"""
core_dict = self if other_dict.is_coco_dictionary else other_dict
self.image = core_dict.image
def _merge_attributes(self, other_dict: "Dictionary") -> None:
"""Add the revision number, product code, firmware version and part number
from the other dictionary to the dictionary instance.
Args:
other_dict: The other dictionary instance.
"""
if not other_dict.is_coco_dictionary:
self.product_code = other_dict.product_code
self.revision_number = other_dict.revision_number
self.firmware_version = other_dict.firmware_version
self.part_number = other_dict.part_number
@property
def is_coco_dictionary(self) -> bool:
"""Check if dictionary is a CoCo dictionary
Returns:
True if the dictionary is a CoCo dictionary. False otherwise.
"""
return len(self.registers(1)) == 0
[docs]class DictionaryV3(Dictionary):
DRIVE_IMAGE_ELEMENT = "DriveImage"
HEADER_ELEMENT = "Header"
VERSION_ELEMENT = "Version"
BODY_ELEMENT = "Body"
CATEGORIES_ELEMENT = "Categories"
CATEGORY_ELEMENT = "Category"
DEVICES_ELEMENT = "Devices"
DEVICE_ELEMENT = {
Interface.CAN: "CANDevice",
Interface.ETH: "ETHDevice",
Interface.ECAT: "ECATDevice",
Interface.EoE: "EoEDevice",
}
DEVICE_FW_VERSION_ATTR = "firmwareVersion"
DEVICE_PRODUCT_CODE_ATTR = "ProductCode"
DEVICE_PART_NUMBER_ATTR = "PartNumber"
DEVICE_REVISION_NUMBER_ATTR = "RevisionNumber"
SUBNODES_ELEMENT = "Subnodes"
SUBNODE_ELEMENT = "Subnode"
SUBNODE_INDEX_ATTR = "index"
SUBNODE_ATTR = "subnode"
ADDRESS_TYPE_ATTR = "address_type"
ACCESS_ATTR = "access"
DTYPE_ATTR = "dtype"
UID_ATTR = "id"
CYCLIC_ATTR = "cyclic"
DESCRIPTION_ATTR = "desc"
DEFAULT_ATTR = "default"
CAT_ID_ATTR = "cat_id"
UNITS_ATTR = "units"
CANOPEN_OBJECTS_ELEMENT = "CANopenObjects"
CANOPEN_OBJECT_ELEMENT = "CANopenObject"
SUBITEMS_ELEMENT = "Subitems"
SUBITEM_ELEMENT = "Subitem"
INDEX_ATTR = "index"
SUBINDEX_ATTR = "subindex"
MCB_REGISTERS_ELEMENT = "MCBRegisters"
MCB_REGISTER_ELEMENT = "MCBRegister"
ADDRESS_ATTR = "address"
ERRORS_ELEMENT = "Errors"
ERROR_ELEMENT = "Error"
LABELS_ELEMENT = "Labels"
LABEL_ELEMENT = "Label"
LABEL_LANG_ATTR = "lang"
ENUMERATIONS_ELEMENT = "Enumerations"
ENUM_ELEMENT = "Enum"
ENUM_VALUE_ATTR = "value"
RANGE_ELEMENT = "Range"
RANGE_MIN_ATTR = "min"
RANGE_MAX_ATTR = "max"
SAFETY_PDOS_ELEMENT = "SafetyPDOs"
RPDO_ELEMENT = "RPDO"
TPDO_ELEMENT = "TPDO"
PDO_UID_ATTR = "id"
PDO_INDEX_ATTR = "index"
PDO_ENTRY_ELEMENT = "PDOEntry"
PDO_ENTRY_SIZE_ATTR = "size"
PDO_ENTRY_SUBNODE_ATTR = "subnode"
@staticmethod
def __find_and_check(root: ET.Element, path: str) -> ET.Element:
"""Return the path element in the target root element if exists, else, raises an exception.
Args:
root: root element
path: target element path
Returns:
path element
Raises:
ILDictionaryParseError: path element not found
"""
element = root.find(path)
if element is None:
raise ILDictionaryParseError(f"{path} element is not found")
return element
@staticmethod
def __findall_and_check(root: ET.Element, path: str) -> List[ET.Element]:
"""Return list of elements in the target root element if exist, else, raises an exception.
Args:
root: root element
path: target elements path
Returns:
list of path elements
Raises:
ILDictionaryParseError: path elements not found
"""
element = root.findall(path)
if not element:
raise ILDictionaryParseError(f"{path} element is not found")
return element
[docs] def read_dictionary(self) -> None:
try:
with open(self.path, "r", encoding="utf-8") as xdf_file:
tree = ET.parse(xdf_file)
except FileNotFoundError:
raise FileNotFoundError(f"There is not any xdf file in the path: {self.path}")
root = tree.getroot()
drive_image_element = self.__find_and_check(root, self.DRIVE_IMAGE_ELEMENT)
self.__read_drive_image(drive_image_element)
header_element = self.__find_and_check(root, self.HEADER_ELEMENT)
self.__read_header(header_element)
body_element = self.__find_and_check(root, self.BODY_ELEMENT)
self.__read_body(body_element)
def __read_drive_image(self, drive_image: ET.Element) -> None:
"""Process DriveImage element and set image
Args:
drive_image: DriveImage element
"""
if drive_image.text is not None and drive_image.text.strip():
self.image = drive_image.text.strip()
else:
self.image = None
def __read_header(self, root: ET.Element) -> None:
"""Process Header element
Args:
root: Header element
"""
version_element = self.__find_and_check(root, self.VERSION_ELEMENT)
self.__read_version(version_element)
# Dictionary localization not implemented
def __read_version(self, root: ET.Element) -> None:
"""Process Version element and set version
Args:
root: Version element
Raises:
ILDictionaryParseError: version is empty
"""
if root.text is None:
raise ILDictionaryParseError("Version is empty")
self.version = root.text.strip()
def __read_body(self, root: ET.Element) -> None:
"""Process Body element
Args:
root: Body element
"""
categories_element = self.__find_and_check(root, self.CATEGORIES_ELEMENT)
self.__read_categories(categories_element)
devices_element = self.__find_and_check(root, self.DEVICES_ELEMENT)
self.__read_devices(devices_element)
def __read_categories(self, root: ET.Element) -> None:
"""Process Categories element and set categories
Args:
root: Categories element
"""
category_list = self.__findall_and_check(root, self.CATEGORY_ELEMENT)
self.categories = DictionaryCategories(category_list)
def __read_devices(self, root: ET.Element) -> None:
"""Process Devices element
Args:
root: Devices element
"""
device_element = root.find(self.DEVICE_ELEMENT[self.interface])
if device_element is None:
raise ILDictionaryParseError("Dictionary can not be used for the chose communication")
self.__read_device_attributes(device_element)
if self.interface == Interface.ETH:
self.__read_device_eth(device_element)
if self.interface == Interface.CAN:
self.__read_device_can(device_element)
if self.interface == Interface.ECAT:
self.__read_device_ecat(device_element)
if self.interface == Interface.EoE:
self.__read_device_eoe(device_element)
def __read_device_attributes(self, device: ET.Element) -> None:
self.firmware_version = device.attrib[self.DEVICE_FW_VERSION_ATTR]
self.product_code = int(device.attrib[self.DEVICE_PRODUCT_CODE_ATTR])
self.part_number = device.attrib[self.DEVICE_PART_NUMBER_ATTR]
self.revision_number = int(device.attrib[self.DEVICE_REVISION_NUMBER_ATTR])
def __read_device_eoe(self, root: ET.Element) -> None:
"""Process EoEDevice element
Args:
root: EoEDevice element
"""
# Device element is identical
self.__read_device_eth(root)
def __read_device_eth(self, root: ET.Element) -> None:
"""Process ETHDevice element
Args:
root: ETHDevice element
"""
subnodes_element = self.__find_and_check(root, self.SUBNODES_ELEMENT)
self.__read_subnodes(subnodes_element)
registers_element = self.__find_and_check(root, self.MCB_REGISTERS_ELEMENT)
register_element_list = self.__findall_and_check(
registers_element, self.MCB_REGISTER_ELEMENT
)
for register_element in register_element_list:
self.__read_mcb_register(register_element)
errors_element = self.__find_and_check(root, self.ERRORS_ELEMENT)
self.__read_errors(errors_element)
def __read_device_ecat(self, root: ET.Element) -> None:
"""Process ECATDevice element
Args:
root: ECATDevice element
"""
subnodes_element = self.__find_and_check(root, self.SUBNODES_ELEMENT)
self.__read_subnodes(subnodes_element)
registers_element = self.__find_and_check(root, self.CANOPEN_OBJECTS_ELEMENT)
register_element_list = self.__findall_and_check(
registers_element, self.CANOPEN_OBJECT_ELEMENT
)
for register_element in register_element_list:
self.__read_canopen_object(register_element)
errors_element = self.__find_and_check(root, self.ERRORS_ELEMENT)
self.__read_errors(errors_element)
safety_pdos_element = root.find(self.SAFETY_PDOS_ELEMENT)
if safety_pdos_element is not None:
self.__read_safety_pdos(safety_pdos_element)
def __read_device_can(self, root: ET.Element) -> None:
"""Process CANDevice element
Args:
root: CANDevice element
"""
subnodes_element = self.__find_and_check(root, self.SUBNODES_ELEMENT)
self.__read_subnodes(subnodes_element)
registers_element = self.__find_and_check(root, self.CANOPEN_OBJECTS_ELEMENT)
register_element_list = self.__findall_and_check(
registers_element, self.CANOPEN_OBJECT_ELEMENT
)
for register_element in register_element_list:
self.__read_canopen_object(register_element)
errors_element = self.__find_and_check(root, self.ERRORS_ELEMENT)
self.__read_errors(errors_element)
def __read_subnodes(self, root: ET.Element) -> None:
"""Process Subnodes element and fill subnodes
Args:
root: Subnodes element
Raises:
ILDictionaryParseError: Subnode element text is None
"""
subnode_list = self.__findall_and_check(root, self.SUBNODE_ELEMENT)
for subnode in subnode_list:
if subnode.text is None:
raise ILDictionaryParseError("Subnode element text is None")
self.subnodes[int(subnode.attrib[self.SUBNODE_INDEX_ATTR])] = self.subnode_xdf_options[
subnode.text.strip()
]
def __read_errors(self, root: ET.Element) -> None:
"""Process Errors element and set errors
Args:
root: Errors element
"""
error_list = self.__findall_and_check(root, self.ERROR_ELEMENT)
self.errors = DictionaryErrors(error_list)
def __read_labels(self, root: ET.Element) -> Dict[str, str]:
"""Process Labels element
Args:
root: Labels element
Returns:
labels by localization
"""
label_list = self.__findall_and_check(root, self.LABEL_ELEMENT)
labels = {}
for label in label_list:
key, value = self.__read_label(label)
labels[key] = value
return labels
def __read_label(self, label: ET.Element) -> Tuple[str, str]:
"""Process Label element
Args:
label: Label element
Returns:
Tuple with label localization and label text
Raises:
ILDictionaryParseError: Label text is empty
"""
if label.text is None:
raise ILDictionaryParseError("Label text is empty")
return label.attrib[self.LABEL_LANG_ATTR], label.text.strip()
def __read_range(
self, range_elem: Optional[ET.Element]
) -> Union[Tuple[None, None], Tuple[str, str]]:
"""Process Range element
Args:
range_elem: Range element
Returns:
Tuple with minimum and maximum range, None if range is not limited
"""
if range_elem is not None:
range_min = range_elem.attrib[self.RANGE_MIN_ATTR]
range_max = range_elem.attrib[self.RANGE_MAX_ATTR]
return range_min, range_max
return None, None
def __read_enumeration(
self, enumerations_element: Optional[ET.Element]
) -> Optional[Dict[str, int]]:
"""Process Enumerations element if is not None
Args:
enumerations_element: Enumerations element
Returns:
If Enumerations is not None, return enums values
"""
if enumerations_element is not None:
enum_list = self.__findall_and_check(enumerations_element, self.ENUM_ELEMENT)
return {
str(enum_element.text.strip()): int(enum_element.attrib[self.ENUM_VALUE_ATTR])
for enum_element in enum_list
if enum_element.text is not None
}
return None
def __read_mcb_register(self, register: ET.Element) -> None:
"""Process MCBRegister element and add it to _registers
Args:
register: MCBRegister element
"""
reg_address = int(register.attrib[self.ADDRESS_ATTR], 16)
subnode = int(register.attrib[self.SUBNODE_ATTR])
address_type = self.address_type_xdf_options[register.attrib[self.ADDRESS_TYPE_ATTR]]
access = self.access_xdf_options[register.attrib[self.ACCESS_ATTR]]
dtype = self.dtype_xdf_options[register.attrib[self.DTYPE_ATTR]]
identifier = register.attrib[self.UID_ATTR]
cyclic = RegCyclicType(register.attrib[self.CYCLIC_ATTR])
description = register.attrib[self.DESCRIPTION_ATTR]
default = bytes.fromhex(register.attrib[self.DEFAULT_ATTR])
cat_id = register.attrib[self.CAT_ID_ATTR]
units = register.attrib.get(self.UNITS_ATTR)
# Labels
labels_element = self.__find_and_check(register, self.LABELS_ELEMENT)
labels = self.__read_labels(labels_element)
# Range
range_elem = register.find(self.RANGE_ELEMENT)
reg_range = self.__read_range(range_elem)
# Enumerations
enumerations_element = register.find(self.ENUMERATIONS_ELEMENT)
enums = self.__read_enumeration(enumerations_element)
ethernet_register = EthernetRegister(
reg_address,
dtype,
access,
identifier=identifier,
units=units,
cyclic=cyclic,
subnode=subnode,
reg_range=reg_range,
labels=labels,
enums=enums,
cat_id=cat_id,
address_type=address_type,
description=description,
default=default,
)
if subnode not in self._registers:
self._registers[subnode] = {}
self._registers[subnode][identifier] = ethernet_register
def __read_canopen_object(self, root: ET.Element) -> None:
"""Process CANopenObject element and add it to registers_group if has UID
Args:
root: CANopenObject element
"""
object_uid = root.attrib.get(self.UID_ATTR)
reg_index = int(root.attrib[self.INDEX_ATTR], 16)
subnode = int(root.attrib[self.SUBNODE_ATTR])
subitmes_element = self.__find_and_check(root, self.SUBITEMS_ELEMENT)
subitem_list = self.__findall_and_check(subitmes_element, self.SUBITEM_ELEMENT)
register_list = [
self.__read_canopen_subitem(subitem, reg_index, subnode) for subitem in subitem_list
]
if object_uid:
register_list.sort(key=lambda val: val.subidx)
if subnode not in self.registers_group:
self.registers_group[subnode] = {}
self.registers_group[subnode][object_uid] = list(register_list)
def __read_canopen_subitem(
self, subitem: ET.Element, reg_index: int, subnode: int
) -> CanopenRegister:
"""Process Subitem element and add it to _registers
Args:
subitem: CANopenObject element
reg_index: register index
subnode: register subnode
Returns:
Subitem register
"""
reg_subindex = int(subitem.attrib[self.SUBINDEX_ATTR])
address_type = self.address_type_xdf_options[subitem.attrib[self.ADDRESS_TYPE_ATTR]]
access = self.access_xdf_options[subitem.attrib[self.ACCESS_ATTR]]
dtype = self.dtype_xdf_options[subitem.attrib[self.DTYPE_ATTR]]
identifier = subitem.attrib[self.UID_ATTR]
cyclic = RegCyclicType(subitem.attrib[self.CYCLIC_ATTR])
description = subitem.attrib[self.DESCRIPTION_ATTR]
default = bytes.fromhex(subitem.attrib[self.DEFAULT_ATTR])
cat_id = subitem.attrib[self.CAT_ID_ATTR]
units = subitem.attrib.get(self.UNITS_ATTR)
# Labels
labels_element = self.__find_and_check(subitem, self.LABELS_ELEMENT)
labels = self.__read_labels(labels_element)
# Range
range_elem = subitem.find(self.RANGE_ELEMENT)
reg_range = self.__read_range(range_elem)
# Enumerations
enumerations_element = subitem.find(self.ENUMERATIONS_ELEMENT)
enums = self.__read_enumeration(enumerations_element)
canopen_register = CanopenRegister(
reg_index,
reg_subindex,
dtype,
access,
identifier=identifier,
units=units,
cyclic=cyclic,
subnode=subnode,
reg_range=reg_range,
labels=labels,
enums=enums,
cat_id=cat_id,
address_type=address_type,
description=description,
default=default,
)
if subnode not in self._registers:
self._registers[subnode] = {}
self._registers[subnode][identifier] = canopen_register
return canopen_register
def __read_safety_pdos(self, root: ET.Element) -> None:
"""Process SafetyPDOs element
Args:
root: MCBRegister element
"""
self.is_safe = True
rpdo_list = self.__findall_and_check(root, self.RPDO_ELEMENT)
for rpdo_element in rpdo_list:
uid, safety_rpdo = self.__read_pdo(rpdo_element)
self.safety_rpdos[uid] = safety_rpdo
tpdo_list = self.__findall_and_check(root, self.TPDO_ELEMENT)
for tpdo_element in tpdo_list:
uid, safety_tpdo = self.__read_pdo(tpdo_element)
self.safety_tpdos[uid] = safety_tpdo
def __read_pdo(self, pdo: ET.Element) -> Tuple[str, DictionarySafetyPDO]:
"""Process RPDO and TPDO elements
Args:
pdo: MCBRegister element
Returns:
PDO uid and class description
Raises:
ILDictionaryParseError: PDO register does not exist
"""
uid = pdo.attrib[self.PDO_UID_ATTR]
pdo_index = int(pdo.attrib[self.PDO_INDEX_ATTR], 16)
entry_list = self.__findall_and_check(pdo, self.PDO_ENTRY_ELEMENT)
pdo_registers = []
for entry in entry_list:
size = int(entry.attrib[self.PDO_ENTRY_SIZE_ATTR])
reg_subnode = int(entry.attrib.get(self.PDO_ENTRY_SUBNODE_ATTR, 1))
reg_uid = entry.text
if reg_uid:
if not (reg_subnode in self._registers and reg_uid in self._registers[reg_subnode]):
raise ILDictionaryParseError(
f"PDO entry {reg_uid} subnode {reg_subnode} does not exist"
)
entry_reg = self._registers[reg_subnode][reg_uid]
if not isinstance(entry_reg, CanopenRegister):
raise ValueError(f"{reg_uid} subnode {reg_subnode} is not a CANopen register")
pdo_registers.append(DictionarySafetyPDO.PDORegister(entry_reg, size))
else:
pdo_registers.append(DictionarySafetyPDO.PDORegister(None, size))
return uid, DictionarySafetyPDO(pdo_index, pdo_registers)
[docs]class DictionaryV2(Dictionary):
# Dictionary constants guide:
# Each constant has this structure: DICT_ORIGIN_END
# ORIGIN: The start point of the path
# END: The end point of the path
# ORIGIN: ROOT
DICT_ROOT = "."
DICT_ROOT_HEADER = f"{DICT_ROOT}/Header"
DICT_ROOT_VERSION = f"{DICT_ROOT_HEADER}/Version"
DICT_ROOT_BODY = f"{DICT_ROOT}/Body"
DICT_ROOT_DEVICE = f"{DICT_ROOT_BODY}/Device"
DICT_ROOT_CATEGORIES = f"{DICT_ROOT_DEVICE}/Categories"
DICT_ROOT_CATEGORY = f"{DICT_ROOT_CATEGORIES}/Category"
DICT_ROOT_ERRORS = f"{DICT_ROOT_BODY}/Errors"
DICT_ROOT_ERROR = f"{DICT_ROOT_ERRORS}/Error"
DICT_ROOT_AXES = f"{DICT_ROOT_DEVICE}/Axes"
DICT_ROOT_AXIS = f"{DICT_ROOT_AXES}/Axis"
DICT_ROOT_REGISTERS = f"{DICT_ROOT_DEVICE}/Registers"
DICT_ROOT_REGISTER = f"{DICT_ROOT_REGISTERS}/Register"
# ORIGIN: REGISTERS
DICT_REGISTERS = "./Registers"
DICT_REGISTERS_REGISTER = f"{DICT_REGISTERS}/Register"
# ORIGIN: RANGE
DICT_RANGE = "./Range"
# ORIGIN: ENUMERATIONS
DICT_ENUMERATIONS = "./Enumerations"
DICT_ENUMERATIONS_ENUMERATION = f"{DICT_ENUMERATIONS}/Enum"
DICT_IMAGE = "DriveImage"
dict_interface: Optional[str]
MON_DIST_STATUS_REGISTER = "MON_DIST_STATUS"
MONITORING_DISTURBANCE_REGISTERS: Union[
List[EthercatRegister], List[EthernetRegister], List[CanopenRegister]
]
[docs] def read_dictionary(self) -> None:
try:
with open(self.path, "r", encoding="utf-8") as xdf_file:
tree = ET.parse(xdf_file)
except FileNotFoundError:
raise FileNotFoundError(f"There is not any xdf file in the path: {self.path}")
root = tree.getroot()
device = root.find(self.DICT_ROOT_DEVICE)
if device is None:
raise ILDictionaryParseError(
f"Could not load the dictionary {self.path}. Device information is missing"
)
# Subnodes
if root.findall(self.DICT_ROOT_AXES):
self.subnodes[0] = SubnodeType.COMMUNICATION
for i in range(1, len(root.findall(self.DICT_ROOT_AXIS))):
self.subnodes[i] = SubnodeType.MOTION
else:
self.subnodes[0] = SubnodeType.COMMUNICATION
self.subnodes[1] = SubnodeType.MOTION
for subnode in self.subnodes:
self._registers[subnode] = {}
# Categories
list_xdf_categories = root.findall(self.DICT_ROOT_CATEGORY)
self.categories = DictionaryCategories(list_xdf_categories)
# Errors
list_xdf_errors = root.findall(self.DICT_ROOT_ERROR)
self.errors = DictionaryErrors(list_xdf_errors)
# Version
version_node = root.find(self.DICT_ROOT_VERSION)
if version_node is not None and version_node.text is not None:
self.version = version_node.text
self.firmware_version = device.attrib.get("firmwareVersion")
product_code = device.attrib.get("ProductCode")
if product_code is not None and product_code.isdecimal():
self.product_code = int(product_code)
self.part_number = device.attrib.get("PartNumber")
revision_number = device.attrib.get("RevisionNumber")
if revision_number is not None and revision_number.isdecimal():
self.revision_number = int(revision_number)
self.dict_interface = device.attrib.get("Interface")
if root.findall(self.DICT_ROOT_AXES):
# For each axis
for axis in root.findall(self.DICT_ROOT_AXIS):
for register in axis.findall(self.DICT_REGISTERS_REGISTER):
current_read_register = self._read_xdf_register(register)
if current_read_register:
self._add_register_list(current_read_register)
else:
for register in root.findall(self.DICT_ROOT_REGISTER):
current_read_register = self._read_xdf_register(register)
if current_read_register:
self._add_register_list(current_read_register)
try:
image = root.find(self.DICT_IMAGE)
if image is not None and image.text is not None and image.text.strip():
self.image = image.text
except AttributeError:
logger.error(f"Dictionary {Path(self.path).name} has no image section.")
# Closing xdf file
xdf_file.close()
self._append_missing_registers()
def _read_xdf_register(self, register: ET.Element) -> Optional[Register]:
"""Reads a register from the dictionary and creates a Register instance.
Args:
register: Register instance from the dictionary.
Returns:
The current register which it has been reading
None: When at least a mandatory attribute is not in a xdf file
Raises:
ILDictionaryParseError: If the register data type is invalid.
ILDictionaryParseError: If the register access type is invalid.
ILDictionaryParseError: If the register address type is invalid.
KeyError: If some attribute is missing.
"""
try:
identifier = register.attrib["id"]
except KeyError as ke:
logger.error(f"The register doesn't have an identifier. Error caught: {ke}")
return None
try:
units = register.attrib["units"]
cyclic = RegCyclicType(register.attrib.get("cyclic", "CONFIG"))
# Data type
dtype_aux = register.attrib["dtype"]
dtype = None
if dtype_aux in self.dtype_xdf_options:
dtype = self.dtype_xdf_options[dtype_aux]
else:
raise ILDictionaryParseError(
f"The data type {dtype_aux} does not exist for the register: {identifier}"
)
# Access type
access_aux = register.attrib["access"]
access = None
if access_aux in self.access_xdf_options:
access = self.access_xdf_options[access_aux]
else:
raise ILDictionaryParseError(
f"The access type {access_aux} does not exist for the register: {identifier}"
)
# Address type
address_type_aux = register.attrib["address_type"]
if address_type_aux in self.address_type_xdf_options:
address_type = self.address_type_xdf_options[address_type_aux]
else:
raise ILDictionaryParseError(
f"The address type {address_type_aux} does not exist for the register: "
f"{identifier}"
)
subnode = int(register.attrib.get("subnode", 1))
storage = register.attrib.get("storage")
cat_id = register.attrib.get("cat_id")
internal_use = int(register.attrib.get("internal_use", 0))
# Labels
labels_elem = register.findall(DICT_LABELS_LABEL)
labels = {label.attrib["lang"]: str(label.text) for label in labels_elem}
# Range
range_elem = register.find(self.DICT_RANGE)
reg_range: Union[Tuple[None, None], Tuple[str, str]] = (None, None)
if range_elem is not None:
range_min = range_elem.attrib["min"]
range_max = range_elem.attrib["max"]
reg_range = (range_min, range_max)
# Enumerations
enums_elem = register.findall(self.DICT_ENUMERATIONS_ENUMERATION)
enums = {str(enum.text): int(enum.attrib["value"]) for enum in enums_elem}
current_read_register = Register(
dtype,
access,
identifier=identifier,
units=units,
cyclic=cyclic,
subnode=subnode,
storage=storage,
reg_range=reg_range,
labels=labels,
enums=enums,
cat_id=cat_id,
internal_use=internal_use,
address_type=address_type,
)
return current_read_register
except KeyError as ke:
logger.error(f"Register with ID {identifier} has not attribute {ke}")
return None
def _add_register_list(
self,
register: Register,
) -> None:
"""Adds the current read register into the _registers list
Args:
register: the current read register it will be instanced
"""
identifier = register.identifier
subnode = register.subnode
if identifier is None:
return
self._registers[subnode][identifier] = register
def _append_missing_registers(
self,
) -> None:
"""Append missing registers to the dictionary.
Mainly registers needed for Monitoring/Disturbance and PDOs.
"""
if self.MON_DIST_STATUS_REGISTER in self._registers[0]:
for register in self.MONITORING_DISTURBANCE_REGISTERS:
if register.identifier is not None:
self._registers[register.subnode][register.identifier] = register