Coverage for src/birdplan/bird_config/sections/protocols/bgp/peer/actions.py: 78%
297 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-03-04 10:19 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-03-04 10:19 +0000
1#
2# SPDX-License-Identifier: GPL-3.0-or-later
3#
4# Copyright (c) 2019-2025, AllWorldIT
5#
6# This program is free software: you can redistribute it and/or modify
7# it under the terms of the GNU General Public License as published by
8# the Free Software Foundation, either version 3 of the License, or
9# (at your option) any later version.
10#
11# This program is distributed in the hope that it will be useful,
12# but WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14# GNU General Public License for more details.
15#
16# You should have received a copy of the GNU General Public License
17# along with this program. If not, see <http://www.gnu.org/licenses/>.
19"""BIRD BGP peer action support."""
21from enum import Enum
22from typing import Any
24from ......exceptions import BirdPlanConfigError
25from ..... import util
26from ....functions import BirdVariable, SectionFunctions
27from ..bgp_functions import BGPFunctions
29__all__ = ["BGPPeerActions"]
class BGPPeerActionType(Enum):
    """Direction a BGP peer action applies to.

    The member values are the strings accepted in the ``type`` key of an action.
    """

    # Action is applied to routes received from the peer
    IMPORT = "import"
    # Action is applied to routes advertised to the peer
    EXPORT = "export"
class BGPPeerAction:
    """A single configured action for a BGP peer.

    An action is made up of optional match criteria (origin ASN, prefix, community,
    extended community and large community; a value starting with ``!`` is a negated
    match) and either the simple string action ``reject`` or a mapping of community
    add/remove lists and an AS-path prepend count.

    The action is rendered as BIRD constants (:meth:`generate_constants`) and one BIRD
    function (:meth:`generate_function`).
    """

    _global_functions: SectionFunctions
    _bgp_functions: BGPFunctions

    _asn: int
    _peer_name: str

    _action_id: int
    _action_type: BGPPeerActionType

    _match_origin_asn: list[str]
    _match_prefix: list[str]
    _match_community: list[str]
    _match_extended_community: list[str]
    _match_large_community: list[str]

    _action_reject: bool
    _action_add_community: list[str]
    _action_add_extended_community: list[str]
    _action_add_large_community: list[str]
    _action_remove_community: list[str]
    _action_remove_extended_community: list[str]
    _action_remove_large_community: list[str]
    _action_prepend: int

    def __init__(  # noqa: PLR0913
        self,
        global_functions: SectionFunctions,
        bgp_functions: BGPFunctions,
        asn: int,
        peer_name: str,
        action_id: int,
        action: dict[str, Any],
    ) -> None:
        """Initialize BGP peer action.

        Args:
            global_functions: Global BIRD function helpers (used for ``route_info()``).
            bgp_functions: BGP BIRD function helpers (used for ``peer_prepend()``).
            asn: ASN used when building the generated constant and function names.
            peer_name: Peer name used when building the generated names.
            action_id: Number of this action, used in the generated names.
            action: Action configuration; must contain ``type`` and ``action`` and may
                contain ``matches``.

        Raises:
            KeyError: If ``type`` or ``action`` is missing from ``action``.
            ValueError: If ``type`` is not a valid :class:`BGPPeerActionType` value.
            BirdPlanConfigError: If the matches or actions are not valid.
        """

        self._global_functions = global_functions
        self._bgp_functions = bgp_functions

        self._asn = asn
        self._peer_name = peer_name
        self._action_id = action_id

        # Grab action type
        self._action_type = BGPPeerActionType(action["type"])

        # Initialize matches and actions
        self._match_origin_asn = []
        self._match_prefix = []
        self._match_community = []
        self._match_extended_community = []
        self._match_large_community = []

        self._action_reject = False
        self._action_add_community = []
        self._action_add_extended_community = []
        self._action_add_large_community = []
        self._action_remove_community = []
        self._action_remove_extended_community = []
        self._action_remove_large_community = []
        self._action_prepend = 0

        # Parse action data, matches are optional but the action itself is not
        if "matches" in action:
            self._parse_matches(action["matches"])
        self._parse_actions(action["action"])

    def _parse_matches(self, matches: dict[str, Any]) -> None:
        """Validate the matches and store each match list on the instance.

        Raises:
            BirdPlanConfigError: If a match value is neither a string nor a list, or the
                match type is not known.
        """

        for match_k, match_v in matches.items():
            # Make sure we only have lists or strings
            if not isinstance(match_v, (list, str)):
                raise BirdPlanConfigError("Action value for 'match' is not valid")
            # A lone string is shorthand for a one item list
            match_v_list: list[str] = [match_v] if isinstance(match_v, str) else list(match_v)
            # Process each type of match
            if match_k not in ("origin_asn", "prefix", "community", "extended_community", "large_community"):
                raise BirdPlanConfigError(f"Action match type '{match_k}' is not valid")
            setattr(self, f"_match_{match_k}", match_v_list)

    def _parse_actions(self, action: str | dict[str, Any]) -> None:  # noqa: C901,PLR0912
        """Validate the action and store what needs to be done on the instance.

        Args:
            action: Either the string ``reject`` or a mapping of action type to value.

        Raises:
            BirdPlanConfigError: If the action string, an action type or an action value
                is not valid, including a prepend count outside 1 to 10.
        """
        # Check if this is a simple string action
        if isinstance(action, str):
            # The only simple action we know about is reject
            if action != "reject":
                raise BirdPlanConfigError(f"Action value '{action}' is not valid")
            self._action_reject = True
            return
        # Check what actions we have
        for action_k, action_v in action.items():
            # Make sure we only have lists, strings or integers
            if not isinstance(action_v, (list, str, int)):
                raise BirdPlanConfigError("Action value for 'action' is not valid")
            # A lone value is shorthand for a one item list
            action_v_list: list[str | int] = list(action_v) if isinstance(action_v, list) else [action_v]

            # Process each type of action
            if action_k in (
                "add_community",
                "add_extended_community",
                "add_large_community",
                "remove_community",
                "remove_extended_community",
                "remove_large_community",
            ):
                setattr(self, f"_action_{action_k}", action_v_list)
            elif action_k == "prepend":
                # Prepend must be a single value, either an integer or a numeric string
                if isinstance(action_v, str):
                    try:
                        prepend = int(action_v)
                    except ValueError as err:
                        raise BirdPlanConfigError(f"Action prepend value '{action_v}' is not valid") from err
                elif isinstance(action_v, int):
                    prepend = action_v
                else:
                    raise BirdPlanConfigError(f"Action prepend value '{action_v}' is not valid")
                # Make sure prepend value is within 1 to 10
                if not 1 <= prepend <= 10:  # noqa: PLR2004
                    raise BirdPlanConfigError(f"Action prepend value '{action_v}' is not valid")
                self._action_prepend = prepend
            else:
                raise BirdPlanConfigError(f"Action type '{action_k}' is not valid")

    @staticmethod
    def _split_negated(values: list[str]) -> tuple[list[str], list[str]]:
        """Split values into straight matches and negated matches with the leading ``!`` removed."""
        straight = [x for x in values if not x.startswith("!")]
        negated = [x[1:] for x in values if x.startswith("!")]
        return straight, negated

    @staticmethod
    def _define_list(name: str, items: list[str]) -> list[str]:
        """Return the lines of a BIRD ``define`` for a list constant, one item per line."""
        # Joining with ", " and splitting on " " leaves the comma attached to each item but the last
        return [f"define {name} = [", *(f" {line}" for line in ", ".join(items).split(" ")), "];"]

    def _debug_print(self, message: str) -> str:
        """Return the BIRD debug print statement used to log what this action is doing to a route."""
        return (
            " if DEBUG then print\n"
            f' "{self.function_name} [action:{self.action_id}] {message} ",'
            f" {self.global_functions.route_info()};"
        )

    def generate_constants(self) -> list[str]:  # noqa: C901
        """Generate the constants for the action.

        Returns:
            BIRD configuration lines defining one list constant per non-empty match list.
            Negated matches are defined separately with a ``_not`` suffix, prefixes are
            additionally split into ``_v4`` and ``_v6`` lists.
        """
        constants: list[str] = []

        # Generate origin ASN match lists
        list_name = self.match_list_name_origin_asn
        match_list, match_list_not = self._split_negated(self._match_origin_asn)
        if match_list:
            constants.extend(self._define_list(list_name, match_list))
        if match_list_not:
            constants.extend(self._define_list(f"{list_name}_not", match_list_not))

        # Generate IPv4 and IPv6 prefix match lists
        list_name = self.match_list_name_prefix
        match_list_v4, match_list_not_v4, match_list_v6, match_list_not_v6 = self._get_match_prefix_lists()
        for suffix, prefixes in (
            ("_v4", match_list_v4),
            ("_not_v4", match_list_not_v4),
            ("_v6", match_list_v6),
            ("_not_v6", match_list_not_v6),
        ):
            if prefixes:
                # Spaces are stripped from each prefix as the list is split into lines on spaces
                constants.extend(self._define_list(f"{list_name}{suffix}", [x.replace(" ", "") for x in prefixes]))

        # Loop with community match types
        for match_type in (
            "community",
            "extended_community",
            "large_community",
        ):
            list_name = getattr(self, f"match_list_name_{match_type}")
            match_list, match_list_not = self._split_negated(getattr(self, f"_match_{match_type}"))
            # The function refers to both the straight list and the _not list, so define both
            for define_name, communities in ((list_name, match_list), (f"{list_name}_not", match_list_not)):
                if not communities:
                    continue
                # Convert from xxx:yyy to (xxx,yyy) format
                sanitized = util.sanitize_community_list(communities)
                if sanitized:
                    constants.extend(self._define_list(define_name, sanitized))

        # Return list of constants for this peer
        return constants

    def generate_function(self) -> list[str]:  # noqa: C901,PLR0912,PLR0915
        """Generate the function for the action.

        The generated BIRD function returns ``true`` as soon as a match criterion is not
        met, so the action statements only run for routes meeting every criterion.

        Returns:
            BIRD configuration lines making up the function.
        """
        # Generate function header
        function = [
            f"function {self.function_name}() -> bool",
            "string filter_name;",
            "{",
            f' filter_name = "{self.function_name}";',
        ]

        # BIRD route attribute holding each community type
        community_attributes = {
            "community": "bgp_community",
            "extended_community": "bgp_ext_community",
            "large_community": "bgp_large_community",
        }

        # Generate match statements
        for match_type in (
            "origin_asn",
            "prefix",
            "community",
            "extended_community",
            "large_community",
        ):
            match_list_name = getattr(self, f"match_list_name_{match_type}")
            # Grab list of raw matches, nothing is generated for a match type that was not configured
            match_list_raw = getattr(self, f"_match_{match_type}")
            if not match_list_raw:
                continue
            # Pull out straight matches and negative NOT matches
            match_list, match_list_not = self._split_negated(match_list_raw)

            # Add comment for this match type
            function.append(f" # Match {match_type}")
            # Check origin ASN match, the origin is the last ASN in the path
            if match_type == "origin_asn":
                if match_list:
                    function.append(f" if (bgp_path.last !~ {match_list_name}) then return true;")
                if match_list_not:
                    function.append(f" if (bgp_path.last ~ {match_list_name}_not) then return true;")
            # Check prefix match
            elif match_type == "prefix":
                # Pull out IPv4 and IPv6 prefixes lists
                match_list_v4, match_list_not_v4, match_list_v6, match_list_not_v6 = self._get_match_prefix_lists()
                for net_type, suffix, prefixes, prefixes_not in (
                    ("NET_IP4", "v4", match_list_v4, match_list_not_v4),
                    ("NET_IP6", "v6", match_list_v6, match_list_not_v6),
                ):
                    if not (prefixes or prefixes_not):
                        continue
                    function.append(f" if (net.type = {net_type}) then {{")
                    if prefixes:
                        function.append(f" if (net !~ {match_list_name}_{suffix}) then return true;")
                    if prefixes_not:
                        function.append(f" if (net ~ {match_list_name}_not_{suffix}) then return true;")
                    function.append(" }")
            # Check community, extended community and large community match
            else:
                attribute = community_attributes[match_type]
                if match_list:
                    function.append(f" if ({attribute} !~ {match_list_name}) then return true;")
                if match_list_not:
                    function.append(f" if ({attribute} ~ {match_list_name}_not) then return true;")

        #
        # Generate action statements
        #

        fallthrough_value = "true"

        # Handle reject action
        if self._action_reject:
            if self.action_type == BGPPeerActionType.IMPORT:
                function.append(self._debug_print("Filtering"))
                function.append(" bgp_large_community.add(BGP_LC_FILTERED_ACTION);")
            elif self.action_type == BGPPeerActionType.EXPORT:
                function.append(self._debug_print("Rejecting"))
                # Set fallthrough value to false as we're rejecting
                # NOTE(review): assumed to apply to export only, import rejects are tagged above - confirm
                fallthrough_value = "false"

        # Handle community add and remove actions
        # NOTE(review): removed communities are emitted as configured, unlike added ones which are sanitized - confirm
        for description, preposition, bird_call, communities, sanitize in (
            ("Adding communities", "to", "bgp_community.add", self._action_add_community, True),
            ("Adding extended communities", "to", "bgp_ext_community.add", self._action_add_extended_community, True),
            ("Adding large communities", "to", "bgp_large_community.add", self._action_add_large_community, True),
            ("Removing communities", "from", "bgp_community.remove", self._action_remove_community, False),
            (
                "Removing extended communities",
                "from",
                "bgp_ext_community.remove",
                self._action_remove_extended_community,
                False,
            ),
            ("Removing large communities", "from", "bgp_large_community.remove", self._action_remove_large_community, False),
        ):
            if not communities:
                continue
            function.append(self._debug_print(f"{description} {', '.join(communities)} {preposition}"))
            values = util.sanitize_community_list(communities) if sanitize else communities
            function.extend(f" {bird_call}({community});" for community in values)

        # Handle prepend action
        if self._action_prepend:
            function.append(f" {self.bgp_functions.peer_prepend(BirdVariable('BGP_ASN'), self._action_prepend)};")
        # Generate function footer
        function.append(f" return {fallthrough_value};")
        function.append("}")

        # Return list of function lines
        return function

    def _get_match_prefix_lists(self) -> tuple[list[str], list[str], list[str], list[str]]:
        """Get the match prefix lists for IPv4 and IPv6.

        Returns:
            A tuple of (IPv4 matches, IPv4 negated matches, IPv6 matches, IPv6 negated
            matches). Negated matches have their leading ``!`` removed. A prefix is
            treated as IPv6 when it contains a ``:``.
        """
        match_list, match_list_not = self._split_negated(self._match_prefix)
        match_list_v4 = [x for x in match_list if ":" not in x]
        match_list_not_v4 = [x for x in match_list_not if ":" not in x]
        match_list_v6 = [x for x in match_list if ":" in x]
        match_list_not_v6 = [x for x in match_list_not if ":" in x]
        return match_list_v4, match_list_not_v4, match_list_v6, match_list_not_v6

    @property
    def global_functions(self) -> SectionFunctions:
        """Return the global functions."""
        return self._global_functions

    @property
    def bgp_functions(self) -> BGPFunctions:
        """Return the BGP functions."""
        return self._bgp_functions

    @property
    def function_name(self) -> str:
        """Return the name of the BIRD function generated for this action."""
        return f"bgp_AS{self.asn}_{self.peer_name}_action{self.action_id}_{self.action_type.value}"

    @property
    def match_list_name_origin_asn(self) -> str:
        """Return the base name of our origin ASN match list constant."""
        return f"bgp_AS{self.asn}_{self.peer_name}_action{self.action_id}_match_origin_asn"

    @property
    def match_list_name_prefix(self) -> str:
        """Return the base name of our prefix match list constants."""
        return f"bgp_AS{self.asn}_{self.peer_name}_action{self.action_id}_match_prefix"

    @property
    def match_list_name_community(self) -> str:
        """Return the base name of our community match list constant."""
        return f"bgp_AS{self.asn}_{self.peer_name}_action{self.action_id}_match_community"

    @property
    def match_list_name_extended_community(self) -> str:
        """Return the base name of our extended community match list constant."""
        return f"bgp_AS{self.asn}_{self.peer_name}_action{self.action_id}_match_extended_community"

    @property
    def match_list_name_large_community(self) -> str:
        """Return the base name of our large community match list constant."""
        return f"bgp_AS{self.asn}_{self.peer_name}_action{self.action_id}_match_large_community"

    @property
    def asn(self) -> int:
        """Return the ASN."""
        return self._asn

    @property
    def peer_name(self) -> str:
        """Return the peer name."""
        return self._peer_name

    @property
    def action_id(self) -> int:
        """Return the action ID."""
        return self._action_id

    @property
    def action_type(self) -> BGPPeerActionType:
        """Return the action type."""
        return self._action_type
class BGPPeerActions:
    """Ordered collection of the actions configured for one BGP peer.

    Actions are created by :meth:`configure` and numbered from 1 in the order they
    appear in the configuration.
    """

    _global_functions: SectionFunctions
    _bgp_functions: BGPFunctions

    _asn: int
    _peer_name: str
    _actions: list[BGPPeerAction]

    def __init__(self, global_functions: SectionFunctions, bgp_functions: BGPFunctions, asn: int, peer_name: str) -> None:
        """Set up an empty list of actions for the given peer."""

        self._actions = []

        self._asn = asn
        self._peer_name = peer_name

        self._global_functions = global_functions
        self._bgp_functions = bgp_functions

    def configure(self, actions: list[dict[str, Any]]) -> None:
        """Create one action object per configured action and add it to our list.

        Raises:
            BirdPlanConfigError: If ``actions`` is not a list.
        """
        # Anything other than a list is a configuration mistake
        if not isinstance(actions, list):
            raise BirdPlanConfigError(
                f"Configuration item has invalid type '{type(actions)}' in bgp:peers:{self.peer_name}:actions"
            )
        # Action IDs are handed out sequentially, starting at 1
        for action_id, action_config in enumerate(actions, start=1):
            peer_action = BGPPeerAction(
                self.global_functions, self.bgp_functions, self.asn, self.peer_name, action_id, action_config
            )
            self.actions.append(peer_action)

    def generate_constants(self) -> list[str]:
        """Return the constant lines of every action, in action order."""
        return [line for action in self.actions for line in action.generate_constants()]

    def generate_functions(self) -> list[str]:
        """Return the function lines of every action, in action order."""
        return [line for action in self.actions for line in action.generate_function()]

    @property
    def global_functions(self) -> SectionFunctions:
        """Global function helpers passed in at construction."""
        return self._global_functions

    @property
    def bgp_functions(self) -> BGPFunctions:
        """BGP function helpers passed in at construction."""
        return self._bgp_functions

    @property
    def asn(self) -> int:
        """ASN passed in at construction."""
        return self._asn

    @property
    def peer_name(self) -> str:
        """Peer name passed in at construction."""
        return self._peer_name

    @property
    def actions(self) -> list[BGPPeerAction]:
        """Actions configured so far."""
        return self._actions