Coverage for apio / utils / serial_util.py: 82%
100 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-10 03:35 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-10 03:35 +0000
1"""Serial devices related utilities."""
3import re
4from typing import List
5from dataclasses import dataclass
6from serial.tools.list_ports import comports
7from serial.tools.list_ports_common import ListPortInfo
8from apio.common.apio_console import cout
9from apio.utils import util, usb_util
10from apio.apio_context import ApioContext
13@dataclass()
14class SerialDevice:
15 """A data class to hold the information of a single Serial device."""
17 # pylint: disable=too-many-instance-attributes
19 port: str
20 port_name: str
21 vendor_id: str
22 product_id: str
23 manufacturer: str
24 product: str
25 serial_number: str
26 device_type: str
27 location: str
29 def __post_init__(self):
30 """Check that vid, pid, has the format %04X."""
31 usb_util.check_usb_id_format(self.vendor_id)
32 usb_util.check_usb_id_format(self.product_id)
34 def summary(self) -> str:
35 """Returns a user friendly short description of this device."""
36 return (
37 f"[{self.port}] "
38 f"[{self.vendor_id}:{self.product_id}] "
39 f"[{self.manufacturer}] [{self.product}] "
40 f"[{self.serial_number}]"
41 )
44def scan_serial_devices(_: ApioContext) -> List[SerialDevice]:
45 """Scan the connected serial devices and return their information."""
47 # -- Initial empty device list
48 devices = []
50 # -- Use the serial.tools.list_ports module for reading the
51 # -- serial ports. More info:
52 # -- https://pyserial.readthedocs.io/en/latest/tools.html
53 list_port_info: List[ListPortInfo] = comports()
54 assert isinstance(list_port_info, list)
55 if list_port_info: 55 ↛ 59line 55 didn't jump to line 59 because the condition on line 55 was always true
56 assert isinstance(list_port_info[0], ListPortInfo)
58 # -- Collect the items that are USB serial ports.
59 for port in list_port_info:
61 # -- Dump for debugging.
62 if util.is_debug(1): 62 ↛ 63line 62 didn't jump to line 63 because the condition on line 62 was never true
63 cout("Raw serial port:")
64 cout(f" Device: {port.device}")
65 cout(f" Hwid: {port.hwid}")
66 cout(f" Interface: {port.interface}")
68 # # -- Skip if not a serial port.
69 if not port.device: 69 ↛ 70line 69 didn't jump to line 70 because the condition on line 69 was never true
70 continue
72 # -- Skip if not a USB device.
73 if not port.vid or not port.pid: 73 ↛ 81line 73 didn't jump to line 81 because the condition on line 73 was always true
74 continue
76 # -- If needed, append A or B to the serial numbers of a dual channel
77 # -- FTDI devices such as FT2232H.
78 # --
79 # -- Ideally these should be done in the comports() function of
80 # -- the pyserial library.
81 serial_number = port.serial_number or ""
82 if serial_number and port.device.endswith(f"-{serial_number}0"):
83 serial_number += "A"
84 elif serial_number and port.device.endswith(f"-{serial_number}1"):
85 serial_number += "B"
87 # -- Add device to list.
88 devices.append(
89 SerialDevice(
90 port=port.device,
91 port_name=port.name,
92 vendor_id=f"{port.vid:04X}",
93 product_id=f"{port.pid:04X}",
94 manufacturer=port.manufacturer,
95 product=port.product,
96 serial_number=serial_number,
97 device_type=usb_util.get_device_type(port.vid, port.pid),
98 location=port.location,
99 )
100 )
102 # -- Sort by port name, case insensitive.
103 devices = sorted(devices, key=lambda d: d.port.lower())
105 if util.is_debug(1): 105 ↛ 106line 105 didn't jump to line 106 because the condition on line 105 was never true
106 cout(f"Found {len(devices)} serial device:")
107 for device in devices:
108 cout(str(device))
110 # -- All done.
111 return devices
114@dataclass
115class SerialDeviceFilter:
116 """A class to filter a list of serial devices by attributes. We use the
117 Fluent Interface design pattern so we can assert that the values that
118 the caller passes as filters are not unintentionally None or empty
119 unintentionally."""
121 _vendor_id: str = None
122 _product_id: str = None
123 _product_regex: str = None
124 _serial_port: str = None
125 _serial_num: str = None
127 def summary(self) -> str:
128 """User friendly representation of the filter"""
129 terms = []
130 if self._vendor_id:
131 terms.append(f"VID={self._vendor_id}")
132 if self._product_id:
133 terms.append(f"PID={self._product_id}")
134 if self._product_regex:
135 terms.append(f'REGEX="{self._product_regex}"')
136 if self._serial_port:
137 terms.append(f"PORT={self._serial_port}")
138 if self._serial_num:
139 terms.append(f'S/N="{self._serial_num}"')
140 if terms:
141 return "[" + ", ".join(terms) + "]"
142 return "[all]"
144 def set_vendor_id(self, vendor_id: str) -> "SerialDeviceFilter":
145 """Pass only devices with given vendor id."""
146 usb_util.check_usb_id_format(vendor_id)
147 self._vendor_id = vendor_id
148 return self
150 def set_product_id(self, product_id: str) -> "SerialDeviceFilter":
151 """Pass only devices given product id."""
152 usb_util.check_usb_id_format(product_id)
153 self._product_id = product_id
154 return self
156 def set_product_regex(self, product_regex: str) -> "SerialDeviceFilter":
157 """Pass only devices whose product string match given regex."""
158 assert product_regex
159 self._product_regex = product_regex
160 return self
162 def set_port(self, serial_port: str) -> "SerialDeviceFilter":
163 """Pass only devices given product serial port.."""
164 assert serial_port
165 self._serial_port = serial_port
166 return self
168 def set_serial_num(self, serial_num: str) -> "SerialDeviceFilter":
169 """Pass only devices given product serial number.."""
170 assert serial_num
171 self._serial_num = serial_num
172 return self
174 def _eval(self, device: SerialDevice) -> bool:
175 """Test if the devices passes this field."""
176 if (self._vendor_id is not None) and (
177 self._vendor_id != device.vendor_id
178 ):
179 return False
181 if (self._product_id is not None) and (
182 self._product_id != device.product_id
183 ):
184 return False
186 if (self._product_regex is not None) and not re.search(
187 self._product_regex, device.product
188 ):
189 return False
191 # -- We allow matching by full port string or by port name.
192 if (self._serial_port is not None) and (
193 (
194 self._serial_port.lower()
195 not in [device.port.lower(), device.port_name.lower()]
196 )
197 ):
198 return False
200 if (self._serial_num is not None) and (
201 self._serial_num.lower() != device.serial_number.lower()
202 ):
203 return False
205 return True
207 def filter(self, devices: List[SerialDevice]):
208 """Return a copy of the list with items that are pass this filter.
209 Items order is preserved."""
210 result = [d for d in devices if self._eval(d)]
211 return result