Coverage for src/birdplan/bird_config/sections/protocols/bgp/peer/actions.py: 78%

297 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-03-04 10:19 +0000

1# 

2# SPDX-License-Identifier: GPL-3.0-or-later 

3# 

4# Copyright (c) 2019-2025, AllWorldIT 

5# 

6# This program is free software: you can redistribute it and/or modify 

7# it under the terms of the GNU General Public License as published by 

8# the Free Software Foundation, either version 3 of the License, or 

9# (at your option) any later version. 

10# 

11# This program is distributed in the hope that it will be useful, 

12# but WITHOUT ANY WARRANTY; without even the implied warranty of 

13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

14# GNU General Public License for more details. 

15# 

16# You should have received a copy of the GNU General Public License 

17# along with this program. If not, see <http://www.gnu.org/licenses/>. 

18 

19"""BIRD BGP peer action support.""" 

20 

21from enum import Enum 

22from typing import Any 

23 

24from ......exceptions import BirdPlanConfigError 

25from ..... import util 

26from ....functions import BirdVariable, SectionFunctions 

27from ..bgp_functions import BGPFunctions 

28 

29__all__ = ["BGPPeerActions"] 

30 

31 

32class BGPPeerActionType(Enum): 

33 """BGP peer action type enum.""" 

34 

35 IMPORT = "import" 

36 EXPORT = "export" 

37 

38 

39class BGPPeerAction: 

40 """BGP peer action.""" 

41 

42 _global_functions: SectionFunctions 

43 _bgp_functions: BGPFunctions 

44 

45 _asn: int 

46 _peer_name: str 

47 

48 _action_id: int 

49 _action_type: BGPPeerActionType 

50 

51 _match_origin_asn: list[str] 

52 _match_prefix: list[str] 

53 _match_community: list[str] 

54 _match_extended_community: list[str] 

55 _match_large_community: list[str] 

56 

57 _action_reject: bool 

58 _action_add_community: list[str] 

59 _action_add_extended_community: list[str] 

60 _action_add_large_community: list[str] 

61 _action_remove_community: list[str] 

62 _action_remove_extended_community: list[str] 

63 _action_remove_large_community: list[str] 

64 _action_prepend: int 

65 

66 def __init__( # noqa: PLR0913 

67 self, 

68 global_functions: SectionFunctions, 

69 bgp_functions: BGPFunctions, 

70 asn: int, 

71 peer_name: str, 

72 action_id: int, 

73 action: dict[str, Any], 

74 ) -> None: 

75 """Initialize BGP peer action.""" 

76 

77 self._global_functions = global_functions 

78 self._bgp_functions = bgp_functions 

79 

80 self._asn = asn 

81 self._peer_name = peer_name 

82 self._action_id = action_id 

83 

84 # Grab action type 

85 self._action_type = BGPPeerActionType(action["type"]) 

86 

87 # Initialize matches and actions 

88 self._match_origin_asn = [] 

89 self._match_prefix = [] 

90 self._match_community = [] 

91 self._match_extended_community = [] 

92 self._match_large_community = [] 

93 

94 self._action_reject = False 

95 self._action_add_community = [] 

96 self._action_add_extended_community = [] 

97 self._action_add_large_community = [] 

98 self._action_remove_community = [] 

99 self._action_remove_extended_community = [] 

100 self._action_remove_large_community = [] 

101 self._action_prepend = 0 

102 

103 # Parse action data 

104 if "matches" in action: 

105 self._parse_matches(action["matches"]) 

106 self._parse_actions(action["action"]) 

107 

108 def _parse_matches(self, matches: dict[str, Any]) -> None: 

109 """Parse the matches.""" 

110 

111 # Check what matches we have 

112 for match_k, match_v in matches.items(): 

113 # Make sure we only have lists or strings 

114 if not isinstance(match_v, list) and not isinstance(match_v, str): 

115 raise BirdPlanConfigError("Action value for 'match' is not valid") 

116 # If we have a string, convert it to a list 

117 match_v_list: list[str] = [] 

118 if isinstance(match_v, str): 

119 match_v_list.append(match_v) 

120 else: 

121 match_v_list.extend(match_v) 

122 # Process each type of match 

123 if match_k in ("origin_asn", "prefix", "community", "extended_community", "large_community"): 

124 setattr(self, f"_match_{match_k}", match_v_list) 

125 else: 

126 raise BirdPlanConfigError(f"Action match type '{match_k}' is not valid") 

127 

128 def _parse_actions(self, action: str | dict[str, Any]) -> None: # noqa: C901,PLR0912 

129 """Parse the actions.""" 

130 # Check if this is a simple string action 

131 if isinstance(action, str): 

132 # Check if its a reject action 

133 if action == "reject": 

134 self._action_reject = True 

135 # And if its not known, raise an error 

136 else: 

137 raise BirdPlanConfigError(f"Action value '{action}' is not valid") 

138 return 

139 # Check what actions we have 

140 for action_k, action_v in action.items(): 

141 # Make sure we only have lists or strings 

142 if not isinstance(action_v, list) and not isinstance(action_v, str) and not isinstance(action_v, int): 

143 raise BirdPlanConfigError("Action value for 'action' is not valid") 

144 # If we have a string, convert it to a list 

145 action_v_list: list[str | int] = [] 

146 if isinstance(action_v, list): 

147 action_v_list.extend(action_v) 

148 else: 

149 action_v_list.append(action_v) 

150 

151 # Process each type of action 

152 if action_k in ( 

153 "add_community", 

154 "add_extended_community", 

155 "add_large_community", 

156 "remove_community", 

157 "remove_extended_community", 

158 "remove_large_community", 

159 ): 

160 setattr(self, f"_action_{action_k}", action_v_list) 

161 elif action_k == "prepend": 

162 # Make sure prepend can only be a string 

163 if isinstance(action_v, str): 

164 prepend = int(action_v) 

165 elif isinstance(action_v, int): 

166 prepend = action_v 

167 else: 

168 raise BirdPlanConfigError(f"Action prepend value '{action_v}' is not valid") 

169 # Make sure prepend value is valid 

170 if 10 > prepend < 1: # noqa: PLR2004 

171 raise BirdPlanConfigError(f"Action prepend value '{action_v}' is not valid") 

172 self._action_prepend = prepend 

173 else: 

174 raise BirdPlanConfigError(f"Action type '{action_k}' is not valid") 

175 

176 def generate_constants(self) -> list[str]: # noqa: C901 

177 """Generate the constants for the action.""" 

178 constants = [] 

179 # Loop with basic match types 

180 for match_type in ( 

181 "origin_asn", 

182 "prefix", 

183 ): 

184 match_list_name = getattr(self, f"match_list_name_{match_type}") 

185 # Pull out straight matches 

186 match_list = [x for x in getattr(self, f"_match_{match_type}") if not x.startswith("!")] 

187 # Pull out negative NOT matches 

188 match_list_not = [x[1:] for x in getattr(self, f"_match_{match_type}") if x.startswith("!")] 

189 # Generate origin ASN match lists 

190 if match_type == "origin_asn": 

191 if match_list: 

192 constants.append(f"define {match_list_name} = [") 

193 constants.extend([f" {line}" for line in ", ".join(match_list).split(" ")]) 

194 constants.append("];") 

195 if match_list_not: 

196 constants.append(f"define {match_list_name}_not = [") 

197 constants.extend([f" {line}" for line in ", ".join(match_list_not).split(" ")]) 

198 constants.append("];") 

199 # Generate prefix match lists 

200 elif match_type == "prefix": 

201 # Pull out IPv4 and IPv6 prefixes lists 

202 match_list_v4, match_list_not_v4, match_list_v6, match_list_not_v6 = self._get_match_prefix_lists() 

203 # Generate IPv4 prefix match lists 

204 if match_list_v4: 

205 constants.append(f"define {match_list_name}_v4 = [") 

206 constants.extend([f" {line}" for line in ", ".join([x.replace(" ", "") for x in match_list_v4]).split(" ")]) 

207 constants.append("];") 

208 if match_list_not_v4: 

209 constants.append(f"define {match_list_name}_not_v4 = [") 

210 constants.extend( 

211 [f" {line}" for line in ", ".join([x.replace(" ", "") for x in match_list_not_v4]).split(" ")] 

212 ) 

213 constants.append("];") 

214 # Generate IPv6 prefix match lists 

215 if match_list_v6: 

216 constants.append(f"define {match_list_name}_v6 = [") 

217 constants.extend([f" {line}" for line in ", ".join([x.replace(" ", "") for x in match_list_v6]).split(" ")]) 

218 constants.append("];") 

219 if match_list_not_v6: 

220 constants.append(f"define {match_list_name}_not_v6 = [") 

221 constants.extend( 

222 [f" {line}" for line in ", ".join([x.replace(" ", "") for x in match_list_not_v6]).split(" ")] 

223 ) 

224 constants.append("];") 

225 

226 # Loop with community match types 

227 for match_type in ( 

228 "community", 

229 "extended_community", 

230 "large_community", 

231 ): 

232 match_list_name = getattr(self, f"match_list_name_{match_type}") 

233 match_list = getattr(self, f"_match_{match_type}") 

234 # Loop with match list and convert from xxx:yyy to (xxx,yyy) format 

235 match_list = util.sanitize_community_list(match_list) 

236 if match_list: 

237 constants.append(f"define {match_list_name} = [") 

238 constants.extend([f" {line}" for line in ", ".join(match_list).split(" ")]) 

239 constants.append("];") 

240 # Return list of constants for this peer 

241 return constants 

242 

243 def generate_function(self) -> list[str]: # noqa: C901,PLR0912,PLR0915 

244 """Generate the function for the action.""" 

245 function = [] 

246 # Generate function header 

247 function.append(f"function {self.function_name}() -> bool") 

248 function.append("string filter_name;") 

249 function.append("{") 

250 function.append(f' filter_name = "{self.function_name}";') 

251 

252 # Generate match statements 

253 # NK: We use the for loop because we have duplicate code between the various match types 

254 for match_type in ( 

255 "origin_asn", 

256 "prefix", 

257 "community", 

258 "extended_community", 

259 "large_community", 

260 ): 

261 match_list_name = getattr(self, f"match_list_name_{match_type}") 

262 # Grab list of raw matches 

263 match_list_raw = getattr(self, f"_match_{match_type}") 

264 # Pull out straight matches 

265 match_list = [x for x in match_list_raw if not x.startswith("!")] 

266 # Pull out negative NOT matches 

267 match_list_not = [x[1:] for x in match_list_raw if x.startswith("!")] 

268 

269 # Add commentfor this match type 

270 if match_list_raw: 

271 function.append(f" # Match {match_type}") 

272 # Check origin ASN match 

273 if match_type == "origin_asn": 

274 if match_list: 

275 function.append(f" if (bgp_path.first !~ {match_list_name}) then return true;") 

276 if match_list_not: 

277 function.append(f" if (bgp_path.first !~ {match_list_name}_not) then return true;") 

278 # Check prefix match 

279 elif match_type == "prefix": 

280 # Pull out IPv4 and IPv6 prefixes lists 

281 match_list_v4, match_list_not_v4, match_list_v6, match_list_not_v6 = self._get_match_prefix_lists() 

282 # Check IPv4 prefix match 

283 if match_list_v4 or match_list_not_v4: 

284 function.append(" if (net.type = NET_IP4) then {{") 

285 if match_list_v4: 

286 function.append(f" if (net !~ {match_list_name}_v4) then return true;") 

287 if match_list_not_v4: 

288 function.append(f" if (net ~ {match_list_name}_not_v4) then return true;") 

289 function.append(" }}") 

290 # Check IPv6 prefix match 

291 if match_list_v6 or match_list_not_v6: 

292 function.append(" if (net.type = NET_IP6) then {{") 

293 if match_list_v6: 

294 function.append(f" if (net !~ {match_list_name}_v6) then return true;") 

295 if match_list_not_v6: 

296 function.append(f" if (net ~ {match_list_name}_not_v6) then return true;") 

297 function.append(" }}") 

298 # Check community match 

299 elif match_type == "community": 

300 if match_list: 

301 function.append(f" if (bgp_community !~ {match_list_name}) then return true;") 

302 if match_list_not: 

303 function.append(f" if (bgp_community ~ {match_list_name}_not) then return true;") 

304 # Check extended community match 

305 elif match_type == "extended_community": 

306 if match_list: 

307 function.append(f" if (bgp_ext_community !~ {match_list_name}) then return true;") 

308 if match_list_not: 

309 function.append(f" if (bgp_ext_community ~ {match_list_name}_not) then return true;") 

310 # Check large community match 

311 elif match_type == "large_community": 

312 if match_list: 

313 function.append(f" if (bgp_large_community !~ {match_list_name}) then return true;") 

314 if match_list_not: 

315 function.append(f" if (bgp_large_community ~ {match_list_name}_not) then return true;") 

316 

317 # 

318 # Generate action statements 

319 # 

320 

321 fallthrough_value = "true" 

322 

323 # Handle reject action 

324 if self._action_reject: 

325 if self.action_type == BGPPeerActionType.IMPORT: 

326 function.append( 

327 f" if DEBUG then print\n" 

328 f""" "{self.function_name} [action:{self.action_id}] Filtering ",""" 

329 f" {self.global_functions.route_info()};" 

330 ) 

331 function.append(" bgp_large_community.add(BGP_LC_FILTERED_ACTION);") 

332 elif self.action_type == BGPPeerActionType.EXPORT: 

333 function.append( 

334 f" if DEBUG then print\n" 

335 f""" "{self.function_name} [action:{self.action_id}] Rejecting ",""" 

336 f" {self.global_functions.route_info()};" 

337 ) 

338 # Set fallthrough value to false as we're rejecting 

339 fallthrough_value = "false" 

340 

341 # Handle add_community action 

342 if self._action_add_community: 

343 function.append( 

344 f" if DEBUG then print\n" 

345 f""" "{self.function_name} [action:{self.action_id}] Adding communities """ 

346 f"""{", ".join(self._action_add_community)} to ",""" 

347 f" {self.global_functions.route_info()};" 

348 ) 

349 function.extend( 

350 [f" bgp_community.add({community});" for community in util.sanitize_community_list(self._action_add_community)] 

351 ) 

352 

353 # Handle add_extended_community action 

354 if self._action_add_extended_community: 

355 function.append( 

356 f" if DEBUG then print\n" 

357 f""" "{self.function_name} [action:{self.action_id}] Adding extended communities """ 

358 f"""{", ".join(self._action_add_extended_community)} to ",""" 

359 f" {self.global_functions.route_info()};" 

360 ) 

361 function.extend( 

362 [ 

363 f" bgp_ext_community.add({community});" 

364 for community in util.sanitize_community_list(self._action_add_extended_community) 

365 ] 

366 ) 

367 

368 # Handle add_large_community action 

369 if self._action_add_large_community: 

370 function.append( 

371 f" if DEBUG then print\n" 

372 f""" "{self.function_name} [action:{self.action_id}] Adding large communities """ 

373 f"""{", ".join(self._action_add_large_community)} to ",""" 

374 f" {self.global_functions.route_info()};" 

375 ) 

376 function.extend( 

377 [ 

378 f" bgp_large_community.add({community});" 

379 for community in util.sanitize_community_list(self._action_add_large_community) 

380 ] 

381 ) 

382 

383 # Handle remove_community action 

384 if self._action_remove_community: 

385 function.append( 

386 f" if DEBUG then print\n" 

387 f""" "{self.function_name} [action:{self.action_id}] Removing communities """ 

388 f"""{", ".join(self._action_remove_community)} from ",""" 

389 f" {self.global_functions.route_info()};" 

390 ) 

391 function.extend([f" bgp_community.remove({community});" for community in self._action_remove_community]) 

392 

393 # Handle remove_extended_community action 

394 if self._action_remove_extended_community: 

395 function.append( 

396 f" if DEBUG then print\n" 

397 f""" "{self.function_name} [action:{self.action_id}] Removing extended communities """ 

398 f"""{", ".join(self._action_remove_extended_community)} from ",""" 

399 f" {self.global_functions.route_info()};" 

400 ) 

401 function.extend( 

402 [f" bgp_extended_community.remove({community});" for community in self._action_remove_extended_community] 

403 ) 

404 

405 # Handle remove_large_community action 

406 if self._action_remove_large_community: 

407 function.append( 

408 f" if DEBUG then print\n" 

409 f""" "{self.function_name} [action:{self.action_id}] Removing large communities """ 

410 f"""{", ".join(self._action_remove_large_community)} from ",""" 

411 f" {self.global_functions.route_info()};" 

412 ) 

413 function.extend([f" bgp_large_community.remove({community});" for community in self._action_remove_large_community]) 

414 

415 # Handle prepend action 

416 if self._action_prepend: 

417 function.append(f" {self.bgp_functions.peer_prepend(BirdVariable('BGP_ASN'), self._action_prepend)};") 

418 # Generate function footer 

419 function.append(f" return {fallthrough_value};") 

420 function.append("}") 

421 

422 # Return list of function lines 

423 return function 

424 

425 def _get_match_prefix_lists(self) -> tuple[list[str], list[str], list[str], list[str]]: 

426 """Get the match prefix lists for IPv4 an IPv6.""" 

427 match_list_v4 = [x for x in self._match_prefix if ":" not in x] 

428 match_list_not_v4 = [x for x in self._match_prefix if ":" not in x] 

429 match_list_v6 = [x for x in self._match_prefix if ":" in x] 

430 match_list_not_v6 = [x for x in self._match_prefix if ":" in x] 

431 return match_list_v4, match_list_not_v4, match_list_v6, match_list_not_v6 

432 

433 @property 

434 def global_functions(self) -> SectionFunctions: 

435 """Return the global functions.""" 

436 return self._global_functions 

437 

438 @property 

439 def bgp_functions(self) -> BGPFunctions: 

440 """Return the BGP functions.""" 

441 return self._bgp_functions 

442 

443 @property 

444 def function_name(self) -> str: 

445 """Return our origin ASN deny list name.""" 

446 return f"bgp_AS{self.asn}_{self.peer_name}_action{self.action_id}_{self.action_type.value}" 

447 

448 @property 

449 def match_list_name_origin_asn(self) -> str: 

450 """Return our origin ASN deny list name.""" 

451 return f"bgp_AS{self.asn}_{self.peer_name}_action{self.action_id}_match_origin_asn" 

452 

453 @property 

454 def match_list_name_prefix(self) -> str: 

455 """Return our origin ASN deny list name.""" 

456 return f"bgp_AS{self.asn}_{self.peer_name}_action{self.action_id}_match_prefix" 

457 

458 @property 

459 def match_list_name_community(self) -> str: 

460 """Return our origin ASN deny list name.""" 

461 return f"bgp_AS{self.asn}_{self.peer_name}_action{self.action_id}_match_community" 

462 

463 @property 

464 def match_list_name_extended_community(self) -> str: 

465 """Return our origin ASN deny list name.""" 

466 return f"bgp_AS{self.asn}_{self.peer_name}_action{self.action_id}_match_extended_community" 

467 

468 @property 

469 def match_list_name_large_community(self) -> str: 

470 """Return our origin ASN deny list name.""" 

471 return f"bgp_AS{self.asn}_{self.peer_name}_action{self.action_id}_match_large_community" 

472 

473 @property 

474 def asn(self) -> int: 

475 """Return the ASN.""" 

476 return self._asn 

477 

478 @property 

479 def peer_name(self) -> str: 

480 """Return the peer name.""" 

481 return self._peer_name 

482 

483 @property 

484 def action_id(self) -> int: 

485 """Return the action ID.""" 

486 return self._action_id 

487 

488 @property 

489 def action_type(self) -> BGPPeerActionType: 

490 """Return the action type.""" 

491 return self._action_type 

492 

493 

494class BGPPeerActions: 

495 """BGP peer actions.""" 

496 

497 _global_functions: SectionFunctions 

498 _bgp_functions: BGPFunctions 

499 

500 _asn: int 

501 _peer_name: str 

502 _actions: list[BGPPeerAction] 

503 

504 def __init__(self, global_functions: SectionFunctions, bgp_functions: BGPFunctions, asn: int, peer_name: str) -> None: 

505 """Initialize BGP peer actions.""" 

506 

507 self._global_functions = global_functions 

508 self._bgp_functions = bgp_functions 

509 

510 self._asn = asn 

511 self._peer_name = peer_name 

512 

513 self._actions = [] 

514 

515 def configure(self, actions: list[dict[str, Any]]) -> None: 

516 """Configure BGP peer actions.""" 

517 # Check type of data provided 

518 if not isinstance(actions, list): 

519 raise BirdPlanConfigError( 

520 f"Configuration item has invalid type '{type(actions)}' in bgp:peers:{self.peer_name}:actions" 

521 ) 

522 # Loop with actions 

523 action_id = 1 

524 for action in actions: 

525 self.actions.append( 

526 BGPPeerAction(self.global_functions, self.bgp_functions, self.asn, self.peer_name, action_id, action) 

527 ) 

528 action_id += 1 

529 

530 def generate_constants(self) -> list[str]: 

531 """Generate the constants for the actions.""" 

532 constants = [] 

533 for action in self.actions: 

534 constants.extend(action.generate_constants()) 

535 return constants 

536 

537 def generate_functions(self) -> list[str]: 

538 """Generate the functions for the actions.""" 

539 functions = [] 

540 for action in self.actions: 

541 functions.extend(action.generate_function()) 

542 return functions 

543 

544 @property 

545 def global_functions(self) -> SectionFunctions: 

546 """Return the global functions.""" 

547 return self._global_functions 

548 

549 @property 

550 def bgp_functions(self) -> BGPFunctions: 

551 """Return the BGP functions.""" 

552 return self._bgp_functions 

553 

554 @property 

555 def asn(self) -> int: 

556 """Return the ASN.""" 

557 return self._asn 

558 

559 @property 

560 def peer_name(self) -> str: 

561 """Return the peer name.""" 

562 return self._peer_name 

563 

564 @property 

565 def actions(self) -> list[BGPPeerAction]: 

566 """Return the actions.""" 

567 return self._actions