Coverage for apio/utils/serial_util.py: 88%

94 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-06 10:20 +0000

1"""Serial devices related utilities.""" 

2 

3import re 

4from typing import List 

5from dataclasses import dataclass 

6from serial.tools.list_ports import comports 

7from serial.tools.list_ports_common import ListPortInfo 

8from apio.common.apio_console import cout 

9from apio.utils import util, usb_util 

10 

11 

12@dataclass() 

13class SerialDevice: 

14 """A data class to hold the information of a single Serial device.""" 

15 

16 # pylint: disable=too-many-instance-attributes 

17 

18 port: str 

19 port_name: str 

20 vendor_id: str 

21 product_id: str 

22 manufacturer: str 

23 product: str 

24 serial_number: str 

25 device_type: str 

26 location: str 

27 

28 def __post_init__(self): 

29 """Check that vid, pid, has the format %04X.""" 

30 usb_util.check_usb_id_format(self.vendor_id) 

31 usb_util.check_usb_id_format(self.product_id) 

32 

33 def summary(self) -> str: 

34 """Returns a user friendly short description of this device.""" 

35 return ( 

36 f"[{self.port}] " 

37 f"[{self.vendor_id}:{self.product_id}] " 

38 f"[{self.manufacturer}] [{self.product}] " 

39 f"[{self.serial_number}]" 

40 ) 

41 

42 

43def scan_serial_devices() -> List[SerialDevice]: 

44 """Scan the connected serial devices and return their information.""" 

45 

46 # -- Initial empty device list 

47 devices = [] 

48 

49 # -- Use the serial.tools.list_ports module for reading the 

50 # -- serial ports. More info: 

51 # -- https://pyserial.readthedocs.io/en/latest/tools.html 

52 list_port_info: List[ListPortInfo] = comports() 

53 assert isinstance(list_port_info, list) 

54 if list_port_info: 54 ↛ 58line 54 didn't jump to line 58 because the condition on line 54 was always true

55 assert isinstance(list_port_info[0], ListPortInfo) 

56 

57 # -- Collect the items that are USB serial ports. 

58 for port in list_port_info: 

59 

60 # -- Dump for debugging. 

61 if util.is_debug(1): 61 ↛ 62line 61 didn't jump to line 62 because the condition on line 61 was never true

62 cout("Raw serial port:") 

63 cout(f" Device: {port.device}") 

64 cout(f" Hwid: {port.hwid}") 

65 cout(f" Interface: {port.interface}") 

66 

67 # # -- Skip if not a serial port. 

68 if not port.device: 68 ↛ 69line 68 didn't jump to line 69 because the condition on line 68 was never true

69 continue 

70 

71 # -- Skip if not a USB device. 

72 if not port.vid or not port.pid: 72 ↛ 76line 72 didn't jump to line 76 because the condition on line 72 was always true

73 continue 

74 

75 # -- Add device to list. 

76 devices.append( 

77 SerialDevice( 

78 port=port.device, 

79 port_name=port.name, 

80 vendor_id=f"{port.vid:04X}", 

81 product_id=f"{port.pid:04X}", 

82 manufacturer=port.manufacturer, 

83 product=port.product, 

84 serial_number=port.serial_number or "", 

85 device_type=usb_util.get_device_type(port.vid, port.pid), 

86 location=port.location, 

87 ) 

88 ) 

89 

90 # -- Sort by port name, case insensitive. 

91 devices = sorted(devices, key=lambda d: d.port.lower()) 

92 

93 if util.is_debug(1): 93 ↛ 94line 93 didn't jump to line 94 because the condition on line 93 was never true

94 cout(f"Found {len(devices)} serial device:") 

95 for device in devices: 

96 cout(str(device)) 

97 

98 # -- All done. 

99 return devices 

100 

101 

102@dataclass 

103class SerialDeviceFilter: 

104 """A class to filter a list of serial devices by attributes. We use the 

105 Fluent Interface design pattern so we can assert that the values that 

106 the caller passes as filters are not unintentionally None or empty 

107 unintentionally.""" 

108 

109 _vendor_id: str = None 

110 _product_id: str = None 

111 _product_regex: str = None 

112 _serial_port: str = None 

113 _serial_num: str = None 

114 

115 def summary(self) -> str: 

116 """User friendly representation of the filter""" 

117 terms = [] 

118 if self._vendor_id: 

119 terms.append(f"VID={self._vendor_id}") 

120 if self._product_id: 

121 terms.append(f"PID={self._product_id}") 

122 if self._product_regex: 

123 terms.append(f'REGEX="{self._product_regex}"') 

124 if self._serial_port: 

125 terms.append(f"PORT={self._serial_port}") 

126 if self._serial_num: 

127 terms.append(f'S/N="{self._serial_num}"') 

128 if terms: 

129 return "[" + ", ".join(terms) + "]" 

130 return "[all]" 

131 

132 def set_vendor_id(self, vendor_id: str) -> "SerialDeviceFilter": 

133 """Pass only devices with given vendor id.""" 

134 usb_util.check_usb_id_format(vendor_id) 

135 self._vendor_id = vendor_id 

136 return self 

137 

138 def set_product_id(self, product_id: str) -> "SerialDeviceFilter": 

139 """Pass only devices given product id.""" 

140 usb_util.check_usb_id_format(product_id) 

141 self._product_id = product_id 

142 return self 

143 

144 def set_product_regex(self, product_regex: str) -> "SerialDeviceFilter": 

145 """Pass only devices whose product string match given regex.""" 

146 assert product_regex 

147 self._product_regex = product_regex 

148 return self 

149 

150 def set_port(self, serial_port: str) -> "SerialDeviceFilter": 

151 """Pass only devices given product serial port..""" 

152 assert serial_port 

153 self._serial_port = serial_port 

154 return self 

155 

156 def set_serial_num(self, serial_num: str) -> "SerialDeviceFilter": 

157 """Pass only devices given product serial number..""" 

158 assert serial_num 

159 self._serial_num = serial_num 

160 return self 

161 

162 def _eval(self, device: SerialDevice) -> bool: 

163 """Test if the devices passes this field.""" 

164 if (self._vendor_id is not None) and ( 

165 self._vendor_id != device.vendor_id 

166 ): 

167 return False 

168 

169 if (self._product_id is not None) and ( 

170 self._product_id != device.product_id 

171 ): 

172 return False 

173 

174 if (self._product_regex is not None) and not re.search( 

175 self._product_regex, device.product 

176 ): 

177 return False 

178 

179 # -- We allow matching by full port string or by port name. 

180 if (self._serial_port is not None) and ( 

181 ( 

182 self._serial_port.lower() 

183 not in [device.port.lower(), device.port_name.lower()] 

184 ) 

185 ): 

186 return False 

187 

188 if (self._serial_num is not None) and ( 

189 self._serial_num.lower() != device.serial_number.lower() 

190 ): 

191 return False 

192 

193 return True 

194 

195 def filter(self, devices: List[SerialDevice]): 

196 """Return a copy of the list with items that are pass this filter. 

197 Items order is preserved.""" 

198 result = [d for d in devices if self._eval(d)] 

199 return result