Source code for ingenialink.ethercat.servo

import time
from enum import Enum
from typing import TYPE_CHECKING, List, Optional, Dict, Any

import ingenialogger

try:
    import pysoem
except ImportError as ex:
    pysoem = None
    pysoem_import_error = ex

if TYPE_CHECKING:
    from pysoem import CdefSlave

from ingenialink.constants import CAN_MAX_WRITE_SIZE, CANOPEN_ADDRESS_OFFSET, MAP_ADDRESS_OFFSET
from ingenialink.ethercat.register import EthercatRegister
from ingenialink.exceptions import ILIOError, ILTimeoutError, ILError
from ingenialink.pdo import PDOServo, RPDOMap, TPDOMap
from ingenialink.register import REG_ACCESS, REG_DTYPE
from ingenialink.dictionary import Interface

logger = ingenialogger.get_logger(__name__)


[docs]class SDO_OPERATION_MSG(Enum): """Message for exceptions depending on the operation type.""" READ = "reading" WRITE = "writing"
[docs]class EthercatServo(PDOServo): """Ethercat Servo instance. Args: slave: Slave to be connected. slave_id: Slave ID. dictionary_path: Path to the dictionary. connection_timeout: Time in seconds of the connection timeout. servo_status_listener: Toggle the listener of the servo for its status, errors, faults, etc. Raises: ImportError: WinPcap is not installed """ MAX_WRITE_SIZE = CAN_MAX_WRITE_SIZE MONITORING_DATA_BUFFER_SIZE = 1024 NO_RESPONSE_WORKING_COUNTER = 0 TIMEOUT_WORKING_COUNTER = -5 NOFRAME_WORKING_COUNTER = -1 ETHERCAT_PDO_WATCHDOG = "processdata" SECONDS_TO_MS_CONVERSION_FACTOR = 1000 interface = Interface.ECAT def __init__( self, slave: "CdefSlave", slave_id: int, dictionary_path: str, connection_timeout: float, servo_status_listener: bool = False, ): if not pysoem: raise pysoem_import_error self.__slave = slave self.slave_id = slave_id self._connection_timeout = connection_timeout super(EthercatServo, self).__init__(slave_id, dictionary_path, servo_status_listener) def _read_raw( # type: ignore [override] self, reg: EthercatRegister, buffer_size: int = 0, complete_access: bool = False, start_time: Optional[float] = None, ) -> bytes: retry = False if start_time is None: start_time = time.time() self._lock.acquire() try: time.sleep(0.0001) # Unlock threads before SDO read value: bytes = self.__slave.sdo_read(reg.idx, reg.subidx, buffer_size, complete_access) except ( pysoem.SdoError, pysoem.MailboxError, pysoem.PacketError, pysoem.WkcError, ILIOError, ) as e: self._handle_sdo_exception(reg, SDO_OPERATION_MSG.READ, e) except pysoem.Emergency as e: if time.time() > start_time + self._connection_timeout: raise ILTimeoutError("Emergency messages could not be cleared.") from e retry = True finally: self._lock.release() if retry: return self._read_raw(reg, buffer_size, complete_access, start_time) return value def _write_raw( # type: ignore [override] self, reg: EthercatRegister, data: bytes, complete_access: bool = False, start_time: Optional[float] = None, ) -> None: retry = False if start_time is None: start_time = time.time() self._lock.acquire() try: time.sleep(0.0001) # Unlock threads before SDO write self.__slave.sdo_write(reg.idx, reg.subidx, data, complete_access) except ( pysoem.SdoError, pysoem.MailboxError, pysoem.PacketError, pysoem.WkcError, ILIOError, ) as e: self._handle_sdo_exception(reg, SDO_OPERATION_MSG.WRITE, e) except pysoem.Emergency as e: if time.time() > start_time + self._connection_timeout: raise ILTimeoutError("Emergency messages could not be cleared.") from e retry = True finally: self._lock.release() if retry: return self._write_raw(reg, data, complete_access, start_time) def _handle_sdo_exception( self, reg: EthercatRegister, operation_msg: SDO_OPERATION_MSG, exception: Exception ) -> None: """ Handle the exceptions that occur when reading or writing SDOs. Args: reg: The register that was read or written. operation_msg: Operation type message to be shown on the exception. exception: The exception that occurred while reading or writing. Raises: ILIOError: If the register cannot be read or written. ILIOError: If the slave fails to acknowledge the command. ILIOError: If the working counter value is wrong. ILTimeoutError: If the slave fails to respond within the connection timeout period. """ if isinstance( exception, (pysoem.SdoError, pysoem.MailboxError, pysoem.PacketError, ILIOError) ): raise ILIOError( f"Error {operation_msg.value} {reg.identifier}. Reason: {exception}" ) from exception elif isinstance(exception, pysoem.WkcError): default_error_msg = f"Error {operation_msg.value} data" wkc_exceptions: Dict[int, ILError] = { self.NO_RESPONSE_WORKING_COUNTER: ILIOError( f"{default_error_msg}: The working counter remained unchanged." ), self.NOFRAME_WORKING_COUNTER: ILIOError(f"{default_error_msg}: No frame."), self.TIMEOUT_WORKING_COUNTER: ILTimeoutError( f"Timeout {operation_msg.value} data." ), } exc = wkc_exceptions.get( exception.wkc, ILIOError(f"{default_error_msg}. Wrong working counter: {exception.wkc}"), ) raise exc from exception def _monitoring_read_data(self, **kwargs: Any) -> bytes: """Read monitoring data frame.""" return super()._monitoring_read_data( buffer_size=self.MONITORING_DATA_BUFFER_SIZE, complete_access=True ) def _disturbance_write_data(self, data: bytes, **kwargs: Any) -> None: """Write disturbance data.""" super()._disturbance_write_data(data, complete_access=True) @staticmethod def __monitoring_disturbance_map_can_address(address: int, subnode: int) -> int: """Map CAN register address to IPB register address. Args: subnode: Subnode to be targeted. address: Register address to map. """ mapped_address: int = address - ( CANOPEN_ADDRESS_OFFSET + (MAP_ADDRESS_OFFSET * (subnode - 1)) ) return mapped_address def _monitoring_disturbance_data_to_map_register( self, subnode: int, address: int, dtype: int, size: int ) -> int: """Arrange necessary data to map a monitoring/disturbance register. Args: subnode: Subnode to be targeted. address: Register address to map. dtype: Register data type. size: Size of data in bytes. """ ipb_address = self.__monitoring_disturbance_map_can_address(address, subnode) mapped_address: int = super()._monitoring_disturbance_data_to_map_register( subnode, ipb_address, dtype, size ) return mapped_address def _get_emergency_description(self, error_code: int) -> Optional[str]: """Get the error description from the error code. Args: error_code: Error code received. Returns: The error description corresponding to the error code. """ error_description = None if self.dictionary.errors is not None: error_code &= 0xFFFF if error_code in self.dictionary.errors.errors: error_description = self.dictionary.errors.errors[error_code][-1] return error_description
[docs] def set_pdo_map_to_slave(self, rpdo_maps: List[RPDOMap], tpdo_maps: List[TPDOMap]) -> None: self._rpdo_maps.extend(rpdo_maps) self._tpdo_maps.extend(tpdo_maps) self.slave.config_func = self.map_pdos
[docs] def process_pdo_inputs(self) -> None: self._process_tpdo(self.__slave.input)
[docs] def generate_pdo_outputs(self) -> None: output = self._process_rpdo() if output is None: return self.__slave.output = self._process_rpdo()
[docs] def set_pdo_watchdog_time(self, timeout: float) -> None: """Set the process data watchdog time. Args: timeout: Time in seconds. """ self.slave.set_watchdog( self.ETHERCAT_PDO_WATCHDOG, self.SECONDS_TO_MS_CONVERSION_FACTOR * timeout )
@property def slave(self) -> "CdefSlave": """Ethercat slave""" return self.__slave