import ipaddress
import socket
from enum import Enum
import ingenialogger
from ingenialink import constants
from ingenialink.ethernet.network import EthernetNetwork
from ingenialink.exceptions import ILTimeoutError, ILIOError, ILError
logger = ingenialogger.get_logger(__name__)
[docs]class EoECommand(Enum):
INIT = 0
DEINIT = 1
SCAN = 2
CONFIG = 3
ERASE_CONFIG = 4
EOE_START = 5
EOE_STOP = 6
GET_STATUS = 7
[docs]class EoENetwork(EthernetNetwork):
"""Network for EoE (Ethernet over EtherCAT) communication.
Args:
ifname (str): Network interface name.
connection_timeout (float): Time in seconds of the connection timeout
to the EoE service.
"""
EOE_MSG_CMD_SIZE = 2
EOE_MSG_NODE_SIZE = 2
EOE_MSG_IP_SIZE = 4
EOE_MSG_DATA_SIZE = 53
EOE_MSG_FRAME_SIZE = EOE_MSG_CMD_SIZE + EOE_MSG_DATA_SIZE
NULL_TERMINATOR = b"\x00"
STATUS_EOE_BIT = 0b10
STATUS_INIT_BIT = 0b1
ECAT_SERVICE_NETWORK = ipaddress.ip_network("192.168.3.0/24")
def __init__(self, ifname, connection_timeout=constants.DEFAULT_ETH_CONNECTION_TIMEOUT):
super().__init__()
self.ifname = ifname
self._eoe_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self._eoe_socket.settimeout(connection_timeout)
self._connect_to_eoe_service()
status = self._get_status_eoe_service()
if status & self.STATUS_EOE_BIT:
self._stop_eoe_service()
self._erase_config_eoe_service()
if status & self.STATUS_INIT_BIT:
self._deinitialize_eoe_service()
self._eoe_service_init = False
self._eoe_service_started = False
self._configured_slaves = {}
[docs] def connect_to_slave(
self,
slave_id,
ip_address,
dictionary=None,
port=1061,
connection_timeout=constants.DEFAULT_ETH_CONNECTION_TIMEOUT,
servo_status_listener=False,
net_status_listener=False,
):
"""Connects to a slave through the given network settings.
Args:
slave_id (int): EtherCAT slave ID.
ip_address (str): IP address to be assigned to the slave.
dictionary (str): Path to the target dictionary file.
port (int): Port to connect to the slave.
connection_timeout (float): Time in seconds of the connection timeout.
servo_status_listener (bool): Toggle the listener of the servo for
its status, errors, faults, etc.
net_status_listener (bool): Toggle the listener of the network
status, connection and disconnection.
Raises:
ValueError: ip_address must be a subnetwork of 192.168.3.0/24
ILError: If the EoE service is not running.
ILError: If the EoE service cannot be started on the network interface.
Returns:
EthernetServo: Instance of the servo connected.
"""
if ipaddress.ip_address(ip_address) not in self.ECAT_SERVICE_NETWORK:
raise ValueError("ip_address must be a subnetwork of 192.168.3.0/24")
if not self._eoe_service_init:
self._initialize_eoe_service()
if self._eoe_service_started:
self._stop_eoe_service()
self._erase_config_eoe_service()
self.__reconfigure_drives()
try:
self._configure_slave(slave_id, ip_address)
finally:
self._start_eoe_service()
self._configured_slaves[ip_address] = slave_id
return super().connect_to_slave(
ip_address,
dictionary,
port,
connection_timeout,
servo_status_listener,
net_status_listener,
)
def __reconfigure_drives(self):
"""Reconfigure all the slaves saved in the network"""
for ip_addr, slave_id in self._configured_slaves.items():
try:
self._configure_slave(slave_id, ip_addr)
except ILError as e:
logger.error(e)
[docs] def disconnect_from_slave(self, servo):
del self._configured_slaves[servo.ip_address]
super().disconnect_from_slave(servo)
if len(self.servos) == 0:
self._stop_eoe_service()
self._erase_config_eoe_service()
self._deinitialize_eoe_service()
def __del__(self):
self._eoe_socket.shutdown(socket.SHUT_RDWR)
self._eoe_socket.close()
[docs] def scan_slaves(self):
"""Scan slaves connected to the network adapter.
Returns:
list: List containing the ids of the connected slaves.
Raises:
ILError: If the EoE service fails to perform a scan.
"""
deinit_later = False
if not self._eoe_service_init:
deinit_later = True
self._initialize_eoe_service()
result = self._scan_eoe_service()
if deinit_later:
self._deinitialize_eoe_service()
return result
def _scan_eoe_service(self):
"""Make the scan request to the EoE service
Returns:
list: List containing the ids of the connected slaves.
Raises:
ILError: If the EoE service fails to perform a scan.
"""
msg = self._build_eoe_command_msg(EoECommand.SCAN.value)
try:
r = self._send_command(msg)
except (ILIOError, ILTimeoutError) as e:
raise ILError(
"Failed to perform a network scan. Please verify the EoE service is running."
) from e
return list(range(1, r + 1))
@staticmethod
def _build_eoe_command_msg(cmd, data=None):
"""
Build a message with the following format.
+----------+----------+
| cmd | data |
+==========+==========+
| 2 Byte | 53 Bytes |
+----------+----------+
Args:
cmd (int): Indicates which operation to perform.
data (bytes): Contains the necessary data to perform the desired command.
Returns:
bytes: The message to send.
"""
if data is None:
data = bytes()
cmd_field = cmd.to_bytes(EoENetwork.EOE_MSG_CMD_SIZE, "little")
data_field = data + EoENetwork.NULL_TERMINATOR * (EoENetwork.EOE_MSG_DATA_SIZE - len(data))
return cmd_field + data_field
def _send_command(self, msg):
"""
Send command to EoE service.
Args:
msg (bytes): Message to send.
Returns:
int: Response from the EoE service.
Raises:
ILTimeoutError: Timeout while receiving a response from
the EoE service.
ILIOError: Error while sending/receiving message.
"""
try:
self._eoe_socket.send(msg)
except socket.error as e:
raise ILIOError("Error sending message.") from e
try:
response = self._eoe_socket.recv(1024)
except socket.timeout as e:
raise ILTimeoutError("Timeout while receiving response.") from e
except socket.error as e:
raise ILIOError("Error receiving response.") from e
return int.from_bytes(response, byteorder="little", signed=True)
def _connect_to_eoe_service(self):
"""Connect to the EoE service."""
self._eoe_socket.connect(("127.0.0.1", 8888))
def _initialize_eoe_service(self):
"""Initialize the virtual network interface and the packet forwarder.
Raises:
ILError: If the EoE service is not running.
ILError: If the EoE service cannot be started on the network interface.
"""
self._eoe_service_init = True
data = self.ifname
msg = self._build_eoe_command_msg(EoECommand.INIT.value, data=data.encode("utf-8"))
try:
r = self._send_command(msg)
except (ILIOError, ILTimeoutError) as e:
raise ILError(
"Failed to initialize the EoE service. Please verify it's running."
) from e
if r < 0:
raise ILError(f"Failed to initialize the EoE service using interface {self.ifname}.")
def _deinitialize_eoe_service(self):
"""Deinitialize the virtual network interface and the packet forwarder.
Raises:
ILError: If the EoE service is not running.
ILError: If the EoE service cannot be stopped on the network interface.
"""
self._eoe_service_init = False
data = self.ifname
msg = self._build_eoe_command_msg(EoECommand.DEINIT.value, data=data.encode("utf-8"))
try:
self._send_command(msg)
except (ILIOError, ILTimeoutError) as e:
raise ILError("Failed to deinitialize the EoE service.") from e
def _configure_slave(self, slave_id, ip_address, net_mask="255.255.255.0"):
"""
Configure an EtherCAT slave with a given IP.
Args:
slave_id (int): EtherCAT slave ID.
ip_address (str): IP address to be set to the slave.
Raises:
ILError: If the EoE service fails to configure a slave.
"""
slave_bytes = slave_id.to_bytes(self.EOE_MSG_NODE_SIZE, "little")
ip_int = int(ipaddress.IPv4Address(ip_address))
ip_bytes = ip_int.to_bytes(self.EOE_MSG_IP_SIZE, "little")
net_mask_int = int(ipaddress.IPv4Address(net_mask))
net_mask_bytes = net_mask_int.to_bytes(self.EOE_MSG_IP_SIZE, "little")
data = slave_bytes + ip_bytes + net_mask_bytes
msg = self._build_eoe_command_msg(EoECommand.CONFIG.value, data)
try:
self._send_command(msg)
except (ILIOError, ILTimeoutError) as e:
raise ILError(f"Failed to configure slave {slave_id} with IP {ip_address}.") from e
def _start_eoe_service(self):
"""Starts the EoE service
Raises:
ILError: If the EoE service fails to start.
"""
self._eoe_service_started = True
msg = self._build_eoe_command_msg(EoECommand.EOE_START.value)
try:
self._send_command(msg)
except (ILIOError, ILTimeoutError) as e:
raise ILError("Failed to start the EoE service.") from e
def _stop_eoe_service(self):
"""Stops the EoE service
Raises:
ILError: If the EoE service fails to stop.
"""
self._eoe_service_started = False
msg = self._build_eoe_command_msg(EoECommand.EOE_STOP.value)
try:
self._send_command(msg)
except (ILIOError, ILTimeoutError) as e:
raise ILError("Failed to stop the EoE service.") from e
def _erase_config_eoe_service(self):
"""Stops the EoE service
Raises:
ILError: If the EoE service fails to stop.
"""
msg = self._build_eoe_command_msg(EoECommand.ERASE_CONFIG.value)
try:
self._send_command(msg)
except (ILIOError, ILTimeoutError) as e:
raise ILError("Failed to stop the EoE service.") from e
def _get_status_eoe_service(self):
"""Get the EoE service status.
+-----------+------+------+
|ECAT status| init | eoe |
+===========+======+======+
| 1Byte | 1bit | 1bit |
+-----------+------+------+
Returns:
int: get status response
Raises:
ILError: If get status request fails.
"""
msg = self._build_eoe_command_msg(EoECommand.GET_STATUS.value)
try:
r = self._send_command(msg)
except (ILIOError, ILTimeoutError) as e:
raise ILError("Failed to get service status.") from e
return r
def load_firmware_moco(self):
raise NotImplementedError
def load_firmware(self):
raise NotImplementedError