Coverage for apio/utils/usb_util.py: 66%
134 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-06-24 03:51 +0000
« prev ^ index » next coverage.py v7.14.3, created at 2026-06-24 03:51 +0000
1"""USB devices related utilities."""
3import sys
4import re
5from glob import glob
6from typing import List, Optional, Any
7from dataclasses import dataclass
8import usb.core
9import usb.backend.libusb1
10from apio.common.apio_console import cout, cerror
11from apio.common.apio_styles import INFO
12from apio.utils import util
13from apio.apio_context import ApioContext
16# -- Mapping of (VID), and (VID:PID) to device type. This is presented to the
17# -- user as an information only. Add more as you like.
19_USB_TYPES = {
20 # -- FTDI
21 (0x0403): "FTDI",
22 (0x0403, 0x6001): "FT232R",
23 (0x0403, 0x6010): "FT2232H",
24 (0x0403, 0x6011): "FT4232H",
25 (0x0403, 0x6014): "FT232H",
26 (0x0403, 0x6017): "FT313H",
27 (0x0403, 0x8372): "FT245R",
28 (0x0403, 0x8371): "FT232BM",
29 (0x0403, 0x8373): "FT2232C",
30 (0x0403, 0x8374): "FT4232",
31}
34def get_device_type(vid: int, pid: int) -> str:
35 """Determine device type string. Try to match by (vid, pid) and if
36 not found, by (vid). Returns "" if not found."""
37 device_type = _USB_TYPES.get((vid, pid), "")
38 if not device_type:
39 device_type = _USB_TYPES.get((vid), "")
40 return device_type
43def check_usb_id_format(usb_id: str) -> None:
44 """Check that a vid or pid is in 4 char uppercase hex."""
45 if not re.search(r"^[0-9A-F]{4}$", usb_id): 45 ↛ 46line 45 didn't jump to line 46 because the condition on line 45 was never true
46 raise ValueError(f"Invalid 04X hex value: [{usb_id}]")
49@dataclass()
50class UsbDevice:
51 """A data class to hold the information of a single USB device."""
53 # pylint: disable=too-many-instance-attributes
55 vendor_id: str
56 product_id: str
57 bus: int
58 device: int
59 manufacturer: str
60 product: str
61 serial_number: str
62 device_type: str
64 def __post_init__(self):
65 """Check that vid, pid, has the format %04X."""
66 check_usb_id_format(self.vendor_id)
67 check_usb_id_format(self.product_id)
69 def summary(self) -> str:
70 """Returns a user friendly short description of this device."""
71 return (
72 f"[{self.vendor_id}:{self.product_id}] "
73 f"[{self.bus}:{self.device}] "
74 f"[{self.manufacturer}] "
75 f"[{self.product}] [{self.serial_number}]"
76 )
79def _get_usb_str(
80 device: usb.core.Device, index: int, default: str
81) -> Optional[str]:
82 """Extract usb string by its index."""
83 # pylint: disable=broad-exception-caught
84 try:
85 s = str(usb.util.get_string(device, index))
86 # For Tang 9K which contains a null char as a string separator.
87 # It's not USB standard but C tools do that implicitly.
88 s = s.split("\x00", 1)[0]
89 return s
90 except Exception as e:
91 if util.is_debug(1):
92 cout(f"Error getting USB string at index {index}: {e}")
93 return default
96def scan_usb_devices(apio_ctx: ApioContext) -> List[UsbDevice]:
97 """Query and return a list with usb device info."""
98 # pylint: disable=too-many-locals
100 # -- Track the names we searched for. For diagnostics.
101 searched_names = []
103 def find_library(name: str):
104 """A callback for looking up the libusb backend file."""
106 # -- Track searched names, for diagnostics
107 searched_names.append(name)
109 # -- Try to match to a lib in oss-cad-suite/lib.
110 oss_dir = apio_ctx.get_package_dir("oss-cad-suite")
111 pattern = oss_dir / "lib" / f"lib{name}*"
112 files = glob(str(pattern))
114 if util.is_debug(1): 114 ↛ 115line 114 didn't jump to line 115 because the condition on line 114 was never true
115 cout("Apio find_library() call:")
116 cout(f" {name=}")
117 cout(f" {pattern=}")
118 cout(f" {files=}")
120 # -- We don't expect multiple matches.
121 if len(files) > 1: 121 ↛ 122line 121 didn't jump to line 122 because the condition on line 121 was never true
122 cerror(f"Found multiple backends for '{name}': {files}")
123 sys.exit(1)
125 if files: 125 ↛ 127line 125 didn't jump to line 127 because the condition on line 125 was always true
126 return files[0]
127 return None
129 # -- Lookup libusb backend library file in oss-cad-suite/lib.
130 backend = usb.backend.libusb1.get_backend(find_library=find_library)
132 if not backend: 132 ↛ 133line 132 didn't jump to line 133 because the condition on line 132 was never true
133 cerror("Libusb backend not found")
134 cout(f"Searched names: {searched_names}", style=INFO)
135 sys.exit(1)
137 # -- Find the usb devices.
138 raw_devices = usb.core.find(find_all=True, backend=backend)
139 devices: List[Any] = list(raw_devices) if raw_devices else []
141 # -- Collect the devices
142 result: List[UsbDevice] = []
143 for device in devices: 143 ↛ 145line 143 didn't jump to line 145 because the loop on line 143 never started
144 # -- Print entire raw device info for debugging.
145 if util.is_debug(1):
146 cout()
147 cout(str(device))
148 cout()
150 # -- Sanity check.
151 assert isinstance(device, usb.core.Device), type(device)
153 # -- Skip hubs, they are not interesting
154 d = device.bDeviceClass # pyright: ignore[reportAttributeAccessIssue]
155 if d == 0x09:
156 continue
158 # -- Lookup device type or "" if not found.
159 device_type = get_device_type(
160 device.idVendor, # pyright: ignore[reportAttributeAccessIssue]
161 device.idProduct, # pyright: ignore[reportAttributeAccessIssue]
162 ) # pyright: ignore[reportAttributeAccessIssue]
164 # -- Create the device object.
165 unavail = "--unavail--"
166 vid = device.idVendor # pyright: ignore[reportAttributeAccessIssue]
167 pid = device.idProduct # pyright: ignore[reportAttributeAccessIssue]
169 d = device
170 man = d.iManufacturer # pyright: ignore[reportAttributeAccessIssue]
171 iser = d.iSerialNumber # pyright: ignore[reportAttributeAccessIssue]
172 item = UsbDevice(
173 vendor_id=f"{vid:04X}",
174 product_id=f"{pid:04X}",
175 bus=device.bus, # pyright: ignore[reportArgumentType]
176 device=device.address or 0,
177 manufacturer=_get_usb_str(
178 device, # pyright: ignore[reportArgumentType]
179 man,
180 default=unavail,
181 ),
182 product=_get_usb_str(
183 device, # pyright: ignore[reportArgumentType]
184 device.iProduct, # pyright: ignore[reportAttributeAccessIssue]
185 default=unavail,
186 ),
187 serial_number=_get_usb_str(
188 device, # pyright: ignore[reportArgumentType]
189 iser, # pyright: ignore[reportAttributeAccessIssue]
190 default="",
191 ),
192 device_type=device_type,
193 )
194 result.append(item)
196 # -- Sort by (vendor, product, bus, device).
197 result = sorted(
198 result,
199 key=lambda d: (
200 d.vendor_id.lower(),
201 d.product_id.lower(),
202 d.bus,
203 d.device,
204 ),
205 )
207 if util.is_debug(1): 207 ↛ 208line 207 didn't jump to line 208 because the condition on line 207 was never true
208 cout(f"Found {len(result)} USB devices:")
209 for device in result:
210 cout(str(device))
212 # -- All done.
213 return result
216@dataclass
217class UsbDeviceFilter:
218 """A class to filter a list of usb devices by attributes. We use the
219 Fluent Interface design pattern so we can assert that the values that
220 the caller passes as filters are not unintentionally None or empty
221 unintentionally."""
223 _vendor_id: str | None = None
224 _product_id: str | None = None
225 product_regex: str | None = None
226 _serial_num: str | None = None
228 def summary(self) -> str:
229 """User friendly representation of the filter"""
230 terms = []
232 if self._vendor_id:
233 terms.append(f"VID={self._vendor_id}")
234 if self._product_id:
235 terms.append(f"PID={self._product_id}")
236 if self.product_regex:
237 terms.append(f'REGEX="{self.product_regex}"')
238 if self._serial_num:
239 terms.append(f'S/N="{self._serial_num}"')
240 if terms:
241 return "[" + ", ".join(terms) + "]"
242 return "[all]"
244 def set_vendor_id(self, vendor_id: str) -> "UsbDeviceFilter":
245 """Pass only devices with given vendor id."""
246 check_usb_id_format(vendor_id)
247 self._vendor_id = vendor_id
248 return self
250 def set_product_id(self, product_id: str) -> "UsbDeviceFilter":
251 """Pass only devices with given product id."""
252 check_usb_id_format(product_id)
253 self._product_id = product_id
254 return self
256 def set_product_regex(self, product_regex: str) -> "UsbDeviceFilter":
257 """Pass only devices whose product string match given regex."""
258 assert product_regex
259 self.product_regex = product_regex
260 return self
262 def set_serial_num(self, serial_num: str) -> "UsbDeviceFilter":
263 """Pass only devices given product serial number.."""
264 assert serial_num
265 self._serial_num = serial_num
266 return self
268 def _eval(self, device: UsbDevice) -> bool:
269 """Test if the devices passes this field."""
270 if (self._vendor_id is not None) and (
271 self._vendor_id != device.vendor_id
272 ):
273 return False
275 if (self._product_id is not None) and (
276 self._product_id != device.product_id
277 ):
278 return False
280 if (self.product_regex is not None) and not re.search(
281 self.product_regex, device.product
282 ):
283 return False
285 if (self._serial_num is not None) and (
286 self._serial_num.lower() != device.serial_number.lower()
287 ):
288 return False
290 return True
292 def filter(self, devices: List[UsbDevice]):
293 """Return a copy of the list with items that are pass this filter.
294 Items order is preserved."""
295 result = [d for d in devices if self._eval(d)]
296 return result