from typing import Any, Callable, Optional, Union
import canopen
import ingenialogger
from canopen.emcy import EmcyConsumer
from ingenialink.dictionary import Interface
from ingenialink.canopen.dictionary import CanopenDictionaryV2
from ingenialink.canopen.register import CanopenRegister
from ingenialink.constants import CAN_MAX_WRITE_SIZE
from ingenialink.enums.register import REG_ACCESS, REG_DTYPE
from ingenialink.exceptions import ILIOError
from ingenialink.register import Register
from ingenialink.servo import Servo
logger = ingenialogger.get_logger(__name__)
CANOPEN_SDO_RESPONSE_TIMEOUT = 0.3
[docs]class CanopenServo(Servo):
"""CANopen Servo instance.
Args:
target: Node ID to be connected.
node: Remote Node of the drive.
dictionary_path: Path to the dictionary.
servo_status_listener: Toggle the listener of the servo for
its status, errors, faults, etc.
"""
MAX_WRITE_SIZE = CAN_MAX_WRITE_SIZE
STATUS_WORD_REGISTERS = "CIA402_DRV_STATE_STATUS"
RESTORE_COCO_ALL = "CIA301_COMMS_RESTORE_ALL"
STORE_COCO_ALL = "CIA301_COMMS_STORE_ALL"
interface = Interface.CAN
def __init__(
self,
target: int,
node: canopen.RemoteNode,
dictionary_path: str,
servo_status_listener: bool = False,
) -> None:
self.__node = node
self.__emcy_consumer = EmcyConsumer()
super(CanopenServo, self).__init__(target, dictionary_path, servo_status_listener)
[docs] def read(
self, reg: Union[str, Register], subnode: int = 1, **kwargs: Any
) -> Union[int, float, str, bytes]:
value = super().read(reg, subnode=subnode)
if isinstance(value, str):
value = value.replace("\x00", "")
return value
[docs] def store_parameters(self, subnode: Optional[int] = None, sdo_timeout: int = 3) -> None:
"""Store all the current parameters of the target subnode.
Args:
subnode: Subnode of the axis. `None` by default which stores all the parameters.
sdo_timeout: Timeout value for each SDO response.
Raises:
ILError: Invalid subnode.
ILObjectNotExist: Failed to write to the registers.
"""
self._change_sdo_timeout(sdo_timeout)
super().store_parameters(subnode)
self._change_sdo_timeout(CANOPEN_SDO_RESPONSE_TIMEOUT)
def _write_raw(self, reg: CanopenRegister, data: bytes) -> None: # type: ignore [override]
try:
self._lock.acquire()
self.__node.sdo.download(reg.idx, reg.subidx, data)
except Exception as e:
logger.error("Failed writing %s. Exception: %s", str(reg.identifier), e)
error_raised = f"Error writing {reg.identifier}"
raise ILIOError(error_raised) from e
finally:
self._lock.release()
def _read_raw(self, reg: CanopenRegister) -> bytes: # type: ignore [override]
try:
self._lock.acquire()
value = self.__node.sdo.upload(reg.idx, reg.subidx)
except Exception as e:
logger.error("Failed reading %s. Exception: %s", str(reg.identifier), e)
error_raised = f"Error reading {reg.identifier}"
raise ILIOError(error_raised)
finally:
self._lock.release()
if not isinstance(value, bytes):
return bytes()
return value
[docs] def emcy_subscribe(self, cb: Callable[..., Any]) -> int:
"""Subscribe to emergency messages.
Args:
cb: Callback
Returns:
Assigned slot.
"""
self.__emcy_consumer.add_callback(cb)
return len(self.__emcy_consumer.callbacks) - 1
[docs] def emcy_unsubscribe(self, slot: int) -> None:
"""Unsubscribe from emergency messages.
Args:
slot: Assigned slot when subscribed.
"""
del self.__emcy_consumer.callbacks[slot]
def _change_sdo_timeout(self, value: float) -> None:
"""Changes the SDO timeout of the node."""
self.__node.sdo.RESPONSE_TIMEOUT = value
@staticmethod
def _monitoring_disturbance_map_can_address(address: int, subnode: int) -> int:
"""Map CAN register address to IPB register address."""
return address - (0x2000 + (0x800 * (subnode - 1)))
def _monitoring_disturbance_data_to_map_register(
self, subnode: int, address: int, dtype: int, size: int
) -> int:
"""Arrange necessary data to map a monitoring/disturbance register.
Args:
subnode: Subnode to be targeted.
address: Register address to map.
dtype: Register data type.
size: Size of data in bytes.
"""
ipb_address = self._monitoring_disturbance_map_can_address(address, subnode)
return super()._monitoring_disturbance_data_to_map_register(
subnode, ipb_address, dtype, size
)
@property
def node(self) -> canopen.RemoteNode:
"""Remote node of the servo."""
return self.__node