Source code for ingenialink.ethernet.network

import ftplib
import os
import socket
import time
from collections import OrderedDict, defaultdict
from ftplib import FTP
from threading import Thread
from time import sleep
from typing import Any, Callable, Dict, List, Optional, Union

import ingenialogger

from ingenialink.constants import DEFAULT_ETH_CONNECTION_TIMEOUT
from ingenialink.exceptions import ILError, ILFirmwareLoadError
from ingenialink.network import NET_DEV_EVT, NET_STATE, Network, SlaveInfo
from ingenialink.utils.udp import UDP

from ..network import NET_PROT
from .servo import EthernetServo

logger = ingenialogger.get_logger(__name__)

FTP_SESSION_OK_CODE = "220"
FTP_LOGIN_OK_CODE = "230"
FTP_FILE_TRANSFER_OK_CODE = "226"
FTP_CLOSE_OK_CODE = "221"

CMD_CHANGE_CPU = 0x67E4

MAX_NUM_UNSUCCESSFUL_PINGS = 3


[docs] class NetStatusListener(Thread): """Network status listener thread to check if the drive is alive. Args: network: Network instance of the Ethernet communication. """ def __init__(self, network: "EthernetNetwork", refresh_time: float = 0.25) -> None: super(NetStatusListener, self).__init__() self.__network = network self.__refresh_time = refresh_time self.__stop = False self.__max_unsuccessful_pings = MAX_NUM_UNSUCCESSFUL_PINGS
[docs] def run(self) -> None: while not self.__stop: for servo in self.__network.servos: unsuccessful_pings = 0 servo_ip = servo.ip_address servo_state = self.__network.get_servo_state(servo_ip) while unsuccessful_pings < self.__max_unsuccessful_pings: response = servo.is_alive() # TODO: Use ping after CAP-924 is fixed if not response: unsuccessful_pings += 1 else: break ping_response = unsuccessful_pings != self.__max_unsuccessful_pings if servo_state == NET_STATE.CONNECTED and not ping_response: self.__network._notify_status(servo_ip, NET_DEV_EVT.REMOVED) self.__network._set_servo_state(servo_ip, NET_STATE.DISCONNECTED) if servo_state == NET_STATE.DISCONNECTED and ping_response: self.__network._notify_status(servo_ip, NET_DEV_EVT.ADDED) self.__network._set_servo_state(servo_ip, NET_STATE.CONNECTED) time.sleep(self.__refresh_time)
def stop(self) -> None: self.__stop = True
[docs] class EthernetNetwork(Network): """Network for all Ethernet communications.""" def __init__(self) -> None: super(EthernetNetwork, self).__init__() self.__listener_net_status: Optional[NetStatusListener] = None self.__observers_net_state: Dict[str, List[Callable[[NET_DEV_EVT], Any]]] = defaultdict( list )
[docs] @staticmethod def load_firmware( fw_file: str, target: str = "192.168.2.22", ftp_user: str = "", ftp_pwd: str = "" ) -> None: """Loads a given firmware file to the target slave. .. warning :: It is needed to disconnect the drive(:func:`disconnect_from_slave`) after loading the firmware since the `Servo` object's data will become obsolete. Args: fw_file: Path to the firmware file to be loaded. target: IP of the target slave. ftp_user: FTP user to connect with. ftp_pwd: FTP password for the given user. Raises: ILError: If the loading firmware process fails. """ if not os.path.isfile(fw_file): raise FileNotFoundError(f"Could not find {fw_file}.") # Start a FTP session. Drive must be in BOOT mode. logger.info("Starting FTP session...") with FTP() as ftp: try: ftp_output = ftp.connect(target) except ConnectionError as e: raise ILFirmwareLoadError("Unable to create the FTP session") from e logger.info(ftp_output) if FTP_SESSION_OK_CODE not in ftp_output: raise ILFirmwareLoadError("Unable to open the FTP session") # Login into FTP session. logger.info("Logging into FTP session...") ftp_output = ftp.login(ftp_user, ftp_pwd) logger.info(ftp_output) if FTP_LOGIN_OK_CODE not in ftp_output: raise ILFirmwareLoadError("Unable to login the FTP session") # Load file through FTP. logger.info("Uploading firmware file...") ftp.set_pasv(False) try: with open(fw_file, "rb") as file: ftp_output = ftp.storbinary(f"STOR {os.path.basename(file.name)}", file) except ftplib.error_temp as e: raise ILFirmwareLoadError("Unable to load the FW file through FTP.") from e logger.info(ftp_output) if FTP_FILE_TRANSFER_OK_CODE not in ftp_output: raise ILFirmwareLoadError("Unable to load the FW file through FTP") logger.info("FTP session closed.")
[docs] @staticmethod def load_firmware_moco(node: int, subnode: int, ip: str, port: int, moco_file: str) -> None: """Update MOCO firmware through UDP protocol. Args: node: Network node. subnode: Drive subnode. ip: Drive address IP. port: Drive port. moco_file: Path to the firmware file. Returns: Result code. Raises: ILFirmwareLoadError: The firmware load process fails with an error message. """ upd = UDP(port, ip) if not moco_file or not os.path.isfile(moco_file): raise ILFirmwareLoadError("File not found") moco_in = open(moco_file, "r") logger.info("Loading firmware...") try: for line in moco_in: words = line.split() # Get command and address cmd = int(words[1] + words[0], 16) data = b"" data_start_byte = 2 while data_start_byte in range(data_start_byte, len(words)): # Load UDP data data += bytes([int(words[data_start_byte], 16)]) data_start_byte += 1 # Send message upd.raw_cmd(node, subnode, cmd, data) if cmd == CMD_CHANGE_CPU: sleep(1) logger.info("Bootload process succeeded") except ftplib.error_temp as e: logger.error(e) raise ILFirmwareLoadError("Firewall might be blocking the access.") except Exception as e: logger.error(e) raise ILFirmwareLoadError("Error during bootloader process.")
def scan_slaves(self) -> List[int]: raise NotImplementedError def scan_slaves_info(self) -> OrderedDict[int, SlaveInfo]: raise NotImplementedError
[docs] def connect_to_slave( self, target: str, dictionary: str, port: int = 1061, connection_timeout: float = DEFAULT_ETH_CONNECTION_TIMEOUT, servo_status_listener: bool = False, net_status_listener: bool = False, is_eoe: bool = False, ) -> EthernetServo: """Connects to a slave through the given network settings. Args: target: IP of the target slave. dictionary: Path to the target dictionary file. port: Port to connect to the slave. connection_timeout: Time in seconds of the connection timeout. servo_status_listener: Toggle the listener of the servo for its status, errors, faults, etc. net_status_listener: Toggle the listener of the network status, connection and disconnection. is_eoe: True if communication is EoE. ``False`` by default. Returns: EthernetServo: Instance of the servo connected. """ sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.settimeout(connection_timeout) sock.connect((target, port)) servo = EthernetServo(sock, dictionary, servo_status_listener, is_eoe) try: servo.get_state() except ILError as e: servo.stop_status_listener() raise ILError(f"Drive not found in IP {target}.") from e self.servos.append(servo) self._set_servo_state(target, NET_STATE.CONNECTED) if net_status_listener: self.start_status_listener() else: self.stop_status_listener() return servo
[docs] def disconnect_from_slave(self, servo: EthernetServo) -> None: # type: ignore [override] """Disconnects the slave from the network. Args: servo: Instance of the servo connected. """ self.servos.remove(servo) servo.stop_status_listener() self.close_socket(servo.socket) self._set_servo_state(servo.ip_address, NET_STATE.DISCONNECTED) if len(self.servos) == 0: self.stop_status_listener()
[docs] @staticmethod def close_socket(sock: socket.socket) -> None: """Closes the established network socket.""" sock.shutdown(socket.SHUT_RDWR) sock.close()
[docs] def start_status_listener(self) -> None: """Start monitoring network events (CONNECTION/DISCONNECTION).""" if self.__listener_net_status is None: listener = NetStatusListener(self) listener.start() self.__listener_net_status = listener
[docs] def stop_status_listener(self) -> None: """Stops the NetStatusListener from listening to the drive.""" if self.__listener_net_status is not None: self.__listener_net_status.stop() self.__listener_net_status.join() self.__listener_net_status = None
def _notify_status(self, ip: str, status: NET_DEV_EVT) -> None: """Notify subscribers of a network state change.""" for callback in self.__observers_net_state[ip]: callback(status)
[docs] def subscribe_to_status(self, ip: str, callback: Callable[[NET_DEV_EVT], Any]) -> None: # type: ignore [override] """Subscribe to network state changes. Args: ip: IP of the drive to subscribe. callback: Callback function. """ if callback in self.__observers_net_state[ip]: logger.info("Callback already subscribed.") return self.__observers_net_state[ip].append(callback)
[docs] def unsubscribe_from_status(self, ip: str, callback: Callable[[NET_DEV_EVT], Any]) -> None: # type: ignore [override] """Unsubscribe from network state changes. Args: ip: IP of the drive to unsubscribe. callback: Callback function. """ if callback not in self.__observers_net_state[ip]: logger.info("Callback not subscribed.") return self.__observers_net_state[ip].remove(callback)
[docs] def get_servo_state(self, servo_id: Union[int, str]) -> NET_STATE: """Get the state of a servo that's a part of network. The state indicates if the servo is connected or disconnected. Args: servo_id: The servo's IP address. Returns: The servo's state. """ if not isinstance(servo_id, str): raise ValueError("The servo ID must be a string.") return self._servos_state[servo_id]
def _set_servo_state(self, servo_id: Union[int, str], state: NET_STATE) -> None: """Set the state of a servo that's a part of network. Args: servo_id: The servo's IP address. state: The servo's state. """ self._servos_state[servo_id] = state @property def protocol(self) -> NET_PROT: """Obtain network protocol.""" return NET_PROT.ETH