from abc import ABC, abstractmethod
from collections import OrderedDict
from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable, List, Union
import ingenialogger
from ingenialink.servo import Servo
# Module-level logger, obtained from ingenialogger under this module's __name__.
logger = ingenialogger.get_logger(__name__)
class NET_PROT(Enum):
    """Network Protocol."""

    EUSB = 0
    MCB = 1
    ETH = 2
    ECAT = 3
    # NOTE: value 4 is intentionally left unassigned here; CAN is 5.
    CAN = 5
class NET_STATE(Enum):
    """Network State."""

    CONNECTED = 0
    DISCONNECTED = 1
    FAULTY = 2
class NET_DEV_EVT(Enum):
    """Device Event."""

    ADDED = 0
    REMOVED = 1
class NET_TRANS_PROT(Enum):
    """Transmission protocol."""

    # Values start at 1; 0 is not a valid member.
    TCP = 1
    UDP = 2
@dataclass
class SlaveInfo:
    """Pair of integer identification fields describing a slave.

    Used as the value type of the mapping returned by
    ``Network.scan_slaves_info``.

    Attributes:
        product_code: Integer product code of the slave.
        revision_number: Integer revision number of the slave.
    """

    product_code: int
    revision_number: int
class Network(ABC):
    """Declaration of a general Network object.

    Abstract base class. Every method decorated with ``@abstractmethod``
    must be overridden before a subclass can be instantiated; the bodies
    defined here only raise ``NotImplementedError``.
    """

    def __init__(self) -> None:
        self.servos: List[Any] = []
        """List of the connected servos in the network."""

    @abstractmethod
    def scan_slaves(self) -> List[int]:
        """Scan the network for slaves.

        Returns:
            A list of integers, one per slave found (presumably the slave
            identifiers; the exact meaning is defined by the subclass).

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError

    @abstractmethod
    def scan_slaves_info(self) -> OrderedDict[int, SlaveInfo]:
        """Scan the network for slaves and return information about them.

        Returns:
            An ordered mapping from an integer key (presumably the slave
            identifier) to the ``SlaveInfo`` of that slave.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError

    @abstractmethod
    def connect_to_slave(self, *args: Any, **kwargs: Any) -> Servo:
        """Connect to a slave of the network.

        Args:
            *args: Positional arguments defined by the concrete network.
            **kwargs: Keyword arguments defined by the concrete network.

        Returns:
            The ``Servo`` object for the connected slave.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError

    @abstractmethod
    def disconnect_from_slave(self, servo: Servo) -> None:
        """Disconnect a slave from the network.

        Args:
            servo: The servo to disconnect.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError

    @abstractmethod
    def load_firmware(self, *args: Any, **kwargs: Any) -> None:
        """Load firmware through this network.

        Args:
            *args: Positional arguments defined by the concrete network.
            **kwargs: Keyword arguments defined by the concrete network.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError

    @abstractmethod
    def subscribe_to_status(
        self, target: Union[int, str], callback: Callable[[NET_DEV_EVT], Any]
    ) -> None:
        """Register a callback for status events of a target.

        Args:
            target: Integer or string designating what to monitor
                (its meaning is defined by the concrete network).
            callback: Callable that receives a ``NET_DEV_EVT``.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError

    @abstractmethod
    def unsubscribe_from_status(
        self, target: Union[int, str], callback: Callable[[NET_DEV_EVT], Any]
    ) -> None:
        """Remove a callback previously registered for status events.

        Args:
            target: Integer or string designating what was monitored
                (its meaning is defined by the concrete network).
            callback: Callable that receives a ``NET_DEV_EVT``.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError

    @abstractmethod
    def start_status_listener(self, *args: Any, **kwargs: Any) -> None:
        """Start the status listener.

        Args:
            *args: Positional arguments defined by the concrete network.
            **kwargs: Keyword arguments defined by the concrete network.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError

    @abstractmethod
    def stop_status_listener(self, *args: Any, **kwargs: Any) -> None:
        """Stop the status listener.

        Args:
            *args: Positional arguments defined by the concrete network.
            **kwargs: Keyword arguments defined by the concrete network.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError

    # NOTE(review): unlike the methods above, this property is not marked
    # abstract, so a subclass that does not override it can still be
    # instantiated and only fails with NotImplementedError on access.
    @property
    def protocol(self) -> NET_PROT:
        """Network protocol of this network, as a ``NET_PROT`` member.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError