import os
import time
from enum import Enum
from typing import TYPE_CHECKING, Any, Callable, List, Optional
import ingenialogger
from ingenialink import Servo
from ingenialink.emcy import EmergencyMessage
try:
import pysoem
except ImportError as ex:
pysoem = None
pysoem_import_error = ex
if TYPE_CHECKING:
from pysoem import CdefSlave
from ingenialink.constants import CAN_MAX_WRITE_SIZE, CANOPEN_ADDRESS_OFFSET, MAP_ADDRESS_OFFSET
from ingenialink.dictionary import Interface
from ingenialink.ethercat.register import EthercatRegister
from ingenialink.exceptions import ILIOError
from ingenialink.pdo import PDOServo, RPDOMap, TPDOMap
logger = ingenialogger.get_logger(__name__)
[docs]
class SDO_OPERATION_MSG(Enum):
"""Message for exceptions depending on the operation type."""
READ = "reading"
WRITE = "writing"
[docs]
class EthercatEmergencyMessage(EmergencyMessage):
"""Ethercat emergency message class.
Args:
servo: The servo that generated the emergency error.
emergency_msg: The emergency message instance from PySOEM.
"""
def __init__(self, servo: Servo, emergency_msg: "pysoem.Emergency"):
data = (
emergency_msg.b1.to_bytes(1, "little")
+ emergency_msg.w1.to_bytes(2, "little")
+ emergency_msg.w2.to_bytes(2, "little")
)
super().__init__(servo, emergency_msg.error_code, emergency_msg.error_reg, data)
[docs]
class EthercatServo(PDOServo):
"""Ethercat Servo instance.
Args:
slave: Slave to be connected.
slave_id: Slave ID.
dictionary_path: Path to the dictionary.
connection_timeout: Time in seconds of the connection timeout.
servo_status_listener: Toggle the listener of the servo for
its status, errors, faults, etc.
Raises:
ImportError: WinPcap is not installed
"""
MAX_WRITE_SIZE = CAN_MAX_WRITE_SIZE
MONITORING_DATA_BUFFER_SIZE = 1024
NO_RESPONSE_WORKING_COUNTER = 0
TIMEOUT_WORKING_COUNTER = -5
NOFRAME_WORKING_COUNTER = -1
ETHERCAT_PDO_WATCHDOG = "processdata"
SECONDS_TO_MS_CONVERSION_FACTOR = 1000
interface = Interface.ECAT
DEFAULT_STORE_RECOVERY_TIMEOUT = 1
DEFAULT_EEPROM_OPERATION_TIMEOUT_uS = 200_000
DEFAULT_EEPROM_READ_BYTES_LENGTH = 2
def __init__(
self,
slave: "CdefSlave",
slave_id: int,
dictionary_path: str,
connection_timeout: float,
servo_status_listener: bool = False,
):
if not pysoem:
raise pysoem_import_error
self.__slave = slave
self.slave_id = slave_id
self._connection_timeout = connection_timeout
self.__emcy_observers: List[Callable[[EmergencyMessage], None]] = []
self.__slave.add_emergency_callback(self._on_emcy)
super(EthercatServo, self).__init__(slave_id, dictionary_path, servo_status_listener)
[docs]
def store_parameters(
self,
subnode: Optional[int] = None,
timeout: Optional[float] = DEFAULT_STORE_RECOVERY_TIMEOUT,
) -> None:
"""Store all the current parameters of the target subnode.
Args:
subnode: Subnode of the axis. `None` by default which stores
all the parameters.
timeout : how many seconds to wait for the drive to become responsive
after the store operation. If ``None`` it will wait forever.
Raises:
ILError: Invalid subnode.
"""
super().store_parameters(subnode)
self._wait_until_alive(timeout)
[docs]
def restore_parameters(
self,
subnode: Optional[int] = None,
timeout: Optional[float] = DEFAULT_STORE_RECOVERY_TIMEOUT,
) -> None:
"""Restore all the current parameters of all the slave to default.
.. note::
The drive needs a power cycle after this
in order for the changes to be properly applied.
Args:
subnode: Subnode of the axis. `None` by default which restores
all the parameters.
timeout : how many seconds to wait for the drive to become responsive
after the restore operation. If ``None`` it will wait forever.
Raises:
ILError: Invalid subnode.
"""
super().restore_parameters(subnode)
self._wait_until_alive(timeout)
def _wait_until_alive(self, timeout: Optional[float]) -> None:
"""Wait until the drive becomes responsive.
Args:
timeout : how many seconds to wait for the drive to become responsive.
If ``None`` it will wait forever.
"""
init_time = time.time()
while not self.is_alive():
if timeout is not None and (init_time + timeout) < time.time():
logger.info("The drive is unresponsive after the recovery timeout.")
break
def _read_raw( # type: ignore [override]
self,
reg: EthercatRegister,
buffer_size: int = 0,
complete_access: bool = False,
start_time: Optional[float] = None,
) -> bytes:
self._lock.acquire()
try:
time.sleep(0.0001) # Unlock threads before SDO read
value: bytes = self.__slave.sdo_read(reg.idx, reg.subidx, buffer_size, complete_access)
except (
pysoem.SdoError,
pysoem.MailboxError,
pysoem.PacketError,
pysoem.WkcError,
ILIOError,
) as e:
self._handle_sdo_exception(reg, SDO_OPERATION_MSG.READ, e)
finally:
self._lock.release()
return value
def _write_raw( # type: ignore [override]
self,
reg: EthercatRegister,
data: bytes,
complete_access: bool = False,
start_time: Optional[float] = None,
) -> None:
self._lock.acquire()
try:
time.sleep(0.0001) # Unlock threads before SDO write
self.__slave.sdo_write(reg.idx, reg.subidx, data, complete_access)
except (
pysoem.SdoError,
pysoem.MailboxError,
pysoem.PacketError,
pysoem.WkcError,
ILIOError,
) as e:
self._handle_sdo_exception(reg, SDO_OPERATION_MSG.WRITE, e)
finally:
self._lock.release()
def _handle_sdo_exception(
self, reg: EthercatRegister, operation_msg: SDO_OPERATION_MSG, exception: Exception
) -> None:
"""
Handle the exceptions that occur when reading or writing SDOs.
Args:
reg: The register that was read or written.
operation_msg: Operation type message to be shown on the exception.
exception: The exception that occurred while reading or writing.
Raises:
ILIOError: If the register cannot be read or written.
ILIOError: If the slave fails to acknowledge the command.
ILIOError: If the working counter value is wrong.
ILTimeoutError: If the slave fails to respond within the connection
timeout period.
"""
default_error_msg = f"Error {operation_msg.value} {reg.identifier}"
if isinstance(exception, pysoem.WkcError):
wkc_errors = {
self.NO_RESPONSE_WORKING_COUNTER: "The working counter remained unchanged.",
self.NOFRAME_WORKING_COUNTER: "No frame.",
self.TIMEOUT_WORKING_COUNTER: "Timeout.",
}
reason = wkc_errors[exception.wkc]
elif isinstance(exception, (pysoem.SdoError, pysoem.MailboxError, pysoem.PacketError)):
reason = f"{type(exception).__name__}: Slave {exception.slave_pos}, "
if isinstance(exception, pysoem.SdoError):
reason += f"Abort code {exception.abort_code}, "
else:
reason += f"Error code {exception.error_code}, "
error_description = f"Error description: {exception.desc}."
reason += error_description
else:
reason = str(exception)
raise ILIOError(f"{default_error_msg}. {reason}") from exception
def _monitoring_read_data(self, **kwargs: Any) -> bytes:
"""Read monitoring data frame."""
return super()._monitoring_read_data(
buffer_size=self.MONITORING_DATA_BUFFER_SIZE, complete_access=True
)
def _disturbance_write_data(self, data: bytes, **kwargs: Any) -> None:
"""Write disturbance data."""
super()._disturbance_write_data(data, complete_access=True)
@staticmethod
def __monitoring_disturbance_map_can_address(address: int, subnode: int) -> int:
"""Map CAN register address to IPB register address.
Args:
subnode: Subnode to be targeted.
address: Register address to map.
"""
mapped_address: int = address - (
CANOPEN_ADDRESS_OFFSET + (MAP_ADDRESS_OFFSET * (subnode - 1))
)
return mapped_address
def _monitoring_disturbance_data_to_map_register(
self, subnode: int, address: int, dtype: int, size: int
) -> int:
"""Arrange necessary data to map a monitoring/disturbance register.
Args:
subnode: Subnode to be targeted.
address: Register address to map.
dtype: Register data type.
size: Size of data in bytes.
"""
ipb_address = self.__monitoring_disturbance_map_can_address(address, subnode)
mapped_address: int = super()._monitoring_disturbance_data_to_map_register(
subnode, ipb_address, dtype, size
)
return mapped_address
[docs]
def emcy_subscribe(self, callback: Callable[[EmergencyMessage], None]) -> None:
"""Subscribe to emergency messages.
Args:
callback: Callable that takes a EmergencyMessage instance as argument.
"""
self.__emcy_observers.append(callback)
[docs]
def emcy_unsubscribe(self, callback: Callable[[EmergencyMessage], None]) -> None:
"""Unsubscribe from emergency messages.
Args:
callback: Subscribed callback.
"""
self.__emcy_observers.remove(callback)
def _on_emcy(self, emergency_msg: "pysoem.Emergency") -> None:
"""Receive an emergency message from PySOEM and transform it to a EthercatEmergencyMessage.
Afterward, send the EthercatEmergencyMessage to all the subscribed callbacks.
Args:
emergency_msg: The pysoem.Emergency instance.
"""
emergency_message = EthercatEmergencyMessage(self, emergency_msg)
logger.warning(f"Emergency message received from slave {self.target}: {emergency_message}")
for callback in self.__emcy_observers:
callback(emergency_message)
[docs]
def set_pdo_map_to_slave(self, rpdo_maps: List[RPDOMap], tpdo_maps: List[TPDOMap]) -> None:
for rpdo_map in rpdo_maps:
if rpdo_map not in self._rpdo_maps:
self._rpdo_maps.append(rpdo_map)
for tpdo_map in tpdo_maps:
if tpdo_map not in self._tpdo_maps:
self._tpdo_maps.append(tpdo_map)
self.slave.config_func = self.map_pdos
[docs]
def generate_pdo_outputs(self) -> None:
output = self._process_rpdo()
if output is None:
return
self.__slave.output = self._process_rpdo()
[docs]
def set_pdo_watchdog_time(self, timeout: float) -> None:
"""Set the process data watchdog time.
Args:
timeout: Time in seconds.
"""
self.slave.set_watchdog(
self.ETHERCAT_PDO_WATCHDOG, self.SECONDS_TO_MS_CONVERSION_FACTOR * timeout
)
def _read_esc_eeprom(
self,
address: int,
length: int = DEFAULT_EEPROM_READ_BYTES_LENGTH,
timeout: int = DEFAULT_EEPROM_OPERATION_TIMEOUT_uS,
) -> bytes:
"""Read from the ESC EEPROM.
Args:
address: EEPROM address to be read.
length: Length of data to be read. By default, 2 bytes are read.
timeout: Operation timeout (microseconds). By default, 200.000 us.
Returns:
EEPROM data. The read data.
Raises:
ValueError: If the length to be read has an invalid value.
"""
if length < 1:
raise ValueError("The minimum length is 1 byte.")
data = bytes()
while len(data) < length:
data += self.slave.eeprom_read(address, timeout)
address += 2
if len(data) > length:
data = data[:length]
return data
def _write_esc_eeprom(
self, address: int, data: bytes, timeout: int = DEFAULT_EEPROM_OPERATION_TIMEOUT_uS
) -> None:
"""Write to the ESC EEPROM.
Args:
address: EEPROM address to be written.
data: Data to be written. The data length must be a multiple of 2 bytes.
timeout: Operation timeout (microseconds). By default, 200.000 us.
Raises:
ValueError: If the data has the wrong size.
"""
if len(data) % 2 != 0:
raise ValueError("The data length must be a multiple of 2 bytes.")
start_address = address
while data:
self.slave.eeprom_write(start_address, data[:2], timeout)
data = data[2:]
start_address += 1
def _write_esc_eeprom_from_file(self, file_path: str) -> None:
"""Load a binary file to the ESC EEPROM.
Args:
file_path: Path to the binary file to be loaded.
Raises:
FileNotFoundError: If the binary file cannot be found.
"""
if not os.path.isfile(file_path):
raise FileNotFoundError(f"Could not find {file_path}.")
with open(file_path, "rb") as file:
data = file.read()
self._write_esc_eeprom(address=0, data=data)
@property
def slave(self) -> "CdefSlave":
"""Ethercat slave"""
return self.__slave