Coverage for birdplan/bgpq3.py: 92%

112 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-04-23 03:27 +0000

1# 

2# SPDX-License-Identifier: GPL-3.0-or-later 

3# 

4# Copyright (c) 2019-2024, AllWorldIT 

5# 

6# This program is free software: you can redistribute it and/or modify 

7# it under the terms of the GNU General Public License as published by 

8# the Free Software Foundation, either version 3 of the License, or 

9# (at your option) any later version. 

10# 

11# This program is distributed in the hope that it will be useful, 

12# but WITHOUT ANY WARRANTY; without even the implied warranty of 

13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

14# GNU General Public License for more details. 

15# 

16# You should have received a copy of the GNU General Public License 

17# along with this program. If not, see <http://www.gnu.org/licenses/>. 

18 

19"""BGPQ3/4 support class.""" 

20 

21import functools 

22import ipaddress 

23import json 

24import shutil 

25import subprocess # nosec 

26import time 

27from typing import Any, Dict, List, Optional, Union 

28 

29from .exceptions import BirdPlanError 

30 

31__all__ = ["BGPQ3"] 

32 

33 

34# Keep a cache for results returned while loaded into memory 

35# 

36# Example: 

37# bgpq3_cache = { 

38# 'whois.radb.net:43': { 

39# 'objects': { 

40# 'asns:AS174': {

41# '_timestamp': 0000000000, 

42# 'value': xxxxxx, 

43# } 

44# } 

45# } 

46# } 

47bgpq3_cache: Dict[str, Dict[str, Any]] = {} 

48 

49 

class BGPQ3:
    """Wrapper around the ``bgpq3``/``bgpq4`` command line tools.

    Queries are run against the IRR server given at construction time (passed to the tool as
    ``-h host:port``) with JSON output enabled. Decoded replies are kept for 60 seconds in the
    module-level ``bgpq3_cache``, keyed by server, so repeated lookups do not spawn a new process.
    """

    _host: str
    _port: int
    _sources: str

    # 0           Reserved by [RFC7607]
    # 112         Used by the AS112 project to sink misdirected DNS queries; see [RFC7534]
    # 23456       AS_TRANS; reserved by [RFC6793]
    # 65535       Reserved by [RFC7300]
    # 4294967295  Reserved by [RFC7300]
    _RESERVED_ASNS = frozenset({0, 112, 23456, 65535, 4294967295})

    # Inclusive ranges that are filtered out unless a BirdPlan internal (test) object is queried:
    # 64496-64511  For documentation and sample code; reserved by [RFC5398]
    # 64512-65534  For private use; reserved by [RFC6996]
    # 65536-65551  For documentation and sample code; reserved by [RFC5398]
    _TEST_ONLY_ASN_RANGES = ((64496, 64511), (64512, 65534), (65536, 65551))

    def __init__(self, host: str = "whois.radb.net", port: int = 43, sources: str = "RADB"):
        """
        Initialize object.

        Parameters
        ----------
        host : str
            IRR whois server hostname.
        port : int
            IRR whois server port.
        sources : str
            IRR sources; stored on the instance, not used by any method in this class.

        """

        # Grab items we can set and associated defaults
        self._host = host
        self._port = port
        self._sources = sources

    @staticmethod
    @functools.lru_cache(maxsize=1)
    def _exe() -> str:
        """
        Return the name of the first of ``bgpq3``/``bgpq4`` found in PATH.

        The lookup does not depend on instance state, so it is cached once for all instances
        rather than being keyed on ``self``. A failed lookup raises and is therefore not cached.

        Raises
        ------
        BirdPlanError
            If neither executable can be found.

        """

        for exe in ("bgpq3", "bgpq4"):
            if shutil.which(exe):
                return exe

        raise BirdPlanError("bgpq3/bgpq4 executable not found in PATH")

    @staticmethod
    def _as_list(as_sets: Union[str, List[str]]) -> List[str]:
        """Normalize a single object name or a list of object names into a new list."""
        if isinstance(as_sets, str):
            return [as_sets]
        return list(as_sets)

    @staticmethod
    def _merge_lists(target: Dict[str, List[Any]], result: Dict[str, List[Any]]) -> None:
        """Append every list in ``result`` to the list stored under the same label in ``target``."""
        for label, entries in result.items():
            # A fresh list is created per label, so cached results are never mutated
            target.setdefault(label, []).extend(entries)

    @staticmethod
    def _error_output(err: subprocess.CalledProcessError) -> str:
        """Return the captured output of a failed process as text, tolerating undecodable bytes."""
        output = err.output
        if isinstance(output, bytes):
            return output.decode("UTF-8", errors="replace")
        return "" if output is None else str(output)

    def get_asns(self, as_sets: Union[str, List[str]]) -> List[str]:
        """
        Get the ASNs contained in one or more IRR objects, with reserved ASNs removed.

        Results of all objects are merged (first occurrence wins, duplicates are dropped).
        Documentation and 16-bit private-use ranges are only kept when one of the objects is a
        BirdPlan internal object (name starts with ``_BIRDPLAN:``), which is used by the tests.

        Parameters
        ----------
        as_sets : Union[str, List[str]]
            A single object name or a list of object names.

        Returns
        -------
        List[str]
            The remaining ASNs, each element exactly as decoded from the bgpq3 JSON output.

        Raises
        ------
        BirdPlanError
            If the query fails or no ``asns`` label is present in the output.

        """

        objects = self._as_list(as_sets)

        # Objects that start with _BIRDPLAN are used for the tests, so we treat them a little differently below
        is_birdplan_internal = any(obj.startswith("_BIRDPLAN:") for obj in objects)

        # Grab ASNs
        asns_bgpq3: Dict[str, List[Any]] = {}
        for obj in objects:
            # Try pull result from our cache
            result: Any = self._cache(f"asns:{obj}")
            # If we can't, grab the result from BGPQ3 live
            if result is None:
                try:
                    result = self._bgpq3(["-l", "asns", "-t", "-3", obj])
                except subprocess.CalledProcessError as err:
                    raise BirdPlanError(f"Failed to query IRR ASNs from object '{obj}':\n{self._error_output(err)}") from None
                # Cache the result we got
                self._cache(f"asns:{obj}", result)
            # Every reply uses the same "asns" label, so the lists must be joined, not overwritten
            self._merge_lists(asns_bgpq3, result)

        # If we don't have "asns" returned in the JSON structure, raise an exception
        if "asns" not in asns_bgpq3:  # pragma: no cover
            raise BirdPlanError(f"BGPQ3 output error, expecting 'asns': {asns_bgpq3}")

        filtered_asns: List[Any] = []
        seen: set = set()
        for asn in asns_bgpq3["asns"]:
            # Convert to int for below
            asn_i = int(asn)
            # Skip ASNs we already handled, several objects may overlap
            if asn_i in seen:
                continue
            seen.add(asn_i)

            if asn_i in self._RESERVED_ASNS:
                continue
            if not is_birdplan_internal and any(low <= asn_i <= high for low, high in self._TEST_ONLY_ASN_RANGES):
                continue
            # 4200000000-4294967294 For private use; reserved by [RFC6996]
            if 4200000000 <= asn_i <= 4294967294:
                continue
            # We passed all the checks, lets add to the filtered list
            filtered_asns.append(asn)

        return filtered_asns

    def get_prefixes(self, as_sets: Union[str, List[str]]) -> Dict[str, List[str]]:
        """
        Get the IPv4 and IPv6 prefixes registered for one or more IRR objects.

        IPv4 prefixes are queried up to /24 and IPv6 prefixes up to /48, both aggregated. Results
        of all objects are merged and duplicates are dropped.

        Parameters
        ----------
        as_sets : Union[str, List[str]]
            A single object name or a list of object names.

        Returns
        -------
        Dict[str, List[str]]
            Keys ``ipv4`` and ``ipv6``; exact prefixes are returned as-is, ranges are formatted
            as ``prefix{greater_equal,less_equal}``.

        Raises
        ------
        BirdPlanError
            If one of the queries fails.

        """

        objects = self._as_list(as_sets)

        # (label, protocol name for the error message, maximum prefix length, address family flag)
        queries = (("ipv4", "IPv4", "24", "-4"), ("ipv6", "IPv6", "48", "-6"))

        # Grab IPv4 and IPv6 prefixes
        prefixes_bgpq3: Dict[str, List[Dict[str, Any]]] = {}
        for obj in objects:
            # Try pull result from our cache
            result: Any = self._cache(f"prefixes:{obj}")
            # If we can't, grab the result from BGPQ3 live
            if result is None:
                result = {}
                for label, protocol, max_length, family_flag in queries:
                    try:
                        result.update(self._bgpq3(["-l", label, "-m", max_length, family_flag, "-A", obj]))
                    except subprocess.CalledProcessError as err:
                        raise BirdPlanError(
                            f"Failed to query IRR {protocol} prefixes from object '{obj}':\n{self._error_output(err)}"
                        ) from None
                # Cache the result we got
                self._cache(f"prefixes:{obj}", result)
            # Every reply uses the same labels, so the lists must be joined, not overwritten
            self._merge_lists(prefixes_bgpq3, result)

        # Start out with no prefixes
        prefixes: Dict[str, List[str]] = {"ipv4": [], "ipv6": []}

        for family, formatted in prefixes.items():
            seen: set = set()
            for prefix in prefixes_bgpq3.get(family, []):
                network = prefix["prefix"]
                if prefix["exact"]:
                    # If it is exact, its easy to add
                    entry = network
                else:
                    # Work out greater_equal component, only parsing the network when we have to
                    if "greater-equal" in prefix:  # noqa: SIM401
                        greater_equal = prefix["greater-equal"]
                    else:
                        greater_equal = ipaddress.ip_network(network).prefixlen
                    # Format is %s{%s,%s}
                    entry = f"{network}{{{greater_equal},{prefix['less-equal']}}}"
                if entry in seen:
                    continue
                seen.add(entry)
                formatted.append(entry)

        return prefixes

    def _bgpq3(self, args: List[str]) -> Any:
        """
        Run bgpq3/bgpq4 against our server and return its decoded JSON output.

        Raises
        ------
        subprocess.CalledProcessError
            If the process exits with a non-zero status; stderr is merged into ``output``.

        """

        # Run the tool against our server with JSON output
        cmd_args = [self._exe(), "-h", self.server, "-j"]
        # Add our args
        cmd_args.extend(args)

        # Grab result from process execution
        result = subprocess.check_output(cmd_args, stderr=subprocess.STDOUT)  # nosec

        # Return the decoded json output
        return json.loads(result)

    def _cache(self, obj: str, value: Optional[Any] = None) -> Optional[Any]:  # noqa: CFQ004
        """
        Retrieve or store value in cache.

        With ``value`` left as ``None`` this is a lookup that returns the cached value, or ``None``
        when there is no entry or the entry is older than 60 seconds. With any other ``value`` the
        entry is stored with the current time and ``value`` is returned.
        """

        objects = bgpq3_cache.setdefault(self.server, {"objects": {}})["objects"]

        if value is None:
            cached = objects.get(obj)
            # If the cached obj does not exist, return None
            if cached is None:
                return None
            # Make sure its timestamp is within 60s of being retrieved, if not, return None
            if cached["_timestamp"] + 60 < time.time():  # pragma: no cover
                return None
            # Else its valid, return the cached value
            return cached["value"]

        # Set the cached value
        objects[obj] = {
            "_timestamp": time.time(),
            "value": value,
        }

        return value

    @property
    def server(self) -> str:
        """Return the server we're using, as ``host:port``."""
        return f"{self.host}:{self.port}"

    @property
    def host(self) -> str:
        """Return the host we're using."""
        return self._host

    @property
    def port(self) -> int:
        """Return the port we're using."""
        return self._port