# Source code for ingenialink.serial.network

from ingenialink.network import NET_PROT
from ingenialink.ipb.network import IPBNetwork
from .._ingenialink import lib, ffi
from ingenialink.utils._utils import pstr, cstr, raise_null, to_ms
from ingenialink.serial.servo import SerialServo


class SerialNetwork(IPBNetwork):
    """Network for all Serial communications.

    Args:
        timeout_rd (float): Timeout for read commands. It is converted with
            ``to_ms`` when connecting (presumably expressed in seconds --
            TODO confirm against ``to_ms``).
        timeout_wr (float): Timeout for write commands. Converted the same
            way as ``timeout_rd``.

    """

    def __init__(self, timeout_rd=0.5, timeout_wr=0.5):
        super(SerialNetwork, self).__init__()
        self.__timeout_rd = timeout_rd
        self.__timeout_wr = timeout_wr
        # Native network handle; only created by connect_to_slave().
        self.__net_interface = None

    @staticmethod
    def __iter_nodes(head):
        """Yield every node of a NULL-terminated native linked list.

        Args:
            head: cffi pointer to the first node (may be NULL). Each node
                must expose a ``next`` pointer.

        """
        node = head
        # A NULL cffi pointer is falsy, which ends the walk.
        while node:
            yield node
            node = node.next

    def load_firmware(self, fw_file):
        """Firmware loading is not implemented for this network.

        Args:
            fw_file: Unused.

        Raises:
            NotImplementedError: Always.

        """
        raise NotImplementedError

    def scan_slaves(self):
        """Obtain a list of network devices.

        Returns:
            list: Port of every device reported by the native library for
            the MCB protocol, converted with ``pstr``.

        Raises:
            TypeError: If the protocol type is invalid.

        """
        devs = lib.il_net_dev_list_get(NET_PROT.MCB.value)
        try:
            return [pstr(dev.port) for dev in self.__iter_nodes(devs)]
        finally:
            # Release the native list even if a conversion above fails.
            lib.il_net_dev_list_destroy(devs)

    def connect_to_slave(self, target=None, dictionary=""):
        """Connects to a slave through the given network settings.

        Args:
            target (str): Serial port of the target slave. It is used as the
                ``port`` option of the native network.
            dictionary (str): Path to the target dictionary file.

        Returns:
            SerialServo: Instance of the servo connected, or ``None`` when
            the native library reports no servo on the network.

        """
        self._on_found = ffi.NULL

        opts = ffi.new('il_net_opts_t *')
        # Keep the char buffer referenced by a local for as long as ``opts``
        # is in use; ``opts.port`` only stores the pointer.
        _port = ffi.new('char []', cstr(target))
        opts.port = _port
        opts.timeout_rd = to_ms(self.__timeout_rd)
        opts.timeout_wr = to_ms(self.__timeout_wr)

        self.__net_interface = lib.il_net_create(NET_PROT.MCB.value, opts)
        raise_null(self.__net_interface)

        # No "servo found" callback and no user handle are supplied.
        servos = lib.il_net_servos_list_get(
            self.__net_interface, ffi.NULL, ffi.NULL)
        try:
            found = [int(node.id) for node in self.__iter_nodes(servos)]
        finally:
            # Release the native list even if a conversion above fails.
            lib.il_net_servos_list_destroy(servos)

        if not found:
            # NOTE(review): the network handle created above stays in
            # ``self.__net_interface`` and is not released on this path --
            # confirm whether it should be destroyed here.
            return None

        # Only the first servo reported is connected.
        servo = SerialServo(self.__net_interface, target=found[0],
                            dictionary_path=dictionary)
        self._cffi_network = self.__net_interface
        self.servos.append(servo)
        return servo

    def disconnect_from_slave(self, servo):
        """Disconnects the slave from the network.

        The native network is disconnected and destroyed once the last
        servo has been removed.

        Args:
            servo (SerialServo): Instance of the servo connected.

        """
        self.servos.remove(servo)
        if not self.servos:
            lib.il_net_disconnect(self._cffi_network)
            self.destroy_network()
            self._cffi_network = None

    @property
    def protocol(self):
        """NET_PROT: Obtain network protocol."""
        return NET_PROT.MCB

    @property
    def timeout_rd(self):
        """float: Read timeout."""
        return self.__timeout_rd

    @timeout_rd.setter
    def timeout_rd(self, value):
        self.__timeout_rd = value

    @property
    def timeout_wr(self):
        """float: Write timeout."""
        return self.__timeout_wr

    @timeout_wr.setter
    def timeout_wr(self, value):
        self.__timeout_wr = value