Coverage for apio/utils/serial_util.py: 88%
94 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-06 10:20 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-06 10:20 +0000
1"""Serial devices related utilities."""
3import re
4from typing import List
5from dataclasses import dataclass
6from serial.tools.list_ports import comports
7from serial.tools.list_ports_common import ListPortInfo
8from apio.common.apio_console import cout
9from apio.utils import util, usb_util
@dataclass()
class SerialDevice:
    """Information about a single serial device.

    The vendor and product ids are stored as strings and are validated
    with usb_util.check_usb_id_format() when the instance is created.
    """

    # pylint: disable=too-many-instance-attributes

    port: str
    port_name: str
    vendor_id: str
    product_id: str
    manufacturer: str
    product: str
    serial_number: str
    device_type: str
    location: str

    def __post_init__(self):
        """Validate the format of the vendor id and then the product id."""
        for usb_id in (self.vendor_id, self.product_id):
            usb_util.check_usb_id_format(usb_id)

    def summary(self) -> str:
        """Return a short, user friendly, one line description.

        The result is a sequence of bracketed items separated by single
        spaces: port, 'vid:pid', manufacturer, product and serial number.
        """
        items = (
            self.port,
            f"{self.vendor_id}:{self.product_id}",
            self.manufacturer,
            self.product,
            self.serial_number,
        )
        return " ".join(f"[{item}]" for item in items)
def scan_serial_devices() -> List[SerialDevice]:
    """Return the connected USB serial devices, sorted by port.

    The ports are enumerated with pyserial's comports(). Entries that have
    no device string, or that have no vendor id or product id, are skipped.
    The returned list is ordered by the port string, ignoring case.
    """

    # -- Enumerate the ports using the serial.tools.list_ports module.
    # -- More info: https://pyserial.readthedocs.io/en/latest/tools.html
    port_infos: List[ListPortInfo] = comports()

    # -- Sanity check the types of the values we got from pyserial.
    assert isinstance(port_infos, list)
    if port_infos:
        assert isinstance(port_infos[0], ListPortInfo)

    found: List[SerialDevice] = []

    for info in port_infos:

        # -- Dump the raw port information for debugging.
        if util.is_debug(1):
            cout("Raw serial port:")
            cout(f"    Device: {info.device}")
            cout(f"    Hwid: {info.hwid}")
            cout(f"    Interface: {info.interface}")

        # -- Keep only entries that are serial ports (have a device
        # -- string) and are USB devices (have both vid and pid).
        if not (info.device and info.vid and info.pid):
            continue

        # -- Convert the pyserial record to our own data class.
        serial_device = SerialDevice(
            port=info.device,
            port_name=info.name,
            vendor_id=f"{info.vid:04X}",
            product_id=f"{info.pid:04X}",
            manufacturer=info.manufacturer,
            product=info.product,
            serial_number=info.serial_number or "",
            device_type=usb_util.get_device_type(info.vid, info.pid),
            location=info.location,
        )
        found.append(serial_device)

    # -- Order by port, case insensitive. The sort is stable.
    found.sort(key=lambda item: item.port.lower())

    if util.is_debug(1):
        cout(f"Found {len(found)} serial device:")
        for serial_device in found:
            cout(str(serial_device))

    # -- All done.
    return found
@dataclass
class SerialDeviceFilter:
    """A class to filter a list of serial devices by attributes.

    We use the Fluent Interface design pattern: each set_xxx() method
    checks its argument, records the criterion and returns the filter
    itself. This lets us assert that the values the caller passes as
    filters are not unintentionally None or empty. Criteria that were
    never set are ignored when filtering.
    """

    # -- The active criteria. None means 'do not filter by this attribute'.
    _vendor_id: str = None
    _product_id: str = None
    _product_regex: str = None
    _serial_port: str = None
    _serial_num: str = None

    def summary(self) -> str:
        """Return a user friendly representation of the filter.

        The active criteria are listed in brackets, separated by commas.
        A filter with no active criteria is shown as '[all]'.
        """
        terms = []
        if self._vendor_id:
            terms.append(f"VID={self._vendor_id}")
        if self._product_id:
            terms.append(f"PID={self._product_id}")
        if self._product_regex:
            terms.append(f'REGEX="{self._product_regex}"')
        if self._serial_port:
            terms.append(f"PORT={self._serial_port}")
        if self._serial_num:
            terms.append(f'S/N="{self._serial_num}"')
        if terms:
            return "[" + ", ".join(terms) + "]"
        return "[all]"

    def set_vendor_id(self, vendor_id: str) -> "SerialDeviceFilter":
        """Pass only devices with the given vendor id.

        The id format is checked with usb_util.check_usb_id_format().
        """
        usb_util.check_usb_id_format(vendor_id)
        self._vendor_id = vendor_id
        return self

    def set_product_id(self, product_id: str) -> "SerialDeviceFilter":
        """Pass only devices with the given product id.

        The id format is checked with usb_util.check_usb_id_format().
        """
        usb_util.check_usb_id_format(product_id)
        self._product_id = product_id
        return self

    def set_product_regex(self, product_regex: str) -> "SerialDeviceFilter":
        """Pass only devices whose product string matches the given regex.

        The regex is applied with re.search(), so it may match anywhere in
        the product string. It must not be None or empty.
        """
        assert product_regex
        self._product_regex = product_regex
        return self

    def set_port(self, serial_port: str) -> "SerialDeviceFilter":
        """Pass only devices with the given serial port.

        The value is compared, case insensitive, with both the full port
        string and the port name. It must not be None or empty.
        """
        assert serial_port
        self._serial_port = serial_port
        return self

    def set_serial_num(self, serial_num: str) -> "SerialDeviceFilter":
        """Pass only devices with the given serial number.

        The comparison is case insensitive. The value must not be None
        or empty.
        """
        assert serial_num
        self._serial_num = serial_num
        return self

    def _eval(self, device: "SerialDevice") -> bool:
        """Test if the device passes this filter.

        Returns True if the device satisfies every active criterion. A
        device attribute that is None is treated as an empty string, so a
        device with missing information is matched against the criterion
        instead of raising an exception.
        """
        if (self._vendor_id is not None) and (
            self._vendor_id != device.vendor_id
        ):
            return False

        if (self._product_id is not None) and (
            self._product_id != device.product_id
        ):
            return False

        # -- scan_serial_devices() passes the product string through as
        # -- is, so it may be None. re.search() raises TypeError on None.
        if (self._product_regex is not None) and not re.search(
            self._product_regex, device.product or ""
        ):
            return False

        # -- We allow matching by full port string or by port name.
        if self._serial_port is not None:
            port_aliases = [
                (device.port or "").lower(),
                (device.port_name or "").lower(),
            ]
            if self._serial_port.lower() not in port_aliases:
                return False

        if (self._serial_num is not None) and (
            self._serial_num.lower() != (device.serial_number or "").lower()
        ):
            return False

        return True

    def filter(self, devices: List["SerialDevice"]) -> List["SerialDevice"]:
        """Return a new list with the items that pass this filter.

        The order of the items is preserved and the input list is not
        modified.
        """
        return [d for d in devices if self._eval(d)]
199 return result