Coverage for apio/utils/usb_util.py: 66%

134 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-06-24 03:51 +0000

1"""USB devices related utilities.""" 

2 

3import sys 

4import re 

5from glob import glob 

6from typing import List, Optional, Any 

7from dataclasses import dataclass 

8import usb.core 

9import usb.backend.libusb1 

10from apio.common.apio_console import cout, cerror 

11from apio.common.apio_styles import INFO 

12from apio.utils import util 

13from apio.apio_context import ApioContext 

14 

15 

16# -- Mapping of (VID), and (VID:PID) to device type. This is presented to the 

17# -- user as an information only. Add more as you like. 

18 

19_USB_TYPES = { 

20 # -- FTDI 

21 (0x0403): "FTDI", 

22 (0x0403, 0x6001): "FT232R", 

23 (0x0403, 0x6010): "FT2232H", 

24 (0x0403, 0x6011): "FT4232H", 

25 (0x0403, 0x6014): "FT232H", 

26 (0x0403, 0x6017): "FT313H", 

27 (0x0403, 0x8372): "FT245R", 

28 (0x0403, 0x8371): "FT232BM", 

29 (0x0403, 0x8373): "FT2232C", 

30 (0x0403, 0x8374): "FT4232", 

31} 

32 

33 

34def get_device_type(vid: int, pid: int) -> str: 

35 """Determine device type string. Try to match by (vid, pid) and if 

36 not found, by (vid). Returns "" if not found.""" 

37 device_type = _USB_TYPES.get((vid, pid), "") 

38 if not device_type: 

39 device_type = _USB_TYPES.get((vid), "") 

40 return device_type 

41 

42 

43def check_usb_id_format(usb_id: str) -> None: 

44 """Check that a vid or pid is in 4 char uppercase hex.""" 

45 if not re.search(r"^[0-9A-F]{4}$", usb_id): 45 ↛ 46line 45 didn't jump to line 46 because the condition on line 45 was never true

46 raise ValueError(f"Invalid 04X hex value: [{usb_id}]") 

47 

48 

49@dataclass() 

50class UsbDevice: 

51 """A data class to hold the information of a single USB device.""" 

52 

53 # pylint: disable=too-many-instance-attributes 

54 

55 vendor_id: str 

56 product_id: str 

57 bus: int 

58 device: int 

59 manufacturer: str 

60 product: str 

61 serial_number: str 

62 device_type: str 

63 

64 def __post_init__(self): 

65 """Check that vid, pid, has the format %04X.""" 

66 check_usb_id_format(self.vendor_id) 

67 check_usb_id_format(self.product_id) 

68 

69 def summary(self) -> str: 

70 """Returns a user friendly short description of this device.""" 

71 return ( 

72 f"[{self.vendor_id}:{self.product_id}] " 

73 f"[{self.bus}:{self.device}] " 

74 f"[{self.manufacturer}] " 

75 f"[{self.product}] [{self.serial_number}]" 

76 ) 

77 

78 

79def _get_usb_str( 

80 device: usb.core.Device, index: int, default: str 

81) -> Optional[str]: 

82 """Extract usb string by its index.""" 

83 # pylint: disable=broad-exception-caught 

84 try: 

85 s = str(usb.util.get_string(device, index)) 

86 # For Tang 9K which contains a null char as a string separator. 

87 # It's not USB standard but C tools do that implicitly. 

88 s = s.split("\x00", 1)[0] 

89 return s 

90 except Exception as e: 

91 if util.is_debug(1): 

92 cout(f"Error getting USB string at index {index}: {e}") 

93 return default 

94 

95 

96def scan_usb_devices(apio_ctx: ApioContext) -> List[UsbDevice]: 

97 """Query and return a list with usb device info.""" 

98 # pylint: disable=too-many-locals 

99 

100 # -- Track the names we searched for. For diagnostics. 

101 searched_names = [] 

102 

103 def find_library(name: str): 

104 """A callback for looking up the libusb backend file.""" 

105 

106 # -- Track searched names, for diagnostics 

107 searched_names.append(name) 

108 

109 # -- Try to match to a lib in oss-cad-suite/lib. 

110 oss_dir = apio_ctx.get_package_dir("oss-cad-suite") 

111 pattern = oss_dir / "lib" / f"lib{name}*" 

112 files = glob(str(pattern)) 

113 

114 if util.is_debug(1): 114 ↛ 115line 114 didn't jump to line 115 because the condition on line 114 was never true

115 cout("Apio find_library() call:") 

116 cout(f" {name=}") 

117 cout(f" {pattern=}") 

118 cout(f" {files=}") 

119 

120 # -- We don't expect multiple matches. 

121 if len(files) > 1: 121 ↛ 122line 121 didn't jump to line 122 because the condition on line 121 was never true

122 cerror(f"Found multiple backends for '{name}': {files}") 

123 sys.exit(1) 

124 

125 if files: 125 ↛ 127line 125 didn't jump to line 127 because the condition on line 125 was always true

126 return files[0] 

127 return None 

128 

129 # -- Lookup libusb backend library file in oss-cad-suite/lib. 

130 backend = usb.backend.libusb1.get_backend(find_library=find_library) 

131 

132 if not backend: 132 ↛ 133line 132 didn't jump to line 133 because the condition on line 132 was never true

133 cerror("Libusb backend not found") 

134 cout(f"Searched names: {searched_names}", style=INFO) 

135 sys.exit(1) 

136 

137 # -- Find the usb devices. 

138 raw_devices = usb.core.find(find_all=True, backend=backend) 

139 devices: List[Any] = list(raw_devices) if raw_devices else [] 

140 

141 # -- Collect the devices 

142 result: List[UsbDevice] = [] 

143 for device in devices: 143 ↛ 145line 143 didn't jump to line 145 because the loop on line 143 never started

144 # -- Print entire raw device info for debugging. 

145 if util.is_debug(1): 

146 cout() 

147 cout(str(device)) 

148 cout() 

149 

150 # -- Sanity check. 

151 assert isinstance(device, usb.core.Device), type(device) 

152 

153 # -- Skip hubs, they are not interesting 

154 d = device.bDeviceClass # pyright: ignore[reportAttributeAccessIssue] 

155 if d == 0x09: 

156 continue 

157 

158 # -- Lookup device type or "" if not found. 

159 device_type = get_device_type( 

160 device.idVendor, # pyright: ignore[reportAttributeAccessIssue] 

161 device.idProduct, # pyright: ignore[reportAttributeAccessIssue] 

162 ) # pyright: ignore[reportAttributeAccessIssue] 

163 

164 # -- Create the device object. 

165 unavail = "--unavail--" 

166 vid = device.idVendor # pyright: ignore[reportAttributeAccessIssue] 

167 pid = device.idProduct # pyright: ignore[reportAttributeAccessIssue] 

168 

169 d = device 

170 man = d.iManufacturer # pyright: ignore[reportAttributeAccessIssue] 

171 iser = d.iSerialNumber # pyright: ignore[reportAttributeAccessIssue] 

172 item = UsbDevice( 

173 vendor_id=f"{vid:04X}", 

174 product_id=f"{pid:04X}", 

175 bus=device.bus, # pyright: ignore[reportArgumentType] 

176 device=device.address or 0, 

177 manufacturer=_get_usb_str( 

178 device, # pyright: ignore[reportArgumentType] 

179 man, 

180 default=unavail, 

181 ), 

182 product=_get_usb_str( 

183 device, # pyright: ignore[reportArgumentType] 

184 device.iProduct, # pyright: ignore[reportAttributeAccessIssue] 

185 default=unavail, 

186 ), 

187 serial_number=_get_usb_str( 

188 device, # pyright: ignore[reportArgumentType] 

189 iser, # pyright: ignore[reportAttributeAccessIssue] 

190 default="", 

191 ), 

192 device_type=device_type, 

193 ) 

194 result.append(item) 

195 

196 # -- Sort by (vendor, product, bus, device). 

197 result = sorted( 

198 result, 

199 key=lambda d: ( 

200 d.vendor_id.lower(), 

201 d.product_id.lower(), 

202 d.bus, 

203 d.device, 

204 ), 

205 ) 

206 

207 if util.is_debug(1): 207 ↛ 208line 207 didn't jump to line 208 because the condition on line 207 was never true

208 cout(f"Found {len(result)} USB devices:") 

209 for device in result: 

210 cout(str(device)) 

211 

212 # -- All done. 

213 return result 

214 

215 

216@dataclass 

217class UsbDeviceFilter: 

218 """A class to filter a list of usb devices by attributes. We use the 

219 Fluent Interface design pattern so we can assert that the values that 

220 the caller passes as filters are not unintentionally None or empty 

221 unintentionally.""" 

222 

223 _vendor_id: str | None = None 

224 _product_id: str | None = None 

225 product_regex: str | None = None 

226 _serial_num: str | None = None 

227 

228 def summary(self) -> str: 

229 """User friendly representation of the filter""" 

230 terms = [] 

231 

232 if self._vendor_id: 

233 terms.append(f"VID={self._vendor_id}") 

234 if self._product_id: 

235 terms.append(f"PID={self._product_id}") 

236 if self.product_regex: 

237 terms.append(f'REGEX="{self.product_regex}"') 

238 if self._serial_num: 

239 terms.append(f'S/N="{self._serial_num}"') 

240 if terms: 

241 return "[" + ", ".join(terms) + "]" 

242 return "[all]" 

243 

244 def set_vendor_id(self, vendor_id: str) -> "UsbDeviceFilter": 

245 """Pass only devices with given vendor id.""" 

246 check_usb_id_format(vendor_id) 

247 self._vendor_id = vendor_id 

248 return self 

249 

250 def set_product_id(self, product_id: str) -> "UsbDeviceFilter": 

251 """Pass only devices with given product id.""" 

252 check_usb_id_format(product_id) 

253 self._product_id = product_id 

254 return self 

255 

256 def set_product_regex(self, product_regex: str) -> "UsbDeviceFilter": 

257 """Pass only devices whose product string match given regex.""" 

258 assert product_regex 

259 self.product_regex = product_regex 

260 return self 

261 

262 def set_serial_num(self, serial_num: str) -> "UsbDeviceFilter": 

263 """Pass only devices given product serial number..""" 

264 assert serial_num 

265 self._serial_num = serial_num 

266 return self 

267 

268 def _eval(self, device: UsbDevice) -> bool: 

269 """Test if the devices passes this field.""" 

270 if (self._vendor_id is not None) and ( 

271 self._vendor_id != device.vendor_id 

272 ): 

273 return False 

274 

275 if (self._product_id is not None) and ( 

276 self._product_id != device.product_id 

277 ): 

278 return False 

279 

280 if (self.product_regex is not None) and not re.search( 

281 self.product_regex, device.product 

282 ): 

283 return False 

284 

285 if (self._serial_num is not None) and ( 

286 self._serial_num.lower() != device.serial_number.lower() 

287 ): 

288 return False 

289 

290 return True 

291 

292 def filter(self, devices: List[UsbDevice]): 

293 """Return a copy of the list with items that are pass this filter. 

294 Items order is preserved.""" 

295 result = [d for d in devices if self._eval(d)] 

296 return result