from typing import List, Optional, Union, Dict
import bitarray
from ingenialink.canopen.register import CanopenRegister
from ingenialink.enums.register import REG_DTYPE, REG_ACCESS, RegCyclicType
from ingenialink.ethercat.register import EthercatRegister
from ingenialink.exceptions import ILError
from ingenialink.servo import Servo
from ingenialink.utils._utils import (
convert_bytes_to_dtype,
convert_dtype_to_bytes,
dtype_length_bits,
)
BIT_ENDIAN = "little"
bitarray._set_default_endian(BIT_ENDIAN)
PADDING_REGISTER_IDENTIFIER = "PADDING"
[docs]class PDOMapItem:
"""Abstract class to represent a register in the PDO mapping.
Attributes:
register: mapped register object. If None the item will padding.
size_bits: custom register size in bits.
Raises:
ValueError: If the register and size_bits are not provided.
ValueError: If the size_bits value is invalid. Only when the register
is set to None.
"""
ACCEPTED_CYCLIC: RegCyclicType
"""Accepted cyclic: CYCLIC_TX, CYCLIC_RX or CYCLIC_TXRX."""
def __init__(
self,
register: Union[None, EthercatRegister, CanopenRegister] = None,
size_bits: Optional[int] = None,
) -> None:
if register is None:
if size_bits is None:
raise ValueError("The size bits must be set when creating padding items.")
register = EthercatRegister(
identifier=PADDING_REGISTER_IDENTIFIER,
units="",
subnode=0,
idx=0x0000,
subidx=0x00,
cyclic=self.ACCEPTED_CYCLIC,
dtype=REG_DTYPE.STR,
access=REG_ACCESS.RW,
)
self.register = register
self.size_bits = size_bits or dtype_length_bits[register.dtype]
self._raw_data_bits: Optional[bitarray.bitarray] = None
self._check_if_mappable()
def _check_if_mappable(self) -> None:
"""Check if the passed register is mappable. I.e., if the cyclic information is correct.
Raises:
ILError: Tf the register is not mappable.
"""
if self.register.cyclic not in [self.ACCEPTED_CYCLIC, RegCyclicType.TXRX]:
raise ILError(
f"Incorrect cyclic. It should be {self.ACCEPTED_CYCLIC} or {RegCyclicType.TXRX},"
f" obtained: {self.register.cyclic}"
)
@property
def raw_data_bits(self) -> bitarray.bitarray:
"""Raw data in bits.
Returns:
Raw data in bits
Raises:
ILError: If the raw data is empty.
"""
if self._raw_data_bits is None:
raise ILError("Raw data is empty.")
return self._raw_data_bits
@raw_data_bits.setter
def raw_data_bits(self, data: bitarray.bitarray) -> None:
if len(data) != self.size_bits:
raise ILError(f"Wrong size. Expected {self.size_bits}, obtained {len(data)}")
if data.endian() != BIT_ENDIAN:
raise ILError("Bitarray should be little endian.")
self._raw_data_bits = data
@property
def raw_data_bytes(self) -> bytes:
"""Raw data in bytes.
Returns:
Raw data in bytes
Raises:
ILError: If the raw data is empty.
"""
if self._raw_data_bits is None:
raise ILError("Raw data is empty.")
return self._raw_data_bits.tobytes()
@raw_data_bytes.setter
def raw_data_bytes(self, data: bytes) -> None:
data_bits = bitarray.bitarray(endian=BIT_ENDIAN)
data_bits.frombytes(data)
if self.register.identifier == PADDING_REGISTER_IDENTIFIER:
data_bits = data_bits[: self.size_bits]
self.raw_data_bits = data_bits
@property
def value(self) -> Union[int, float, bool]:
"""Register value. Converts the raw data bytes into the register value.
Raises:
ILError: If the raw data is empty.
ILError: If the register type is not int or float.
Returns:
Register value.
"""
value: Union[bool, int, float, str]
if self.register.identifier == PADDING_REGISTER_IDENTIFIER:
raise NotImplementedError(
"The register value must be read by the raw_data_bytes attribute."
)
if self.register.dtype == REG_DTYPE.BOOL:
value = self.raw_data_bits.any()
else:
value = convert_bytes_to_dtype(self.raw_data_bytes, self.register.dtype)
if not isinstance(value, (int, float, bool)):
raise ILError("Wrong register value type")
return value
@property
def register_mapping(self) -> bytes:
"""Arrange register information into PDO mapping format.
Returns:
PDO register mapping format.
"""
index = self.register.idx
mapped_register = (index << 16) | self.size_bits
mapped_register_bytes: bytes = mapped_register.to_bytes(4, "little")
return mapped_register_bytes
[docs]class RPDOMapItem(PDOMapItem):
"""Class to represent RPDO mapping items."""
ACCEPTED_CYCLIC = RegCyclicType.RX
def __init__(
self,
register: Union[None, EthercatRegister, CanopenRegister] = None,
size_bits: Optional[int] = None,
) -> None:
super().__init__(register, size_bits)
@property
def value(self) -> Union[int, float]:
return super().value
@value.setter
def value(self, value: Union[int, float, bool]) -> None:
if self.register.identifier == PADDING_REGISTER_IDENTIFIER:
raise NotImplementedError(
"The register value must be set by the raw_data_bytes attribute."
)
if isinstance(value, bool):
raw_data_bits = bitarray.bitarray(endian=BIT_ENDIAN)
raw_data_bits.append(value)
self.raw_data_bits = raw_data_bits
else:
raw_data_bytes = convert_dtype_to_bytes(value, self.register.dtype)
self.raw_data_bytes = raw_data_bytes
[docs]class TPDOMapItem(PDOMapItem):
"""Class to represent TPDO mapping items."""
ACCEPTED_CYCLIC = RegCyclicType.TX
[docs]class PDOMap:
"""Abstract class that contains PDO mapping information."""
_PDO_MAP_ITEM_CLASS = PDOMapItem
def __init__(self) -> None:
self.__items: List[PDOMapItem] = []
self.__map_register_address: Optional[int] = None
[docs] def create_item(
self, register: Union[EthercatRegister, CanopenRegister], size_bits: Optional[int] = None
) -> PDOMapItem:
"""Create a new PDOMapItem.
Args:
register: Register object.
size_bits: Register size in bits.
Returns:
PDO Map item.
"""
item = self._PDO_MAP_ITEM_CLASS(register, size_bits)
return item
[docs] def add_item(self, item: PDOMapItem) -> None:
"""Append a new item.
Args:
item: Item to be added.
"""
self.__items.append(item)
[docs] def add_registers(
self,
registers: Union[
Union[EthercatRegister, CanopenRegister], List[Union[EthercatRegister, CanopenRegister]]
],
) -> None:
"""Add a register or a list of registers in bulk.
It creates a new item for each register and adds it the items attribute.
Args:
registers: Register object or list of Registers.
"""
if not isinstance(registers, list):
registers = [registers]
for register in registers:
item = self.create_item(register)
self.add_item(item)
@property
def items(self) -> List[PDOMapItem]:
"""List of items.
Returns:
List of items.
"""
return self.__items
@property
def map_register_index_bytes(self) -> bytes:
"""Index of the mapping register in bytes.
Returns:
Index of the mapping register in bytes.
Raises:
ValueError: If map_register_index is None
"""
if self.map_register_index is None:
raise ValueError("map_register_index is None")
else:
return self.map_register_index.to_bytes(4, "little")
@property
def map_register_index(self) -> Optional[int]:
"""Index of the mapping register. None if it is not mapped in the slave.
Returns:
Index of the mapping register.
"""
return self.__map_register_address
@map_register_index.setter
def map_register_index(self, address: int) -> None:
self.__map_register_address = address
@property
def data_length_bits(self) -> int:
"""Length of the map in bits.
Returns:
Length of the map in bits.
"""
return sum(item.size_bits for item in self.items)
@property
def data_length_bytes(self) -> int:
"""Length of the map in bytes.
Returns:
Length of the map in bytes.
"""
return bitarray.bits2bytes(self.data_length_bits)
@property
def items_mapping(self) -> bytearray:
"""Returns all register item mappings concatenated.
Returns:
int: _description_
"""
map_bytes = bytearray()
for pdo_map_item in self.items:
map_bytes += pdo_map_item.register_mapping
return map_bytes
[docs]class RPDOMap(PDOMap):
"""Class to store RPDO mapping information."""
_PDO_MAP_ITEM_CLASS = RPDOMapItem
[docs] def get_item_bits(self) -> bitarray.bitarray:
"""Return the concatenated items raw data to be sent to the slave (in bits).
Raises:
ILError: Raw data is empty.
ILError: If the length of the bit array is incorrect.
Returns:
Concatenated items raw data in bits.
"""
data_bits = bitarray.bitarray(endian=BIT_ENDIAN)
for item in self.items:
try:
data_bits += item.raw_data_bits
except ILError:
raise ILError(f"PDO item {item.register.identifier} does not have data stored.")
if len(data_bits) != self.data_length_bits:
raise ILError(
"The length in bits of the data array is incorrect. Expected"
f" {self.data_length_bits}, obtained {len(data_bits)}"
)
return data_bits
[docs] def get_item_bytes(self) -> bytes:
"""Return the concatenated items raw data to be sent to the slave (in bytes).
Raises:
ILError: Raw data is empty.
ILError: If the length of th byte array is incorrect.
Returns:
Concatenated items raw data in bytes.
"""
item_bits = self.get_item_bits()
return item_bits.tobytes()
[docs]class TPDOMap(PDOMap):
"""Class to store TPDO mapping information."""
_PDO_MAP_ITEM_CLASS = TPDOMapItem
[docs] def set_item_bytes(self, data_bytes: bytes) -> None:
"""Set the items raw data from a byte array received from the slave.
Args:
data_bytes: Byte array received from the slave.
Raises:
ILError: If the length of the received data does not coincide.
"""
if len(data_bytes) != self.data_length_bytes:
raise ILError(
f"The length of the data array is incorrect. Expected {self.data_length_bytes},"
f" obtained {len(data_bytes)}"
)
data_bits = bitarray.bitarray(endian=BIT_ENDIAN)
data_bits.frombytes(data_bytes)
offset = 0
for item in self.items:
item.raw_data_bits = data_bits[offset : item.size_bits + offset]
offset += item.size_bits
[docs]class PDOServo(Servo):
"""Abstract class to implement PDOs in a Servo class."""
AVAILABLE_PDOS = 1
RPDO_ASSIGN_REGISTER_SUB_IDX_0 = "RPDO_ASSIGN_REGISTER_SUB_IDX_0"
RPDO_ASSIGN_REGISTER_SUB_IDX_1 = "RPDO_ASSIGN_REGISTER_SUB_IDX_1"
RPDO_MAP_REGISTER_SUB_IDX_0 = ["RPDO_MAP_REGISTER_SUB_IDX_0"]
RPDO_MAP_REGISTER_SUB_IDX_1 = ["RPDO_MAP_REGISTER_SUB_IDX_1"]
TPDO_ASSIGN_REGISTER_SUB_IDX_0 = "TPDO_ASSIGN_REGISTER_SUB_IDX_0"
TPDO_ASSIGN_REGISTER_SUB_IDX_1 = "TPDO_ASSIGN_REGISTER_SUB_IDX_1"
TPDO_MAP_REGISTER_SUB_IDX_0 = ["TPDO_MAP_REGISTER_SUB_IDX_0"]
TPDO_MAP_REGISTER_SUB_IDX_1 = ["TPDO_MAP_REGISTER_SUB_IDX_1"]
def __init__(
self,
target: Union[int, str],
dictionary_path: str,
servo_status_listener: bool = False,
):
super().__init__(target, dictionary_path, servo_status_listener)
self._rpdo_maps: List[RPDOMap] = []
self._tpdo_maps: List[TPDOMap] = []
[docs] def reset_rpdo_mapping(self) -> None:
"""Delete the RPDO mapping stored in the servo slave."""
self.write(self.RPDO_ASSIGN_REGISTER_SUB_IDX_0, 0, subnode=0)
for map_register in self.RPDO_MAP_REGISTER_SUB_IDX_0:
self.write(map_register, 0, subnode=0)
self._rpdo_maps.clear()
[docs] def reset_tpdo_mapping(self) -> None:
"""Delete the TPDO mapping stored in the servo slave."""
self.write(self.TPDO_ASSIGN_REGISTER_SUB_IDX_0, 0, subnode=0)
for map_register in self.TPDO_MAP_REGISTER_SUB_IDX_0:
self.write(map_register, 0, subnode=0)
self._tpdo_maps.clear()
[docs] def map_rpdos(self) -> None:
"""Map the RPDO registers into the servo slave.
It takes the first available RPDO assignment slot of the slave.
Raises:
ILError: If there are no available PDOs.
"""
if len(self._rpdo_maps) > self.AVAILABLE_PDOS:
raise ILError(
f"Could not map the RPDO maps, received {len(self._rpdo_maps)} PDOs and only"
f" {self.AVAILABLE_PDOS} are available"
)
self.write(self.RPDO_ASSIGN_REGISTER_SUB_IDX_0, len(self._rpdo_maps), subnode=0)
custom_map_index = 0
rpdo_assigns = b""
for rpdo_map in self._rpdo_maps:
if rpdo_map.map_register_index is None:
self._set_rpdo_map_register(custom_map_index, rpdo_map)
custom_map_index += 1
rpdo_assigns += rpdo_map.map_register_index_bytes
self.write(
self.RPDO_ASSIGN_REGISTER_SUB_IDX_1, rpdo_assigns, complete_access=True, subnode=0
)
def _set_rpdo_map_register(self, rpdo_map_register_index: int, rpdo_map: RPDOMap) -> None:
"""Fill RPDO map register with PRDOMap object data
Args:
rpdo_map_register_index: custom rpdo map register index
rpdo_map: custom rpdo data
"""
self.write(
self.RPDO_MAP_REGISTER_SUB_IDX_0[rpdo_map_register_index],
len(rpdo_map.items),
subnode=0,
)
self.write(
self.RPDO_MAP_REGISTER_SUB_IDX_1[rpdo_map_register_index],
rpdo_map.items_mapping.decode("utf-8"),
complete_access=True,
subnode=0,
)
rpdo_map_register = self.dictionary.registers(0)[
self.RPDO_MAP_REGISTER_SUB_IDX_0[rpdo_map_register_index]
]
if not isinstance(rpdo_map_register, EthercatRegister):
raise ValueError(
"Error retrieving the RPDO Map register. Expected EthercatRegister, got:"
f" {type(rpdo_map_register)}"
)
rpdo_map.map_register_index = rpdo_map_register.idx
[docs] def map_tpdos(self) -> None:
"""Map the TPDO registers into the servo slave.
It takes the first available TPDO assignment slot of the slave.
Raises:
ILError: If there are no available PDOs.
"""
if len(self._tpdo_maps) > self.AVAILABLE_PDOS:
raise ILError(
f"Could not map the TPDO maps, received {len(self._tpdo_maps)} PDOs and only"
f" {self.AVAILABLE_PDOS} are available"
)
self.write(self.TPDO_ASSIGN_REGISTER_SUB_IDX_0, len(self._tpdo_maps), subnode=0)
custom_map_index = 0
tpdo_assigns = b""
for tpdo_map in self._tpdo_maps:
if tpdo_map.map_register_index is None:
self._set_tpdo_map_register(custom_map_index, tpdo_map)
custom_map_index += 1
tpdo_assigns += tpdo_map.map_register_index_bytes
self.write(
self.TPDO_ASSIGN_REGISTER_SUB_IDX_1, tpdo_assigns, complete_access=True, subnode=0
)
def _set_tpdo_map_register(self, tpdo_map_register_index: int, tpdo_map: TPDOMap) -> None:
"""Fill TPDO map register with TRDOMap object data
Args:
tpdo_map_register_index: custom tpdo map register index
tpdo_map: custom tpdo data
"""
self.write(
self.TPDO_MAP_REGISTER_SUB_IDX_0[tpdo_map_register_index],
len(tpdo_map.items),
subnode=0,
)
self.write(
self.TPDO_MAP_REGISTER_SUB_IDX_1[tpdo_map_register_index],
tpdo_map.items_mapping.decode("utf-8"),
complete_access=True,
subnode=0,
)
tpdo_map_register = self.dictionary.registers(0)[
self.TPDO_MAP_REGISTER_SUB_IDX_0[tpdo_map_register_index]
]
if not isinstance(tpdo_map_register, EthercatRegister):
raise ValueError(
"Error retrieving the TPDO Map register. Expected EthercatRegister, got:"
f" {type(tpdo_map_register)}"
)
tpdo_map.map_register_index = tpdo_map_register.idx
[docs] def map_pdos(self, slave_index: int) -> None:
"""Map RPDO and TPDO register into the slave.
Args:
slave_index: salve index.
"""
self.map_tpdos()
self.map_rpdos()
[docs] def reset_pdo_mapping(self) -> None:
"""Reset the RPDO and TPDO mapping in the slave."""
self.reset_rpdo_mapping()
self.reset_tpdo_mapping()
[docs] def remove_rpdo_map(
self, rpdo_map: Optional[RPDOMap] = None, rpdo_map_index: Optional[int] = None
) -> None:
"""Remove a RPDOMap from the RPDOMap list.
Args:
rpdo_map: The RPDOMap instance to be removed.
rpdo_map_index: The index of the RPDOMap list to be removed.
Raises:
ValueError: If the RPDOMap instance is not in the RPDOMap list.
IndexError: If the index is out of range.
"""
if rpdo_map_index is None and rpdo_map is None:
raise ValueError("The RPDOMap instance or the index should be provided.")
if rpdo_map is not None:
self._rpdo_maps.remove(rpdo_map)
return
if rpdo_map_index is not None:
self._rpdo_maps.pop(rpdo_map_index)
[docs] def remove_tpdo_map(
self, tpdo_map: Optional[TPDOMap] = None, tpdo_map_index: Optional[int] = None
) -> None:
"""Remove a TPDOMap from the TPDOMap list.
Args:
tpdo_map: The TPDOMap instance to be removed.
tpdo_map_index: The index of the TPDOMap list to be removed.
Raises:
ValueError: If the TPDOMap instance is not in the TPDOMap list.
IndexError: If the index is out of range.
"""
if tpdo_map_index is None and tpdo_map is None:
raise ValueError("The TPDOMap instance or the index should be provided.")
if tpdo_map is not None:
self._tpdo_maps.remove(tpdo_map)
return
if tpdo_map_index is not None:
self._tpdo_maps.pop(tpdo_map_index)
[docs] def set_pdo_map_to_slave(self, rpdo_maps: List[RPDOMap], tpdo_maps: List[TPDOMap]) -> None:
"""Callback called by the slave to configure the map.
Args:
rpdo_maps: List of RPDO maps.
tpdo_maps: List of TPDO maps.
"""
raise NotImplementedError
[docs] def generate_pdo_outputs(self) -> None:
"""Process the PDO outputs.
It should call _process_tpdo method using the received bytes as argument.
"""
raise NotImplementedError
def _process_tpdo(self, input_data: bytes) -> None:
"""Convert the TPDO values from bytes to the registers data type.
Args:
input_data: Concatenated received data bytes.
"""
if len(self._tpdo_maps) == 0:
return
for tpdo_map in self._tpdo_maps:
map_bytes = input_data[: tpdo_map.data_length_bytes]
tpdo_map.set_item_bytes(map_bytes)
def _process_rpdo(self) -> Optional[bytes]:
"""Retrieve the RPDO raw data from each map.
Return:
Concatenated data bytes to be sent.
"""
if len(self._rpdo_maps) == 0:
return None
output = bytearray()
for rpdo_map in self._rpdo_maps:
output += rpdo_map.get_item_bytes()
return bytes(output)