Coverage for apio / utils / serial_util.py: 82%

100 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-10 03:35 +0000

1"""Serial devices related utilities.""" 

2 

3import re 

4from typing import List 

5from dataclasses import dataclass 

6from serial.tools.list_ports import comports 

7from serial.tools.list_ports_common import ListPortInfo 

8from apio.common.apio_console import cout 

9from apio.utils import util, usb_util 

10from apio.apio_context import ApioContext 

11 

12 

13@dataclass() 

14class SerialDevice: 

15 """A data class to hold the information of a single Serial device.""" 

16 

17 # pylint: disable=too-many-instance-attributes 

18 

19 port: str 

20 port_name: str 

21 vendor_id: str 

22 product_id: str 

23 manufacturer: str 

24 product: str 

25 serial_number: str 

26 device_type: str 

27 location: str 

28 

29 def __post_init__(self): 

30 """Check that vid, pid, has the format %04X.""" 

31 usb_util.check_usb_id_format(self.vendor_id) 

32 usb_util.check_usb_id_format(self.product_id) 

33 

34 def summary(self) -> str: 

35 """Returns a user friendly short description of this device.""" 

36 return ( 

37 f"[{self.port}] " 

38 f"[{self.vendor_id}:{self.product_id}] " 

39 f"[{self.manufacturer}] [{self.product}] " 

40 f"[{self.serial_number}]" 

41 ) 

42 

43 

44def scan_serial_devices(_: ApioContext) -> List[SerialDevice]: 

45 """Scan the connected serial devices and return their information.""" 

46 

47 # -- Initial empty device list 

48 devices = [] 

49 

50 # -- Use the serial.tools.list_ports module for reading the 

51 # -- serial ports. More info: 

52 # -- https://pyserial.readthedocs.io/en/latest/tools.html 

53 list_port_info: List[ListPortInfo] = comports() 

54 assert isinstance(list_port_info, list) 

55 if list_port_info: 55 ↛ 59line 55 didn't jump to line 59 because the condition on line 55 was always true

56 assert isinstance(list_port_info[0], ListPortInfo) 

57 

58 # -- Collect the items that are USB serial ports. 

59 for port in list_port_info: 

60 

61 # -- Dump for debugging. 

62 if util.is_debug(1): 62 ↛ 63line 62 didn't jump to line 63 because the condition on line 62 was never true

63 cout("Raw serial port:") 

64 cout(f" Device: {port.device}") 

65 cout(f" Hwid: {port.hwid}") 

66 cout(f" Interface: {port.interface}") 

67 

68 # # -- Skip if not a serial port. 

69 if not port.device: 69 ↛ 70line 69 didn't jump to line 70 because the condition on line 69 was never true

70 continue 

71 

72 # -- Skip if not a USB device. 

73 if not port.vid or not port.pid: 73 ↛ 81line 73 didn't jump to line 81 because the condition on line 73 was always true

74 continue 

75 

76 # -- If needed, append A or B to the serial numbers of a dual channel 

77 # -- FTDI devices such as FT2232H. 

78 # -- 

79 # -- Ideally these should be done in the comports() function of 

80 # -- the pyserial library. 

81 serial_number = port.serial_number or "" 

82 if serial_number and port.device.endswith(f"-{serial_number}0"): 

83 serial_number += "A" 

84 elif serial_number and port.device.endswith(f"-{serial_number}1"): 

85 serial_number += "B" 

86 

87 # -- Add device to list. 

88 devices.append( 

89 SerialDevice( 

90 port=port.device, 

91 port_name=port.name, 

92 vendor_id=f"{port.vid:04X}", 

93 product_id=f"{port.pid:04X}", 

94 manufacturer=port.manufacturer, 

95 product=port.product, 

96 serial_number=serial_number, 

97 device_type=usb_util.get_device_type(port.vid, port.pid), 

98 location=port.location, 

99 ) 

100 ) 

101 

102 # -- Sort by port name, case insensitive. 

103 devices = sorted(devices, key=lambda d: d.port.lower()) 

104 

105 if util.is_debug(1): 105 ↛ 106line 105 didn't jump to line 106 because the condition on line 105 was never true

106 cout(f"Found {len(devices)} serial device:") 

107 for device in devices: 

108 cout(str(device)) 

109 

110 # -- All done. 

111 return devices 

112 

113 

114@dataclass 

115class SerialDeviceFilter: 

116 """A class to filter a list of serial devices by attributes. We use the 

117 Fluent Interface design pattern so we can assert that the values that 

118 the caller passes as filters are not unintentionally None or empty 

119 unintentionally.""" 

120 

121 _vendor_id: str = None 

122 _product_id: str = None 

123 _product_regex: str = None 

124 _serial_port: str = None 

125 _serial_num: str = None 

126 

127 def summary(self) -> str: 

128 """User friendly representation of the filter""" 

129 terms = [] 

130 if self._vendor_id: 

131 terms.append(f"VID={self._vendor_id}") 

132 if self._product_id: 

133 terms.append(f"PID={self._product_id}") 

134 if self._product_regex: 

135 terms.append(f'REGEX="{self._product_regex}"') 

136 if self._serial_port: 

137 terms.append(f"PORT={self._serial_port}") 

138 if self._serial_num: 

139 terms.append(f'S/N="{self._serial_num}"') 

140 if terms: 

141 return "[" + ", ".join(terms) + "]" 

142 return "[all]" 

143 

144 def set_vendor_id(self, vendor_id: str) -> "SerialDeviceFilter": 

145 """Pass only devices with given vendor id.""" 

146 usb_util.check_usb_id_format(vendor_id) 

147 self._vendor_id = vendor_id 

148 return self 

149 

150 def set_product_id(self, product_id: str) -> "SerialDeviceFilter": 

151 """Pass only devices given product id.""" 

152 usb_util.check_usb_id_format(product_id) 

153 self._product_id = product_id 

154 return self 

155 

156 def set_product_regex(self, product_regex: str) -> "SerialDeviceFilter": 

157 """Pass only devices whose product string match given regex.""" 

158 assert product_regex 

159 self._product_regex = product_regex 

160 return self 

161 

162 def set_port(self, serial_port: str) -> "SerialDeviceFilter": 

163 """Pass only devices given product serial port..""" 

164 assert serial_port 

165 self._serial_port = serial_port 

166 return self 

167 

168 def set_serial_num(self, serial_num: str) -> "SerialDeviceFilter": 

169 """Pass only devices given product serial number..""" 

170 assert serial_num 

171 self._serial_num = serial_num 

172 return self 

173 

174 def _eval(self, device: SerialDevice) -> bool: 

175 """Test if the devices passes this field.""" 

176 if (self._vendor_id is not None) and ( 

177 self._vendor_id != device.vendor_id 

178 ): 

179 return False 

180 

181 if (self._product_id is not None) and ( 

182 self._product_id != device.product_id 

183 ): 

184 return False 

185 

186 if (self._product_regex is not None) and not re.search( 

187 self._product_regex, device.product 

188 ): 

189 return False 

190 

191 # -- We allow matching by full port string or by port name. 

192 if (self._serial_port is not None) and ( 

193 ( 

194 self._serial_port.lower() 

195 not in [device.port.lower(), device.port_name.lower()] 

196 ) 

197 ): 

198 return False 

199 

200 if (self._serial_num is not None) and ( 

201 self._serial_num.lower() != device.serial_number.lower() 

202 ): 

203 return False 

204 

205 return True 

206 

207 def filter(self, devices: List[SerialDevice]): 

208 """Return a copy of the list with items that are pass this filter. 

209 Items order is preserved.""" 

210 result = [d for d in devices if self._eval(d)] 

211 return result