Coverage for birdplan/plugins/cmdline/ospf/interface/show.py: 30%

103 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-04-23 03:27 +0000

1# 

2# SPDX-License-Identifier: GPL-3.0-or-later 

3# 

4# Copyright (c) 2019-2024, AllWorldIT 

5# 

6# This program is free software: you can redistribute it and/or modify 

7# it under the terms of the GNU General Public License as published by 

8# the Free Software Foundation, either version 3 of the License, or 

9# (at your option) any later version. 

10# 

11# This program is distributed in the hope that it will be useful, 

12# but WITHOUT ANY WARRANTY; without even the implied warranty of 

13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

14# GNU General Public License for more details. 

15# 

16# You should have received a copy of the GNU General Public License 

17# along with this program. If not, see <http://www.gnu.org/licenses/>. 

18 

19"""BirdPlan commandline options for OSPF interface show.""" 

20 

21import argparse 

22import io 

23from typing import Any, Dict, List 

24 

25from ..... import BirdPlanOSPFInterfaceStatus 

26from .....cmdline import BirdPlanCommandLine, BirdPlanCommandlineResult 

27from .....console.colors import colored 

28from ...cmdline_plugin import BirdPlanCmdlinePluginBase 

29 

30__all__ = ["BirdPlanCmdlineOSPFInterfaceShow"] 

31 

32 

class BirdPlanCmdlineOSPFInterfaceShowResult(BirdPlanCommandlineResult):
    """BirdPlan OSPF interface show result class."""

    @staticmethod
    def _area_interfaces(areas: Dict[str, Any], area_name: str) -> Dict[str, Any]:
        """
        Return the interfaces mapping of an area, tolerating missing keys.

        Parameters
        ----------
        areas : Dict[str, Any]
            Mapping of area name to area configuration.
        area_name : str
            Area to look up.

        Returns
        -------
        Dict[str, Any]
            The area's ``"interfaces"`` mapping, or an empty dict when the area or its ``"interfaces"`` key is absent.

        """

        return areas.get(area_name, {}).get("interfaces") or {}

    @staticmethod
    def _format_change(current: Any, pending: Any) -> str:
        """
        Format a value transition for display.

        Parameters
        ----------
        current : Any
            Currently active value, or ``None`` when there is none.
        pending : Any
            Pending value, or ``None`` when there is none.

        Returns
        -------
        str
            ``"<current> → <pending>"`` when both values are known (the pending value is coloured yellow if it
            differs), otherwise the pending value on its own coloured green.

        """

        if (current is not None) and (pending is not None):
            if current != pending:
                return f"{current} → " + colored(f"{pending}", "yellow")
            return f"{current} → {pending}"
        return colored(f"{pending}", "green")

    def as_text(self) -> str:  # noqa: CFQ001 # pylint: disable=too-many-locals,too-many-branches,too-many-statements
        """
        Return data in text format.

        The output has three parts: the configured interface overrides, the per-area interface status built from
        the ``"current"`` and ``"pending"`` sections of ``self.data``, and a colour legend.

        Returns
        -------
        str
            Return data in text format.

        """

        ob = io.StringIO()

        ob.write("OSPF interface overrides:\n")
        ob.write("-------------------------\n")
        override_areas = self.data["overrides"].get("areas")
        if override_areas:
            # Loop with areas sorted by name
            for area_name, area in sorted(override_areas.items()):
                # Skip if we have no interfaces
                if not area.get("interfaces"):
                    continue
                # Print out override
                ob.write(" Area: " + colored(area_name, "cyan") + "\n")
                for interface_name, interface in area["interfaces"].items():
                    ob.write(" Interface: " + colored(interface_name, "cyan") + "\n")
                    # Check if we have a cost override
                    if "cost" in interface:
                        ob.write(f" - Cost.......: {interface['cost']}\n")
                    # Check if we have an ECMP weight override
                    if "ecmp_weight" in interface:
                        ob.write(f" - ECMP Weight: {interface['ecmp_weight']}\n")
                # Separate areas
                ob.write("\n")
        else:
            # If we have no overrides, just print out --none--
            ob.write("--none--\n")
        ob.write("\n")

        current_areas = self.data["current"]["areas"]
        pending_areas = self.data["pending"]["areas"]

        # Get a sorted, unique list of all areas we know about
        areas_all = sorted(set(current_areas) | set(pending_areas))

        ob.write("OSPF interface status:\n")
        ob.write("----------------------\n")

        # Loop with areas
        for area_name in areas_all:
            # An area may exist on one side without an "interfaces" key, so look both sides up defensively
            # instead of indexing ["interfaces"] directly
            current_interfaces = self._area_interfaces(current_areas, area_name)
            pending_interfaces = self._area_interfaces(pending_areas, area_name)

            # Create area section
            ob.write(" Area: " + colored(area_name, "cyan") + "\n")

            # Loop with the sorted, unique interface list of this area
            for interface in sorted(set(current_interfaces) | set(pending_interfaces)):
                # Create interface section
                ob.write(" Interface: " + colored(interface, "cyan") + "\n")

                # Check if we have pending values
                pending_interface = pending_interfaces.get(interface) or {}
                pending_cost = pending_interface.get("cost")
                pending_ecmp_weight = pending_interface.get("ecmp_weight")

                # Short circuit if we don't have anything pending
                if pending_cost is None and pending_ecmp_weight is None:
                    ob.write(" " + colored("-removed-", "magenta") + "\n")
                    continue

                # Grab current cost and ECMP weight, if any
                current_interface = current_interfaces.get(interface) or {}
                current_cost = current_interface.get("cost")
                current_ecmp_weight = current_interface.get("ecmp_weight")

                # Work out the cost and ECMP weight strings
                cost_str = self._format_change(current_cost, pending_cost)
                ecmp_weight_str = self._format_change(current_ecmp_weight, pending_ecmp_weight)

                ob.write(f" - Cost.......: {cost_str}\n")
                ob.write(f" - ECMP Weight: {ecmp_weight_str}\n")
            # Separate areas
            ob.write("\n")

        # Add the colour legend, followed by a newline to end the output
        ob.write(colored("NEW", "green") + " - New interface configuration\n")
        ob.write(colored("CHANGED", "yellow") + " - Pending interface configuration\n")
        ob.write("\n")

        return ob.getvalue()

169 

170 

class BirdPlanCmdlineOSPFInterfaceShow(BirdPlanCmdlinePluginBase):
    """BirdPlan "ospf interface show" command."""

    def __init__(self) -> None:
        """Set up the plugin description and ordering."""

        super().__init__()

        # Plugin setup
        self.plugin_description = "birdplan ospf interface show"
        self.plugin_order = 30

    def register_parsers(self, args: Dict[str, Any]) -> None:
        """
        Register the "show" subparser under the "ospf interface" command.

        Parameters
        ----------
        args : Dict[str, Any]
            Method argument(s). The ``"plugins"`` entry is used to fetch the parent subparsers.

        """

        # Ask the parent "ospf interface" plugin for the subparsers we attach to
        parent_subparsers = args["plugins"].call_plugin(
            "birdplan.plugins.cmdline.ospf.interface", "get_subparsers", {}
        )

        # CMD: ospf interface show
        show_parser = parent_subparsers.add_parser("show", help="Show OSPF interfaces")

        # Hidden option which always carries the action name for this command
        action_name = "ospf_interface_show"
        show_parser.add_argument(
            "--action",
            action="store_const",
            const=action_name,
            default=action_name,
            help=argparse.SUPPRESS,
        )

        # Optional positionals, registered in order: AREA first, then IFACE
        optional_positionals = (
            ("area", "AREA", "Optional area in which the interface is in"),
            ("interface", "IFACE", "Optional interface"),
        )
        for dest, metavar, help_text in optional_positionals:
            show_parser.add_argument(dest, nargs="?", metavar=metavar, help=help_text)

        # Set our internal subparser property
        self._subparser = show_parser
        self._subparsers = None

    def cmd_ospf_interface_show(self, args: Any) -> Any:
        """
        Commandline handler for "ospf interface show" action.

        Parameters
        ----------
        args : Dict[str, Any]
            Method argument(s). The ``"cmdline"`` entry is the BirdPlan commandline object to operate on.

        Returns
        -------
        BirdPlanCmdlineOSPFInterfaceShowResult
            Result object wrapping the OSPF interface status.

        Raises
        ------
        RuntimeError
            If the subparser has not been set on this plugin.

        """

        if not self._subparser:  # pragma: no cover
            raise RuntimeError()

        cmdline: BirdPlanCommandLine = args["cmdline"]

        # Suppress info output
        cmdline.birdplan.birdconf.birdconfig_globals.suppress_info = True

        # Load BirdPlan configuration using the cache
        cmdline.birdplan_load_config(ignore_irr_changes=True, ignore_peeringdb_changes=True, use_cached=True)

        # Grab the OSPF interface status
        # NOTE(review): the optional AREA / IFACE arguments are not read in this handler - confirm whether
        # filtering is meant to happen elsewhere
        status: BirdPlanOSPFInterfaceStatus = cmdline.birdplan.state_ospf_interface_status()

        return BirdPlanCmdlineOSPFInterfaceShowResult(status)