import time
from enum import Enum
from typing import TYPE_CHECKING, List, Optional, Dict
import ingenialogger
try:
import pysoem
except ImportError as ex:
pysoem = None
pysoem_import_error = ex
if TYPE_CHECKING:
from pysoem import CdefSlave
from ingenialink.constants import CAN_MAX_WRITE_SIZE, CANOPEN_ADDRESS_OFFSET, MAP_ADDRESS_OFFSET
from ingenialink.ethercat.dictionary import EthercatDictionary
from ingenialink.ethercat.register import EthercatRegister
from ingenialink.exceptions import ILIOError, ILTimeoutError, ILError
from ingenialink.pdo import PDOServo, RPDOMap, TPDOMap
from ingenialink.register import REG_ACCESS, REG_DTYPE
logger = ingenialogger.get_logger(__name__)
[docs]class SDO_OPERATION_MSG(Enum):
"""Message for exceptions depending on the operation type."""
READ = "reading"
WRITE = "writing"
[docs]class EthercatServo(PDOServo):
"""Ethercat Servo instance.
Args:
slave: Slave to be connected.
slave_id: Slave ID.
dictionary_path: Path to the dictionary.
connection_timeout: Time in seconds of the connection timeout.
servo_status_listener: Toggle the listener of the servo for
its status, errors, faults, etc.
Raises:
ImportError: WinPcap is not installed
"""
DICTIONARY_CLASS = EthercatDictionary
MAX_WRITE_SIZE = CAN_MAX_WRITE_SIZE
NO_RESPONSE_WORKING_COUNTER = 0
TIMEOUT_WORKING_COUNTER = -5
NOFRAME_WORKING_COUNTER = -1
MONITORING_DATA = EthercatRegister(
identifier="MONITORING_DATA",
units="",
subnode=0,
idx=0x58B2,
subidx=0x01,
cyclic="CONFIG",
dtype=REG_DTYPE.U16,
access=REG_ACCESS.RO,
)
DIST_DATA = EthercatRegister(
identifier="DISTURBANCE_DATA",
units="",
subnode=0,
idx=0x58B4,
subidx=0x01,
cyclic="CONFIG",
dtype=REG_DTYPE.U16,
access=REG_ACCESS.WO,
)
RPDO_ASSIGN_REGISTER_SUB_IDX_0 = EthercatRegister(
identifier="RPDO_ASSIGN_REGISTER",
units="",
subnode=0,
idx=0x1C12,
subidx=0x00,
dtype=REG_DTYPE.S32,
access=REG_ACCESS.RW,
)
RPDO_ASSIGN_REGISTER_SUB_IDX_1 = EthercatRegister(
identifier="RPDO_ASSIGN_REGISTER",
units="",
subnode=0,
idx=0x1C12,
subidx=0x01,
dtype=REG_DTYPE.S32,
access=REG_ACCESS.RW,
)
RPDO_MAP_REGISTER_SUB_IDX_0 = [
EthercatRegister(
identifier="RPDO_MAP_REGISTER",
units="",
subnode=0,
idx=0x1600,
subidx=0x00,
dtype=REG_DTYPE.S32,
access=REG_ACCESS.RW,
)
]
RPDO_MAP_REGISTER_SUB_IDX_1 = [
EthercatRegister(
identifier="RPDO_MAP_REGISTER",
units="",
subnode=0,
idx=0x1600,
subidx=0x01,
dtype=REG_DTYPE.STR,
access=REG_ACCESS.RW,
)
]
TPDO_ASSIGN_REGISTER_SUB_IDX_0 = EthercatRegister(
identifier="TPDO_ASSIGN_REGISTER",
units="",
subnode=0,
idx=0x1C13,
subidx=0x00,
dtype=REG_DTYPE.S32,
access=REG_ACCESS.RW,
)
TPDO_ASSIGN_REGISTER_SUB_IDX_1 = EthercatRegister(
identifier="TPDO_ASSIGN_REGISTER",
units="",
subnode=0,
idx=0x1C13,
subidx=0x01,
dtype=REG_DTYPE.S32,
access=REG_ACCESS.RW,
)
TPDO_MAP_REGISTER_SUB_IDX_0 = [
EthercatRegister(
identifier="TPDO_MAP_REGISTER",
units="",
subnode=0,
idx=0x1A00,
subidx=0x00,
dtype=REG_DTYPE.S32,
access=REG_ACCESS.RW,
)
]
TPDO_MAP_REGISTER_SUB_IDX_1 = [
EthercatRegister(
identifier="TPDO_MAP_REGISTER",
units="",
subnode=0,
idx=0x1A00,
subidx=0x01,
dtype=REG_DTYPE.STR,
access=REG_ACCESS.RW,
)
]
def __init__(
self,
slave: "CdefSlave",
slave_id: int,
dictionary_path: str,
connection_timeout: float,
servo_status_listener: bool = False,
):
if not pysoem:
raise pysoem_import_error
self.__slave = slave
self.slave_id = slave_id
self._connection_timeout = connection_timeout
super(EthercatServo, self).__init__(slave_id, dictionary_path, servo_status_listener)
def _read_raw( # type: ignore [override]
self,
reg: EthercatRegister,
buffer_size: int = 0,
complete_access: bool = False,
start_time: Optional[float] = None,
) -> bytes:
retry = False
if start_time is None:
start_time = time.time()
self._lock.acquire()
try:
time.sleep(0.0001) # Unlock threads before SDO read
value: bytes = self.__slave.sdo_read(reg.idx, reg.subidx, buffer_size, complete_access)
except (
pysoem.SdoError,
pysoem.MailboxError,
pysoem.PacketError,
pysoem.WkcError,
ILIOError,
) as e:
self._handle_sdo_exception(reg, SDO_OPERATION_MSG.READ, e)
except pysoem.Emergency as e:
if time.time() > start_time + self._connection_timeout:
raise ILTimeoutError("Emergency messages could not be cleared.") from e
retry = True
finally:
self._lock.release()
if retry:
return self._read_raw(reg, buffer_size, complete_access, start_time)
return value
def _write_raw( # type: ignore [override]
self,
reg: EthercatRegister,
data: bytes,
complete_access: bool = False,
start_time: Optional[float] = None,
) -> None:
retry = False
if start_time is None:
start_time = time.time()
self._lock.acquire()
try:
time.sleep(0.0001) # Unlock threads before SDO write
self.__slave.sdo_write(reg.idx, reg.subidx, data, complete_access)
except (
pysoem.SdoError,
pysoem.MailboxError,
pysoem.PacketError,
pysoem.WkcError,
ILIOError,
) as e:
self._handle_sdo_exception(reg, SDO_OPERATION_MSG.WRITE, e)
except pysoem.Emergency as e:
if time.time() > start_time + self._connection_timeout:
raise ILTimeoutError("Emergency messages could not be cleared.") from e
retry = True
finally:
self._lock.release()
if retry:
return self._write_raw(reg, data, complete_access, start_time)
def _handle_sdo_exception(
self, reg: EthercatRegister, operation_msg: SDO_OPERATION_MSG, exception: Exception
) -> None:
"""
Handle the exceptions that occur when reading or writing SDOs.
Args:
reg: The register that was read or written.
operation_msg: Operation type message to be shown on the exception.
exception: The exception that occurred while reading or writing.
Raises:
ILIOError: If the register cannot be read or written.
ILIOError: If the slave fails to acknowledge the command.
ILIOError: If the working counter value is wrong.
ILTimeoutError: If the slave fails to respond within the connection
timeout period.
"""
if isinstance(
exception, (pysoem.SdoError, pysoem.MailboxError, pysoem.PacketError, ILIOError)
):
raise ILIOError(
f"Error {operation_msg.value} {reg.identifier}. Reason: {exception}"
) from exception
elif isinstance(exception, pysoem.WkcError):
default_error_msg = f"Error {operation_msg.value} data"
wkc_exceptions: Dict[int, ILError] = {
self.NO_RESPONSE_WORKING_COUNTER: ILIOError(
f"{default_error_msg}: The working counter remained unchanged."
),
self.NOFRAME_WORKING_COUNTER: ILIOError(f"{default_error_msg}: No frame."),
self.TIMEOUT_WORKING_COUNTER: ILTimeoutError(
f"Timeout {operation_msg.value} data."
),
}
exc = wkc_exceptions.get(
exception.wkc,
ILIOError(f"{default_error_msg}. Wrong working counter: {exception.wkc}"),
)
raise exc from exception
def _monitoring_read_data(self) -> bytes: # type: ignore [override]
"""Read monitoring data frame."""
return self._read_raw(self.MONITORING_DATA, buffer_size=1024, complete_access=True)
def _disturbance_write_data(self, data: bytearray) -> None: # type: ignore [override]
"""Write disturbance data."""
return self._write_raw(self.DIST_DATA, bytes(data), complete_access=True)
@staticmethod
def __monitoring_disturbance_map_can_address(address: int, subnode: int) -> int:
"""Map CAN register address to IPB register address.
Args:
subnode: Subnode to be targeted.
address: Register address to map.
"""
mapped_address: int = address - (
CANOPEN_ADDRESS_OFFSET + (MAP_ADDRESS_OFFSET * (subnode - 1))
)
return mapped_address
def _monitoring_disturbance_data_to_map_register(
self, subnode: int, address: int, dtype: int, size: int
) -> int:
"""Arrange necessary data to map a monitoring/disturbance register.
Args:
subnode: Subnode to be targeted.
address: Register address to map.
dtype: Register data type.
size: Size of data in bytes.
"""
ipb_address = self.__monitoring_disturbance_map_can_address(address, subnode)
mapped_address: int = super()._monitoring_disturbance_data_to_map_register(
subnode, ipb_address, dtype, size
)
return mapped_address
def _get_emergency_description(self, error_code: int) -> Optional[str]:
"""Get the error description from the error code.
Args:
error_code: Error code received.
Returns:
The error description corresponding to the error code.
"""
error_description = None
if self.dictionary.errors is not None:
error_code &= 0xFFFF
if error_code in self.dictionary.errors.errors:
error_description = self.dictionary.errors.errors[error_code][-1]
return error_description
[docs] def set_pdo_map_to_slave(self, rpdo_maps: List[RPDOMap], tpdo_maps: List[TPDOMap]) -> None:
self.reset_rpdo_mapping()
self.reset_tpdo_mapping()
self._rpdo_maps = rpdo_maps
self._tpdo_maps = tpdo_maps
self.slave.config_func = self.map_pdos
[docs] def generate_pdo_outputs(self) -> None:
output = self._process_rpdo()
if output is None:
return
self.__slave.output = self._process_rpdo()
@property
def slave(self) -> "CdefSlave":
"""Ethercat slave"""
return self.__slave