Coverage for birdplan/bgpq3.py: 92%
112 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-23 03:27 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-23 03:27 +0000
1#
2# SPDX-License-Identifier: GPL-3.0-or-later
3#
4# Copyright (c) 2019-2024, AllWorldIT
5#
6# This program is free software: you can redistribute it and/or modify
7# it under the terms of the GNU General Public License as published by
8# the Free Software Foundation, either version 3 of the License, or
9# (at your option) any later version.
10#
11# This program is distributed in the hope that it will be useful,
12# but WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14# GNU General Public License for more details.
15#
16# You should have received a copy of the GNU General Public License
17# along with this program. If not, see <http://www.gnu.org/licenses/>.
19"""BGPQ3/4 support class."""
21import functools
22import ipaddress
23import json
24import shutil
25import subprocess # nosec
26import time
27from typing import Any, Dict, List, Optional, Union
29from .exceptions import BirdPlanError
31__all__ = ["BGPQ3"]
# Keep a cache for results returned while loaded into memory.
#
# The cache is keyed by server ("host:port"), then by the fixed key "objects", then by the lookup key
# used for the query ("asns:<object>" or "prefixes:<object>"). Each entry records the time it was
# stored; entries older than 60 seconds are ignored when they are looked up.
#
# Example:
#   bgpq3_cache = {
#       'whois.radb.net:43': {
#           'objects': {
#               'asns:AS174': {
#                   '_timestamp': 0000000000,
#                   'value': xxxxxx,
#               }
#           }
#       }
#   }
bgpq3_cache: Dict[str, Dict[str, Any]] = {}
class BGPQ3:
    """
    BGPQ3 support class.

    Wraps the ``bgpq3``/``bgpq4`` command line tools to resolve IRR objects (for example AS-SETs) into
    lists of ASNs and prefixes. Query results are kept in the module-level ``bgpq3_cache`` for 60 seconds,
    keyed by the server being queried.
    """

    _host: str
    _port: int
    # Stored by the constructor; no method in this class currently reads it
    _sources: str

    def __init__(self, host: str = "whois.radb.net", port: int = 43, sources: str = "RADB"):
        """
        Initialize object.

        Parameters
        ----------
        host : str
            Whois server to query.
        port : int
            Port of the whois server.
        sources : str
            IRR sources, stored on the object as given.

        """

        # Grab items we can set and associated defaults
        self._host = host
        self._port = port
        self._sources = sources

    @functools.lru_cache(maxsize=1)  # noqa: B019
    def _exe(self) -> str:
        """
        Return the bgpq3 executable.

        Returns
        -------
        str
            ``bgpq3`` if it is found in PATH, otherwise ``bgpq4``.

        Raises
        ------
        BirdPlanError
            If neither executable can be found in PATH.

        """

        for exe in ("bgpq3", "bgpq4"):
            if shutil.which(exe):
                return exe

        raise BirdPlanError("bgpq3/bgpq4 executable not found in PATH")

    @staticmethod
    def _object_list(as_sets: Union[str, List[str]]) -> List[str]:
        """Return the objects to query as a new list, accepting a single name or a list of names."""
        if isinstance(as_sets, str):
            return [as_sets]
        return list(as_sets)

    @staticmethod
    def _is_filtered_asn(asn: int, is_birdplan_internal: bool) -> bool:
        """Return True if the ASN is reserved, documentation or private use and must be dropped."""
        # 0 Reserved by [RFC7607] [RFC7607]
        # 112 Used by the AS112 project to sink misdirected DNS queries; see [RFC7534] [RFC7534]
        # 23456 AS_TRANS; reserved by [RFC6793] [RFC6793]
        # 65535 Reserved by [RFC7300] [RFC7300]
        # 4294967295 Reserved by [RFC7300] [RFC7300]
        if asn in (0, 112, 23456, 65535, 4294967295):
            return True

        #
        # NK: So objects that start with _BIRDPLAN are used for the tests, so we need to treat them a little differently
        # below if that's the case.
        #
        if not is_birdplan_internal:
            # 64496-64511 For documentation and sample code; reserved by [RFC5398] [RFC5398]
            if 64496 <= asn <= 64511:
                return True
            # 64512-65534 For private use; reserved by [RFC6996] [RFC6996]
            if 64512 <= asn <= 65534:
                return True
            # 65536-65551 For documentation and sample code; reserved by [RFC5398] [RFC5398]
            if 65536 <= asn <= 65551:
                return True

        # 4200000000-4294967294 For private use; reserved by [RFC6996] [RFC6996]
        return 4200000000 <= asn <= 4294967294

    @staticmethod
    def _format_prefix(prefix: Dict[str, Any]) -> str:
        """Render one bgpq3 prefix entry as ``prefix`` or ``prefix{greater_equal,less_equal}``."""
        # If it is exact, its easy
        if prefix["exact"]:
            return prefix["prefix"]
        # Work out greater_equal component, defaulting to the length of the prefix itself
        if "greater-equal" in prefix:
            greater_equal = prefix["greater-equal"]
        else:
            greater_equal = ipaddress.ip_network(prefix["prefix"]).prefixlen
        # Format is %s{%s,%s}
        return f'{prefix["prefix"]}{{{greater_equal},{prefix["less-equal"]}}}'

    def get_asns(self, as_sets: Union[str, List[str]]) -> List[str]:
        """
        Get ASNs.

        Parameters
        ----------
        as_sets : Union[str, List[str]]
            A single IRR object name, or a list of them. The results of all objects are combined.

        Returns
        -------
        List[str]
            ASNs as returned by bgpq3, in first-seen order without duplicates and with reserved,
            documentation and private use ASNs removed. Documentation and 16-bit private ranges are kept
            when any object name starts with ``_BIRDPLAN:``.

        Raises
        ------
        BirdPlanError
            If a query fails, or if no "asns" item was returned.

        """

        # Grab ASNs
        is_birdplan_internal = False
        asns_bgpq3: Dict[str, List[str]] = {}
        for obj in self._object_list(as_sets):
            cache_key = f"asns:{obj}"
            # Try pull result from our cache
            result: Any = self._cache(cache_key)
            # If we can't, grab the result from BGPQ3 live
            if not result:
                try:
                    result = self._bgpq3(["-l", "asns", "-t", "-3", obj])
                except subprocess.CalledProcessError as err:
                    output = err.output.decode("UTF-8", errors="replace")
                    raise BirdPlanError(f"Failed to query IRR ASNs from object '{obj}':\n{output}") from None
                # Cache the result we got
                self._cache(cache_key, result)
            # Check if this is a birdplan internal object
            if obj.startswith("_BIRDPLAN:"):
                is_birdplan_internal = True
            # Every object returns the same keys, so the lists must be combined instead of replaced, or only
            # the last object queried would be returned
            for key, values in result.items():
                asns_bgpq3.setdefault(key, []).extend(values)

        # If we don't have "asns" returned in the JSON structure, raise an exception
        if "asns" not in asns_bgpq3:  # pragma: no cover
            raise BirdPlanError(f"BGPQ3 output error, expecting 'asns': {asns_bgpq3}")

        filtered_asns = []
        seen = set()
        for asn in asns_bgpq3["asns"]:
            # Convert to int for the range checks
            asn_i = int(asn)
            # The same ASN can be a member of more than one object
            if asn_i in seen:
                continue
            seen.add(asn_i)
            if self._is_filtered_asn(asn_i, is_birdplan_internal):
                continue
            # We passed all the checks, lets add to the filtered list
            filtered_asns.append(asn)

        return filtered_asns

    def get_prefixes(self, as_sets: Union[str, List[str]]) -> Dict[str, List[str]]:
        """
        Get prefixes.

        Parameters
        ----------
        as_sets : Union[str, List[str]]
            A single IRR object name, or a list of them. The results of all objects are combined.

        Returns
        -------
        Dict[str, List[str]]
            Dictionary with an "ipv4" and an "ipv6" list. Exact prefixes are returned as is, others as
            ``prefix{greater_equal,less_equal}``. Duplicates are removed, first-seen order is kept.

        Raises
        ------
        BirdPlanError
            If a query fails.

        """

        # Grab IPv4 and IPv6 prefixes
        prefixes_bgpq3: Dict[str, List[Dict[str, Any]]] = {"ipv4": [], "ipv6": []}
        for obj in self._object_list(as_sets):
            cache_key = f"prefixes:{obj}"
            # Try pull result from our cache
            result: Any = self._cache(cache_key)
            # If we can't, grab the result from BGPQ3 live
            if not result:
                result = {}
                # Lets see if we get results back from our IRR queries
                for family, family_args in (("IPv4", ["-l", "ipv4", "-m", "24", "-4"]), ("IPv6", ["-l", "ipv6", "-m", "48", "-6"])):
                    try:
                        result.update(self._bgpq3([*family_args, "-A", obj]))
                    except subprocess.CalledProcessError as err:
                        output = err.output.decode("UTF-8", errors="replace")
                        raise BirdPlanError(f"Failed to query IRR {family} prefixes from object '{obj}':\n{output}") from None
                # Cache the result we got
                self._cache(cache_key, result)
            # Every object returns the same keys, so the lists must be combined instead of replaced, or only
            # the last object queried would be returned
            for key, values in result.items():
                prefixes_bgpq3.setdefault(key, []).extend(values)

        # Start out with no prefixes
        prefixes: Dict[str, List[str]] = {"ipv4": [], "ipv6": []}

        for family, formatted in prefixes.items():
            seen = set()
            for prefix in prefixes_bgpq3[family]:
                item = self._format_prefix(prefix)
                # The same prefix can be returned for more than one object
                if item in seen:
                    continue
                seen.add(item)
                formatted.append(item)

        return prefixes

    def _bgpq3(self, args: List[str]) -> Any:
        """
        Run bgpq3.

        Parameters
        ----------
        args : List[str]
            Arguments added after the executable, server and JSON output options.

        Returns
        -------
        Any
            The decoded JSON output.

        Raises
        ------
        subprocess.CalledProcessError
            If the process exits with a non-zero status, stderr is included in the captured output.

        """

        # Run bgpq3 against our server with JSON output
        cmd_args = [self._exe(), "-h", self.server, "-j"]
        # Add our args
        cmd_args.extend(args)

        # Grab result from process execution
        result = subprocess.check_output(cmd_args, stderr=subprocess.STDOUT)  # nosec

        # Return the decoded json output
        return json.loads(result)

    def _cache(self, obj: str, value: Optional[Any] = None) -> Optional[Any]:  # noqa: CFQ004
        """
        Retrieve or store value in cache.

        Parameters
        ----------
        obj : str
            Cache key.
        value : Optional[Any]
            Value to store. When this is None the cache is read instead.

        Returns
        -------
        Optional[Any]
            The value stored, or when reading the cached value, or None if there is no cached value
            stored within the last 60 seconds.

        """

        if self.server not in bgpq3_cache:
            bgpq3_cache[self.server] = {"objects": {}}
        server_objects = bgpq3_cache[self.server]["objects"]

        # Only None means "read", a falsy value like an empty result is still a value to store
        if value is None:
            cached = server_objects.get(obj)
            # If the cached obj does not exist, return None
            if cached is None:
                return None
            # Make sure its timestamp is within 60s of being retrieved, if not, return None
            if cached["_timestamp"] + 60 < time.time():  # pragma: no cover
                return None
            # Else its valid, return the cached value
            return cached["value"]

        # Set the cached value
        server_objects[obj] = {
            "_timestamp": time.time(),
            "value": value,
        }

        return value

    @property
    def server(self) -> str:
        """Return the server we're using, in "host:port" format."""
        return f"{self.host}:{self.port}"

    @property
    def host(self) -> str:
        """Return the host we're using."""
        return self._host

    @property
    def port(self) -> int:
        """Return the port we're using."""
        return self._port