Coverage for birdplan/plugins/cmdline/bgp/peer/show.py: 23%

138 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-04-23 03:27 +0000

1# 

2# SPDX-License-Identifier: GPL-3.0-or-later 

3# 

4# Copyright (c) 2019-2024, AllWorldIT 

5# 

6# This program is free software: you can redistribute it and/or modify 

7# it under the terms of the GNU General Public License as published by 

8# the Free Software Foundation, either version 3 of the License, or 

9# (at your option) any later version. 

10# 

11# This program is distributed in the hope that it will be useful, 

12# but WITHOUT ANY WARRANTY; without even the implied warranty of 

13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

14# GNU General Public License for more details. 

15# 

16# You should have received a copy of the GNU General Public License 

17# along with this program. If not, see <http://www.gnu.org/licenses/>. 

18 

19"""BirdPlan commandline options for BGP peer show <peer>.""" 

20 

21import argparse 

22import io 

23from typing import Any, Dict 

24 

25from birdplan import BirdPlanBGPPeerShow 

26 

27from .....cmdline import BirdPlanCommandLine, BirdPlanCommandlineResult 

28from .....console.colors import colored 

29from ...cmdline_plugin import BirdPlanCmdlinePluginBase 

30 

31__all__ = ["BirdPlanCmdlineBGPPeerShowPeerArg"] 

32 

33 

class BirdPlanCmdlineBGPPeerShowPeerArgResult(BirdPlanCommandlineResult):
    """BirdPlan BGP peer show peer result."""

    @staticmethod
    def _summarise_counts(section: Any, labels: Dict[str, str]) -> str:
        """
        Summarise how many entries each listed source contributes.

        Parameters
        ----------
        section : Any
            Container keyed by source name; every value that is looked up must support ``len()``.
        labels : Dict[str, str]
            Source names to report mapped to their display label, in output order.

        Returns
        -------
        str
            Comma separated "<count> <label>" items, or "none" when none of the listed sources is present.

        """

        items = [f"{len(section[source])} {label}" for source, label in labels.items() if source in section]
        # Joining an empty list would print a blank value, so fall back to the "none" used when there is no filter at all
        return ", ".join(items) if items else "none"

    def _prefix_limit_source(self, protocol: str) -> str:
        """
        Work out the suffix describing where the import limit of a protocol comes from.

        Parameters
        ----------
        protocol : str
            Protocol key as used in ``self.data["protocols"]``.

        Returns
        -------
        str
            " from PeeringDB", " manual" or an empty string when no prefix limit source lists the protocol.

        """

        if "prefix_limit" not in self.data:
            return ""
        prefix_limit = self.data["prefix_limit"]
        if "peeringdb" in prefix_limit and protocol in prefix_limit["peeringdb"]:
            return " from PeeringDB"
        # Checked per protocol: a PeeringDB section that lacks this protocol must not hide a static limit
        if "static" in prefix_limit and protocol in prefix_limit["static"]:
            return " manual"
        return ""

    def as_text(self) -> str:  # noqa: CFQ001 # pylint: disable=too-many-locals,too-many-branches,too-many-statements
        """
        Return data in text format.

        The peer wide attributes (ASN, type, name, description and import filter summaries) are written first, followed by
        one section for each entry in ``self.data["protocols"]``.

        Returns
        -------
        str
            Return data in text format.

        """

        ob = io.StringIO()

        # Everything below treats a missing import filter the same as an empty one
        import_filter = self.data["import_filter"] if "import_filter" in self.data else {}

        # Work out filter strings to use
        aspath_filters = "none"
        if "aspath_asns" in import_filter:
            aspath_filters = self._summarise_counts(
                import_filter["aspath_asns"], {"static": "manual", "calculated": "calculated"}
            )

        origin_filters = "none"
        if "origin_asns" in import_filter:
            origin_filters = self._summarise_counts(import_filter["origin_asns"], {"static": "manual", "irr": "from IRR"})

        # AS-SET filters can be either a list of names or a single name
        as_sets_filter = "none"
        if "as_sets" in import_filter:
            as_sets = import_filter["as_sets"]
            if isinstance(as_sets, list):
                as_sets_filter = ", ".join(as_sets) or "none"
            elif isinstance(as_sets, str):
                as_sets_filter = as_sets or "none"

        # These flags are peer wide, so work them out once instead of for every protocol
        quarantined = "no"
        if "quarantine" in self.data and self.data["quarantine"]:
            quarantined = colored("yes", "red")

        graceful_shutdown = "no"
        if "graceful_shutdown" in self.data and self.data["graceful_shutdown"]:
            graceful_shutdown = colored("yes", "red")

        ob.write(f"ASN.............: {self.data['asn']}\n")
        ob.write(f"Type............: {self.data['type']}\n")
        ob.write(f"Name............: {self.data['name']}\n")
        ob.write(f"Description.....: {self.data['description']}\n")
        ob.write(f"AS-SET..........: {as_sets_filter}\n")
        ob.write(f"Origin filters..: {origin_filters}\n")
        ob.write(f"AS-Path filters.: {aspath_filters}\n")

        # Loop with protocols and output self.data
        for protocol, protocol_data in self.data["protocols"].items():
            # Work out better protocol string, anything other than ipv4/ipv6 is shown blank
            protocol_str = {"ipv4": "IPv4", "ipv6": "IPv6"}.get(protocol, "")

            # Check for import prefix filters, only counting the entries that belong to this protocol
            prefix_filters = "none"
            if "prefixes" in import_filter:
                prefixes = import_filter["prefixes"]
                protocol_prefixes = {
                    source: prefixes[source][protocol]
                    for source in ("irr", "static")
                    if source in prefixes and protocol in prefixes[source]
                }
                prefix_filters = self._summarise_counts(protocol_prefixes, {"irr": "from IRR", "static": "manual"})

            ob.write(f"\n Protocol: {protocol_str}\n")

            # Setup the protocol_status so we can copy-paste the below colors
            protocol_status = protocol_data["status"]
            # Set the state and info with no color
            state = protocol_status["state"]
            info = protocol_status["info"]

            # NK - Update in peer_arg show too
            # Check how we're going to color entries based on their state and info
            if protocol_status["state"] == "down":
                state = colored(protocol_status["state"], "red")
                if "last_error" in protocol_status:
                    info += " - " + colored(protocol_status["last_error"], "red")
            elif protocol_status["state"] == "up" and protocol_status["info"] == "established":
                state = colored(protocol_status["state"], "green")
                info = colored(protocol_status["info"], "green")

            prefix_limit_str = self._prefix_limit_source(protocol)

            ob.write(f" Mode..............: {protocol_data['mode']}\n")
            ob.write(f" State.............: {state} ({info}) since {protocol_status['since']}\n")
            ob.write(f" Local AS..........: {protocol_status['local_as']}\n")

            # Prefer the source address reported in the status, falling back to the one in the protocol data
            source_address = protocol_status.get("source_address", protocol_data["source_address"])
            ob.write(f" Source IP.........: {source_address}\n")

            ob.write(f" Neighbor AS.......: {protocol_status['neighbor_as']}\n")
            ob.write(f" Neighbor IP.......: {protocol_status['neighbor_address']}\n")

            if "neighbor_id" in protocol_status:
                ob.write(f" Neighbor ID.......: {protocol_status['neighbor_id']}\n")

            # Check if we have an import limit
            if "import_limit" in protocol_status:
                import_limit = protocol_status["import_limit"]
                import_limit_action = protocol_status["import_limit_action"]
                ob.write(f" Import limit......: {import_limit}{prefix_limit_str} (action: {import_limit_action})\n")
            else:
                ob.write(" Import limit......: none\n")

            ob.write(f" Prefix filters....: {prefix_filters}\n")

            # Check if we have route information, both counters are needed for the line to be shown
            if "routes_imported" in protocol_status and "routes_exported" in protocol_status:
                routes_imported = protocol_status["routes_imported"]
                routes_exported = protocol_status["routes_exported"]
                ob.write(f" Prefixes..........: {routes_imported} imported, {routes_exported} exported\n")

            if "security" in self.data and self.data["security"]:
                ob.write(f" BGP security......: {', '.join(sorted(self.data['security']))}\n")

            ob.write(f" Quarantined.......: {quarantined}\n")
            ob.write(f" Graceful shutdown.: {graceful_shutdown}\n")

            ob.write("\n")

        ob.write("\n")

        return ob.getvalue()

193 

194 

class BirdPlanCmdlineBGPPeerShowPeerArg(BirdPlanCmdlinePluginBase):
    """BirdPlan "bgp peer show <peer>" command."""

    def __init__(self) -> None:
        """Initialize object."""

        super().__init__()

        # Plugin setup
        self.plugin_description = "birdplan bgp peer show <peer>"
        self.plugin_order = 30

    def register_parsers(self, args: Dict[str, Any]) -> None:
        """
        Register commandline parsers.

        Adds the "show" parser to the subparsers returned by the "birdplan.plugins.cmdline.bgp.peer" plugin.

        Parameters
        ----------
        args : Dict[str, Any]
            Method argument(s), the plugin handler is read from the ``plugins`` key.

        """

        plugins = args["plugins"]

        parent_subparsers = plugins.call_plugin("birdplan.plugins.cmdline.bgp.peer", "get_subparsers", {})

        # CMD: bgp peer show <peer>
        subparser = parent_subparsers.add_parser("show", help="BGP peer show commands")

        # Hidden option whose default carries the action name, its const and default are deliberately identical
        subparser.add_argument(
            "--action",
            action="store_const",
            const="bgp_peer_show",
            default="bgp_peer_show",
            help=argparse.SUPPRESS,
        )

        # nargs=1 makes argparse store the peer as a one item list
        subparser.add_argument(
            "peer",
            nargs=1,
            metavar="PEER",
            help="Peer to show (its BirdPlan name)",
        )

        # Set our internal subparser property, no subparsers are added below "show"
        self._subparser = subparser
        self._subparsers = None

    def cmd_bgp_peer_show(self, args: Any) -> Any:
        """
        Commandline handler for "bgp peer show <peer>" action.

        Parameters
        ----------
        args : Any
            Method argument(s), the commandline object is read from the ``cmdline`` key.

        Returns
        -------
        BirdPlanCmdlineBGPPeerShowPeerArgResult
            Result object wrapping what ``state_bgp_peer_show`` returned for the peer.

        Raises
        ------
        RuntimeError
            If the subparser for this command is not set.

        """

        if not self._subparser:  # pragma: no cover
            raise RuntimeError("Subparser for 'bgp peer show <peer>' is not set")

        cmdline: BirdPlanCommandLine = args["cmdline"]

        # Grab Bird control socket
        bird_socket = cmdline.args.bird_socket[0]

        # Grab the peer
        peer = cmdline.args.peer[0]

        # Suppress info output
        cmdline.birdplan.birdconf.birdconfig_globals.suppress_info = True

        # Load BirdPlan configuration using the cache
        cmdline.birdplan_load_config(ignore_irr_changes=True, ignore_peeringdb_changes=True, use_cached=True)

        # Try grab peer info
        res: BirdPlanBGPPeerShow = cmdline.birdplan.state_bgp_peer_show(peer, bird_socket=bird_socket)

        return BirdPlanCmdlineBGPPeerShowPeerArgResult(res)

275 

276 return BirdPlanCmdlineBGPPeerShowPeerArgResult(res)