Source code for ingenialink.ethercat.network

import inspect
import os
import platform
import subprocess
import sys
import time
from collections import OrderedDict, defaultdict
from enum import Enum
from threading import Thread
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Union

import ingenialogger

try:
    import pysoem
except ImportError as ex:
    pysoem = None
    pysoem_import_error = ex

if TYPE_CHECKING:
    from pysoem import CdefSlave

from ingenialink import bin as bin_module
from ingenialink.ethercat.servo import EthercatServo
from ingenialink.exceptions import ILError, ILFirmwareLoadError, ILStateError, ILWrongWorkingCount
from ingenialink.network import NET_DEV_EVT, NET_PROT, NET_STATE, Network, SlaveInfo

logger = ingenialogger.get_logger(__name__)


[docs]class SlaveState(Enum): INIT_STATE = 1 NONE_STATE = 0 OP_STATE = 8 PREOP_STATE = 2 SAFEOP_STATE = 4 STATE_ERROR = 16 SAFEOP_ERROR_STATE = SAFEOP_STATE + STATE_ERROR
[docs]class NetStatusListener(Thread): """Network status listener thread to check if the drive is alive. Args: network: Network instance of the EtherCAT communication. """ def __init__(self, network: "EthercatNetwork", refresh_time: float = 0.25): super(NetStatusListener, self).__init__() self.__network = network self.__refresh_time = refresh_time self.__stop = False self._ecat_master = self.__network._ecat_master
[docs] def run(self) -> None: while not self.__stop: self._ecat_master.read_state() for servo in self.__network.servos: slave_id = servo.slave_id servo_state = self.__network._get_servo_state(slave_id) slave_is_alive = servo.slave.state == pysoem.NONE_STATE if servo_state == NET_STATE.CONNECTED and slave_is_alive: self.__network._notify_status(slave_id, NET_DEV_EVT.REMOVED) self.__network._set_servo_state(slave_id, NET_STATE.DISCONNECTED) if servo_state == NET_STATE.DISCONNECTED and not slave_is_alive: self.__network._notify_status(slave_id, NET_DEV_EVT.ADDED) self.__network._set_servo_state(slave_id, NET_STATE.CONNECTED) time.sleep(self.__refresh_time)
def stop(self) -> None: self.__stop = True
[docs]class EthercatNetwork(Network): """Network for all EtherCAT communications. Args: interface_name: Interface name to be targeted. connection_timeout: Time in seconds of the connection timeout. overlapping_io_map: Map PDOs to overlapping IO map. Raises: ImportError: WinPcap is not installed """ FOE_APPLICATION = { "win32": {"64bit": "FoE/win_64x/FoEUpdateFirmware.exe"}, "linux": {"64bit": "FoE/linux/FoEUpdateFirmware"}, } FOE_ERRORS = { 1: "Can’t read the input file.", 2: "ECAT slave can’t reach the BOOT mode.", 3: "No ECAT slave detected", 4: "Can’t initialize the network adapter", 5: "Drive can't init. Ensure the FW file is right", } UNKNOWN_FOE_ERROR = "Unknown error" MANUAL_STATE_CHANGE = 1 DEFAULT_ECAT_CONNECTION_TIMEOUT_S = 1 ECAT_STATE_CHANGE_TIMEOUT_NS = 50_000 ECAT_PROCESSDATA_TIMEOUT_S = 0.1 def __init__( self, interface_name: str, connection_timeout: float = DEFAULT_ECAT_CONNECTION_TIMEOUT_S, overlapping_io_map: bool = True, ): if not pysoem: raise pysoem_import_error super(EthercatNetwork, self).__init__() self.interface_name: str = interface_name self.servos: List[EthercatServo] = [] self.__servos_state: Dict[int, NET_STATE] = {} self.__listener_net_status: Optional[NetStatusListener] = None self.__observers_net_state: Dict[int, List[Any]] = defaultdict(list) self._connection_timeout: float = connection_timeout self._ecat_master: pysoem.CdefMaster = pysoem.Master() self._ecat_master.sdo_read_timeout = int(1_000_000 * self._connection_timeout) self._ecat_master.sdo_write_timeout = int(1_000_000 * self._connection_timeout) self._ecat_master.manual_state_change = self.MANUAL_STATE_CHANGE self._overlapping_io_map = overlapping_io_map self.__is_master_running = False self.__last_init_nodes: List[int] = []
[docs] def scan_slaves(self) -> List[int]: """Scans for slaves in the network. Scanning of slaves cannot be done if a slave is already connected to the network. Returns: List containing all the detected slaves. Raises: ILError: If any slaves is already connected. """ if self.servos: raise ILError("Some slaves are already connected") if not self.__is_master_running: self._start_master() self.__init_nodes() return self.__last_init_nodes
[docs] def scan_slaves_info(self) -> OrderedDict[int, SlaveInfo]: """Scans for slaves in the network and return an ordered dict with the slave information. Returns: Ordered dict with the slave information. Raises: ILError: If any slave is already connected. """ slave_info: OrderedDict[int, SlaveInfo] = OrderedDict() try: slaves = self.scan_slaves() except ILError: return slave_info for slave_id in slaves: slave = self._ecat_master.slaves[slave_id - 1] slave_info[slave_id] = SlaveInfo(slave.id, slave.rev) return slave_info
def __init_nodes(self) -> None: """Init all the nodes and set already connected nodes to PreOp state. Also fill `__last_init_nodes` attribute. """ self._ecat_master.slaves = [] # That is because of a bug of pysoem nodes = self._ecat_master.config_init() if self.servos: self._change_nodes_state(self.servos, pysoem.PREOP_STATE) self.__last_init_nodes = list(range(1, nodes + 1))
[docs] def connect_to_slave( # type: ignore [override] self, slave_id: int, dictionary: str, servo_status_listener: bool = False, net_status_listener: bool = False, ) -> EthercatServo: """Connects to a drive through a given slave number. Args: slave_id: Targeted slave to be connected. dictionary: Path to the dictionary file. servo_status_listener: Toggle the listener of the servo for its status, errors, faults, etc. net_status_listener: Toggle the listener of the network status, connection and disconnection. Raises: ValueError: If the slave ID is not valid. ILError: If no slaves are found. ILStateError: If slave can not reach PreOp state """ if not isinstance(slave_id, int) or slave_id < 0: raise ValueError("Invalid slave ID value") if not self.__is_master_running: self._start_master() if slave_id not in self.__last_init_nodes: self.__init_nodes() if len(self.__last_init_nodes) == 0: raise ILError("Could not find any slaves in the network.") if slave_id not in self.__last_init_nodes: raise ILError(f"Slave {slave_id} was not found.") slave = self._ecat_master.slaves[slave_id - 1] servo = EthercatServo( slave, slave_id, dictionary, self._connection_timeout, servo_status_listener ) if not self._change_nodes_state(servo, pysoem.PREOP_STATE): raise ILStateError("Slave can not reach PreOp state") servo.reset_pdo_mapping() self.servos.append(servo) self._set_servo_state(slave_id, NET_STATE.CONNECTED) self.subscribe_to_status(slave_id, self._recover_from_power_cycle) if net_status_listener: self.start_status_listener() return servo
[docs] def disconnect_from_slave(self, servo: EthercatServo) -> None: # type: ignore [override] """Disconnects the slave from the network. Args: servo: Instance of the servo connected. """ servo.stop_status_listener() if not self._change_nodes_state(servo, pysoem.INIT_STATE): logger.warning("Drive can not reach Init state") self.servos.remove(servo) if not self.servos: self.stop_status_listener() self._ecat_master.close() self.__is_master_running = False self.__last_init_nodes = []
[docs] def config_pdo_maps(self) -> None: """Configure the PDO maps. It maps the PDO maps of each slave and sets its state to SafeOP. """ if self._overlapping_io_map: self._ecat_master.config_overlap_map() else: self._ecat_master.config_map()
[docs] def start_pdos(self, timeout: float = 1.0) -> None: """Set all slaves with mapped PDOs to Operational State. Args: timeout: timeout in seconds to reach Op state, 1.0 seconds by default. Raises: ILStateError: If slaves can not reach SafeOp or Op state """ op_servo_list = [servo for servo in self.servos if servo._rpdo_maps or servo._tpdo_maps] if not op_servo_list: logger.warning("There are no PDOs assigned to any connected slave.") return try: for servo in op_servo_list: for rpdo_map in servo._rpdo_maps: rpdo_map.get_item_bytes() except ILError as e: raise ILError( "The RPDO values should be set before starting the PDO exchange process." ) from e self.config_pdo_maps() self._ecat_master.state = pysoem.SAFEOP_STATE if not self._change_nodes_state(op_servo_list, pysoem.SAFEOP_STATE): raise ILStateError("Drives can not reach SafeOp state") self._change_nodes_state(op_servo_list, pysoem.OP_STATE) init_time = time.time() while not self._check_node_state(op_servo_list, pysoem.OP_STATE): self.send_receive_processdata() if timeout < time.time() - init_time: raise ILStateError("Drives can not reach Op state")
[docs] def stop_pdos(self) -> None: """For all slaves in OP or SafeOp state, set state to PreOp""" self._ecat_master.read_state() op_servo_list = [ servo for servo in self.servos if servo.slave.state in [pysoem.OP_STATE, pysoem.SAFEOP_STATE] ] if not self._change_nodes_state(op_servo_list, pysoem.INIT_STATE): logger.warning("Not all drives could reach the Init state") self.__init_nodes()
[docs] def send_receive_processdata(self, timeout: float = ECAT_PROCESSDATA_TIMEOUT_S) -> None: """Send and receive PDOs Args: timeout: receive processdata timeout in seconds, 0.1 seconds by default. Raises: ILWrongWorkingCount: If processdata working count is wrong """ for servo in self.servos: servo.process_pdo_inputs() if self._overlapping_io_map: self._ecat_master.send_overlap_processdata() else: self._ecat_master.send_processdata() processdata_wkc = self._ecat_master.receive_processdata(timeout=int(timeout * 1_000_000)) if processdata_wkc != self._ecat_master.expected_wkc: self._ecat_master.read_state() servos_state_msg = "" for servo in self.servos: servos_state_msg += ( f"Slave {servo.slave_id}: state {SlaveState(servo.slave.state).name}" ) if servo.slave.al_status != 0: al_status = pysoem.al_status_code_to_string(servo.slave.al_status) servos_state_msg += f", AL status {al_status}." else: servos_state_msg += ". " raise ILWrongWorkingCount( f"Processdata working count is wrong, expected: {self._ecat_master.expected_wkc}," f" real: {processdata_wkc}. {servos_state_msg}" ) for servo in self.servos: servo.generate_pdo_outputs()
def _change_nodes_state( self, nodes: Union["EthercatServo", List["EthercatServo"]], target_state: int ) -> bool: """Set ECAT state to target state for all nodes in list Args: nodes: target node or list of nodes target_state: target ECAT state Returns: True if all nodes reached the target state, else False. """ node_list = nodes if isinstance(nodes, list) else [nodes] for drive in node_list: drive.slave.state = target_state drive.slave.write_state() return self._check_node_state(nodes, target_state) def _check_node_state( self, nodes: Union["EthercatServo", List["EthercatServo"]], target_state: int ) -> bool: """Check ECAT state for all nodes in list Args: nodes: target node or list of nodes target_state: target ECAT state Returns: True if all nodes reached the target state, else False. """ node_list = nodes if isinstance(nodes, list) else [nodes] self._ecat_master.read_state() return all( target_state == drive.slave.state_check(target_state, self.ECAT_STATE_CHANGE_TIMEOUT_NS) for drive in node_list )
[docs] def subscribe_to_status( # type: ignore [override] self, slave_id: int, callback: Callable[[NET_DEV_EVT], None] ) -> None: """Subscribe to network state changes. Args: slave_id: Slave ID of the drive to subscribe. callback: Callback function. """ if callback in self.__observers_net_state[slave_id]: logger.info("Callback already subscribed.") return self.__observers_net_state[slave_id].append(callback)
[docs] def unsubscribe_from_status( # type: ignore [override] self, slave_id: int, callback: Callable[[str, NET_DEV_EVT], None] ) -> None: """Unsubscribe from network state changes. Args: slave_id: Slave ID of the drive to subscribe. callback: Callback function. """ if callback not in self.__observers_net_state[slave_id]: logger.info("Callback not subscribed.") return self.__observers_net_state[slave_id].remove(callback)
[docs] def start_status_listener(self) -> None: # type: ignore [override] """Start monitoring network events (CONNECTION/DISCONNECTION).""" if self.__listener_net_status is None: listener = NetStatusListener(self) listener.start() self.__listener_net_status = listener
[docs] def stop_status_listener(self) -> None: # type: ignore [override] """Stops the NetStatusListener from listening to the drive.""" if self.__listener_net_status is not None: self.__listener_net_status.stop() self.__listener_net_status.join() self.__listener_net_status = None
[docs] def load_firmware(self, fw_file: str, slave_id: int = 1) -> None: # type: ignore [override] """Loads a given firmware file to a target slave. Args: fw_file: Path to the firmware file. slave_id: Slave ID to which load the firmware file. Raises: FileNotFoundError: If the firmware file cannot be found. ILFirmwareLoadError: If no slave is detected. ILFirmwareLoadError: If the FoE write operation is not successful. NotImplementedError: If FoE is not implemented for the current OS and architecture """ if not os.path.isfile(fw_file): raise FileNotFoundError(f"Could not find {fw_file}.") arch = platform.architecture()[0] sys_name = sys.platform app_path = self.FOE_APPLICATION.get(sys_name, {}).get(arch, None) if app_path is None: raise NotImplementedError( "Load FW by ECAT is not implemented for this OS and architecture:" f" {sys_name} {arch}" ) exec_path = os.path.join(os.path.dirname(inspect.getfile(bin_module)), app_path) logger.debug(f"Call FoE application for {sys_name}-{arch}") if sys_name == "linux": try: subprocess.run( f"chmod 777 {exec_path}", check=True, shell=True, encoding="utf-8", ) except subprocess.CalledProcessError as e: raise ILFirmwareLoadError("Could not change the FoE binary permissions.") from e try: subprocess.run( f"{exec_path} {self.interface_name} {slave_id} {fw_file}", check=True, shell=True, encoding="utf-8", ) except subprocess.CalledProcessError as e: foe_return_error = self.FOE_ERRORS.get(e.returncode, self.UNKNOWN_FOE_ERROR) raise ILFirmwareLoadError( f"The firmware file could not be loaded correctly. {foe_return_error}" ) from e logger.info("Firmware updated successfully")
def _start_master(self) -> None: """Start the EtherCAT master""" self._ecat_master.open(self.interface_name) self.__is_master_running = True @property def protocol(self) -> NET_PROT: """NET_PROT: Obtain network protocol.""" return NET_PROT.ECAT def _get_servo_state(self, slave_id: int) -> NET_STATE: return self.__servos_state[slave_id] def _set_servo_state(self, slave_id: int, state: NET_STATE) -> None: self.__servos_state[slave_id] = state def _notify_status(self, slave_id: int, status: NET_DEV_EVT) -> None: """Notify subscribers of a network state change.""" for callback in self.__observers_net_state[slave_id]: callback(status) def _recover_from_power_cycle(self, status: NET_DEV_EVT) -> None: """Transition the slaves to Pre-Op state after a power cycle. Args: status: The network status. Raises: ILStateError: If not all slaves reach PreOp state. """ self._ecat_master.read_state() if status == NET_DEV_EVT.ADDED and self._ecat_master.state == pysoem.INIT_STATE: self.__init_nodes() if not self._check_node_state(self.servos, pysoem.PREOP_STATE): raise ILStateError( "The communication cannot be recovered. Not all slaves reached PreOp state" )