from ..network import NET_PROT
from .servo import EthercatServo
from ingenialink.ipb.network import IPBNetwork
from ingenialink.utils._utils import cstr, raise_err
from ..exceptions import *
from .._ingenialink import lib, ffi
[docs]class EthercatNetwork(IPBNetwork):
"""Network for all EtherCAT communications.
Args:
interface_name (str): Interface name to be targeted.
"""
def __init__(self, interface_name=""):
super(EthercatNetwork, self).__init__()
self._cffi_network = ffi.new('il_net_t **')
self.interface_name = interface_name
"""str: Interface name used in the network settings."""
[docs] def load_firmware(self, fw_file, target=1, boot_in_app=True):
"""Loads a given firmware file to a target.
.. warning::
Choose the ``boot_in_app`` flag accordingly to your
servo specifications otherwise the servo could enter
a blocking state.
Args:
target (int): Targeted node ID to be loaded.
fw_file (str): Path to the firmware file.
boot_in_app (bool): If summit series -> True.
If capitan series -> False.
If custom device -> Contact manufacturer.
Raises:
ILFirmwareLoadError: The firmware load process fails
with an error message.
"""
try:
_interface_name = cstr(self.interface_name) \
if self.interface_name else ffi.NULL
_fw_file = cstr(fw_file) if fw_file else ffi.NULL
r = lib.il_net_update_firmware(self._cffi_network,
_interface_name,
target,
_fw_file,
boot_in_app)
if r < 0:
raise ILFirmwareLoadError('Error updating firmware. '
'Error code: {}'.format(r))
except Exception as e:
raise ILFirmwareLoadError(e)
[docs] def scan_slaves(self):
"""Scan all the slaves connected in the network.
Returns:
list: List of number of slaves connected to the network.
"""
_interface_name = cstr(self.interface_name) \
if self.interface_name else ffi.NULL
number_slaves = lib.il_net_num_slaves_get(_interface_name)
slaves = []
for slave in range(number_slaves):
slaves.append(slave + 1)
return slaves
[docs] def connect_to_slave(self, target=1, dictionary="", use_eoe_comms=1):
"""Connect a slave through an EtherCAT connection.
Args:
target (int): Number of the target slave.
dictionary (str): Path to the dictionary to be loaded.
use_eoe_comms (int): Specify which architecture is the target based on.
Returns:
EthercatServo: Instance of the connected servo.
"""
servo = None
_interface_name = cstr(self.interface_name) \
if self.interface_name else ffi.NULL
_dictionary = cstr(dictionary) if dictionary else ffi.NULL
_servo = ffi.new('il_servo_t **')
r = lib.il_servo_connect_ecat(3, _interface_name, self._cffi_network,
_servo, _dictionary, 1061,
target, use_eoe_comms)
if r <= 0:
_servo = None
self._cffi_network = None
raise_err(r)
else:
net_ = ffi.cast('il_net_t *', self._cffi_network[0])
servo_ = ffi.cast('il_servo_t *', _servo[0])
servo = EthercatServo(servo_, net_, target, dictionary)
self._cffi_network = net_
self.servos.append(servo)
return servo
[docs] def disconnect_from_slave(self, servo):
"""Disconnects the slave from the network.
Args:
servo (EthernetServo): Instance of the servo connected.
"""
# TODO: This stops all connections no only the target servo.
if servo in self.servos:
self.servos.remove(servo)
r = lib.il_net_master_stop(self._cffi_network)
self.destroy_network()
self._cffi_network = None
if r < 0:
raise ILError('Error disconnecting the drive. '
'Return code: {}'.format(r))
@property
def protocol(self):
"""NET_PROT: Obtain network protocol."""
return NET_PROT.ECAT