Coverage for birdplan/bird_config/sections/protocols/pipe.py: 98%

98 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-04-23 03:27 +0000

1# 

2# SPDX-License-Identifier: GPL-3.0-or-later 

3# 

4# Copyright (c) 2019-2024, AllWorldIT 

5# 

6# This program is free software: you can redistribute it and/or modify 

7# it under the terms of the GNU General Public License as published by 

8# the Free Software Foundation, either version 3 of the License, or 

9# (at your option) any later version. 

10# 

11# This program is distributed in the hope that it will be useful, 

12# but WITHOUT ANY WARRANTY; without even the implied warranty of 

13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

14# GNU General Public License for more details. 

15# 

16# You should have received a copy of the GNU General Public License 

17# along with this program. If not, see <http://www.gnu.org/licenses/>. 

18 

19"""BIRD pipe protocol configuration.""" 

20 

21from enum import Enum 

22from typing import Callable, List, Union 

23 

24from ...globals import BirdConfigGlobals 

25from ..base import SectionBase 

26 

27__all__ = ["ProtocolPipeFilterType", "ProtocolPipe"] 

28 

29 

30PipeTableNameType = Union[str, Callable[[str], str]] 

31 

32 

33class ProtocolPipeFilterType(Enum): 

34 """ 

35 Pipe protocol filter name type. 

36 

37 NO_FILTER - No filter will be applied. 

38 VERSIONED - Filter will be IP versioned. 

39 UNVERSIONED - Filter will not be IP versioned. 

40 

41 """ 

42 

43 NO_FILTER = 0 

44 VERSIONED = 1 

45 UNVERSIONED = 2 

46 

47 

48# This class cannot be a decendant of the SectionProtocolBase class or we'll have circular imports 

49class ProtocolPipe(SectionBase): # pylint: disable=too-many-instance-attributes 

50 """BIRD pipe protocol configuration.""" 

51 

52 _name_suffix: str 

53 

54 _table_from: PipeTableNameType 

55 _table_to: PipeTableNameType 

56 _table_export: str 

57 _table_import: str 

58 _export_filter_type: ProtocolPipeFilterType 

59 _import_filter_type: ProtocolPipeFilterType 

60 

61 # IP versions we're creating a pipe for 

62 _ipversions: List[str] 

63 

64 def __init__( # noqa: CFQ002 # pylint: disable=too-many-arguments 

65 self, 

66 birdconfig_globals: BirdConfigGlobals, 

67 table_from: PipeTableNameType, 

68 table_to: PipeTableNameType, 

69 name: str = "", 

70 table_export: str = "none", 

71 table_import: str = "none", 

72 export_filter_type: ProtocolPipeFilterType = ProtocolPipeFilterType.NO_FILTER, 

73 import_filter_type: ProtocolPipeFilterType = ProtocolPipeFilterType.NO_FILTER, 

74 has_ipv4: bool = True, 

75 has_ipv6: bool = True, 

76 ): 

77 """Initialize the object.""" 

78 super().__init__(birdconfig_globals) 

79 

80 # Add a suffix if we have a name 

81 self._name_suffix = "" 

82 if name: 

83 self._name_suffix = f"_{name}" 

84 

85 # Grab table information 

86 self._table_from = table_from 

87 self._table_to = table_to 

88 self._table_export = table_export 

89 self._table_import = table_import 

90 self._export_filter_type = export_filter_type 

91 self._import_filter_type = import_filter_type 

92 

93 # Are we excluding anything? 

94 self._ipversions = [] 

95 if has_ipv4: 

96 self._ipversions.append("4") 

97 if has_ipv6: 

98 self._ipversions.append("6") 

99 

100 def configure(self) -> None: 

101 """Create a pipe protocol.""" 

102 super().configure() 

103 

104 for ipv in self._ipversions: 

105 self.conf.add(f"protocol pipe p_{self.table_from(ipv)}_to_{self.table_to(ipv)}{self.name_suffix} {{") 

106 self.conf.add(f' description "Pipe from {self.t_table_from(ipv)} to {self.t_table_to(ipv)}{self.name_suffix}";') 

107 self.conf.add("") 

108 self.conf.add(f" vrf {self.birdconfig_globals.vrf};") 

109 self.conf.add("") 

110 self.conf.add(f" table {self.t_table_from(ipv)};") 

111 self.conf.add(f" peer table {self.t_table_to(ipv)}{self.name_suffix};") 

112 self.conf.add("") 

113 

114 # Check if we're doing export filtering 

115 if self.export_filter_type == ProtocolPipeFilterType.VERSIONED: 

116 self.conf.add(f" export filter f_{self.table_from(ipv)}_{self.table_to(ipv)}_export;") 

117 elif self.export_filter_type == ProtocolPipeFilterType.UNVERSIONED: 

118 self.conf.add(f" export filter f_{self.table_from()}_{self.table_to()}_export;") 

119 # If not add per normal 

120 else: 

121 self.conf.add(f" export {self.table_export};") 

122 

123 # Check if we're doing import filtering 

124 if self.import_filter_type == ProtocolPipeFilterType.VERSIONED: 

125 self.conf.add(f" import filter f_{self.table_from(ipv)}_{self.table_to(ipv)}_import;") 

126 elif self.import_filter_type == ProtocolPipeFilterType.UNVERSIONED: 

127 self.conf.add(f" import filter f_{self.table_from()}_{self.table_to()}_import;") 

128 # If not add per normal 

129 else: 

130 self.conf.add(f" import {self.table_import};") 

131 

132 self.conf.add("};") 

133 self.conf.add("") 

134 

135 def table_from(self, ipv: str = "") -> str: 

136 """Return table_from with IP version included.""" 

137 

138 # If the table is callable, call it and get the name 

139 if callable(self._table_from): 

140 table_from = self._table_from(ipv) 

141 # Else just use it as a string 

142 else: 

143 table_from = f"{self._table_from}{ipv}" 

144 

145 # If it starts with t_, we need to remove t_ for now 

146 if table_from.startswith("t_"): 

147 table_from = table_from[2:] 

148 

149 return table_from 

150 

151 def table_to(self, ipv: str = "") -> str: 

152 """Return table_to with IP version included.""" 

153 

154 # If the table is callable, call it and get the name 

155 if callable(self._table_to): 

156 table_to = self._table_to(ipv) 

157 # Else just use it as a string 

158 else: 

159 table_to = f"{self._table_to}{ipv}" 

160 

161 # If it starts with t_, we need to remove t_ for now 

162 if table_to.startswith("t_"): 

163 table_to = table_to[2:] 

164 

165 return table_to 

166 

167 def t_table_from(self, ipv: str) -> str: 

168 r"""Return table_from, some tables don't have t\_ prefixes.""" 

169 table_name = self.table_from(ipv) 

170 # If it is not a master table, add t_ 

171 if not table_name.startswith("master"): 

172 table_name = "t_" + table_name 

173 return table_name 

174 

175 def t_table_to(self, ipv: str) -> str: 

176 r"""Return table_to, some tables don't have t\_ prefixes.""" 

177 table_name = self.table_to(ipv) 

178 # If it is not a master table, add t_ 

179 if not table_name.startswith("master"): 

180 table_name = "t_" + table_name 

181 return table_name 

182 

183 @property 

184 def name_suffix(self) -> str: 

185 """Return our name suffix.""" 

186 return self._name_suffix 

187 

188 @property 

189 def table_export(self) -> str: 

190 """Return that state of us exporting the table.""" 

191 return self._table_export 

192 

193 @property 

194 def table_import(self) -> str: 

195 """Return that state of us importing the table.""" 

196 return self._table_import 

197 

198 @property 

199 def export_filter_type(self) -> ProtocolPipeFilterType: 

200 """Return that state of us exporting the table filtered.""" 

201 return self._export_filter_type 

202 

203 @property 

204 def import_filter_type(self) -> ProtocolPipeFilterType: 

205 """Return that state of us importing the table filtered.""" 

206 return self._import_filter_type