Source code for ingenialink.network

from abc import ABC, abstractmethod
from collections import OrderedDict
from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable, List, Union

import ingenialogger

from ingenialink.servo import Servo

logger = ingenialogger.get_logger(__name__)


class NET_PROT(Enum):
    """Enumeration of the supported network protocols.

    The numeric values are not contiguous: 4 is not assigned to any member.
    """

    EUSB = 0
    MCB = 1
    ETH = 2
    ECAT = 3
    CAN = 5
class NET_STATE(Enum):
    """Enumeration of the states a network can be in."""

    CONNECTED = 0
    DISCONNECTED = 1
    FAULTY = 2
class NET_DEV_EVT(Enum):
    """Enumeration of device events (a device was added or removed)."""

    ADDED = 0
    REMOVED = 1
class EEPROM_FILE_FORMAT(Enum):
    """Enumeration of the EEPROM file formats."""

    BINARY = 0
    INTEL = 1
class NET_TRANS_PROT(Enum):
    """Enumeration of the transmission protocols.

    Note that the values start at 1, not 0.
    """

    TCP = 1
    UDP = 2
@dataclass
class SlaveInfo:
    """Identification data of a slave.

    Attributes:
        product_code: Product code of the slave.
        revision_number: Revision number of the slave.
    """

    product_code: int
    revision_number: int
class Network(ABC):
    """Abstract interface of a general Network object.

    Concrete networks must implement every method decorated with
    ``@abstractmethod``; the class cannot be instantiated otherwise.
    """

    def __init__(self) -> None:
        """Create the network with an empty list of servos."""
        self.servos: List[Any] = []
        """List of the connected servos in the network."""

    @abstractmethod
    def scan_slaves(self) -> List[int]:
        """Scan the network for slaves.

        Returns:
            A list of integers, one per slave found. The exact meaning of
            each integer is defined by the implementing network.
        """
        raise NotImplementedError

    @abstractmethod
    def scan_slaves_info(self) -> OrderedDict[int, SlaveInfo]:
        """Scan the network for slaves and collect information about them.

        Returns:
            An ordered mapping from an integer key to the ``SlaveInfo`` of
            the corresponding slave.
        """
        raise NotImplementedError

    @abstractmethod
    def connect_to_slave(self, *args: Any, **kwargs: Any) -> Servo:
        """Connect to a slave of the network.

        The accepted arguments are defined by the implementing network.

        Returns:
            The ``Servo`` object for the connected slave.
        """
        raise NotImplementedError

    @abstractmethod
    def disconnect_from_slave(self, servo: Servo) -> None:
        """Disconnect a slave from the network.

        Args:
            servo: Servo to disconnect.
        """
        raise NotImplementedError

    @abstractmethod
    def load_firmware(self, *args: Any, **kwargs: Any) -> None:
        """Load firmware.

        The accepted arguments are defined by the implementing network.
        """
        raise NotImplementedError

    @abstractmethod
    def subscribe_to_status(
        self,
        target: Union[int, str],
        callback: Callable[[NET_DEV_EVT], Any],
    ) -> None:
        """Subscribe a callback to status events.

        Args:
            target: Integer or string selecting what to subscribe to; its
                interpretation is defined by the implementing network.
            callback: Callable that receives a ``NET_DEV_EVT``.
        """
        raise NotImplementedError

    @abstractmethod
    def unsubscribe_from_status(
        self,
        target: Union[int, str],
        callback: Callable[[NET_DEV_EVT], Any],
    ) -> None:
        """Unsubscribe a callback from status events.

        Args:
            target: Integer or string selecting what to unsubscribe from;
                its interpretation is defined by the implementing network.
            callback: Callable that receives a ``NET_DEV_EVT``.
        """
        raise NotImplementedError

    @abstractmethod
    def start_status_listener(self, *args: Any, **kwargs: Any) -> None:
        """Start the status listener.

        The accepted arguments are defined by the implementing network.
        """
        raise NotImplementedError

    @abstractmethod
    def stop_status_listener(self, *args: Any, **kwargs: Any) -> None:
        """Stop the status listener.

        The accepted arguments are defined by the implementing network.
        """
        raise NotImplementedError

    @property
    def protocol(self) -> NET_PROT:
        """Network protocol, as a ``NET_PROT`` member.

        This property is not abstract, so subclasses are not forced to
        override it; the base implementation always raises.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError