Coverage for birdplan/plugins/cmdline/bgp/peer/show.py: 23%
138 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-23 03:27 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-23 03:27 +0000
1#
2# SPDX-License-Identifier: GPL-3.0-or-later
3#
4# Copyright (c) 2019-2024, AllWorldIT
5#
6# This program is free software: you can redistribute it and/or modify
7# it under the terms of the GNU General Public License as published by
8# the Free Software Foundation, either version 3 of the License, or
9# (at your option) any later version.
10#
11# This program is distributed in the hope that it will be useful,
12# but WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14# GNU General Public License for more details.
15#
16# You should have received a copy of the GNU General Public License
17# along with this program. If not, see <http://www.gnu.org/licenses/>.
19"""BirdPlan commandline options for BGP peer show <peer>."""
21import argparse
22import io
23from typing import Any, Dict
25from birdplan import BirdPlanBGPPeerShow
27from .....cmdline import BirdPlanCommandLine, BirdPlanCommandlineResult
28from .....console.colors import colored
29from ...cmdline_plugin import BirdPlanCmdlinePluginBase
31__all__ = ["BirdPlanCmdlineBGPPeerShowPeerArg"]
class BirdPlanCmdlineBGPPeerShowPeerArgResult(BirdPlanCommandlineResult):
    """BirdPlan BGP peer show peer result."""

    def as_text(self) -> str:
        """
        Return data in text format.

        The text consists of a peer-level header (ASN, type, name, description and import filter summaries)
        followed by one section for each entry in ``self.data["protocols"]``.

        Returns
        -------
        str
            Return data in text format.

        """

        ob = io.StringIO()

        # Work out filter strings to use, a filter that is absent or has no entries is shown as "none"
        aspath_filters = "none"
        origin_filters = "none"
        as_sets_filter = "none"
        if "import_filter" in self.data:
            import_filter = self.data["import_filter"]
            # Check for aspath_asns in our filter
            if "aspath_asns" in import_filter:
                aspath_filters = self._count_summary(
                    import_filter["aspath_asns"], {"static": "manual", "calculated": "calculated"}
                )
            # Check for origin filters
            if "origin_asns" in import_filter:
                origin_filters = self._count_summary(import_filter["origin_asns"], {"static": "manual", "irr": "from IRR"})
            # Check for AS-SET filters, these can be either a list of names or a single name
            if "as_sets" in import_filter:
                as_sets = import_filter["as_sets"]
                if isinstance(as_sets, list):
                    as_sets_filter = ", ".join(as_sets) or "none"
                elif isinstance(as_sets, str):
                    as_sets_filter = as_sets

        ob.write(f"ASN.............: {self.data['asn']}\n")
        ob.write(f"Type............: {self.data['type']}\n")
        ob.write(f"Name............: {self.data['name']}\n")
        ob.write(f"Description.....: {self.data['description']}\n")
        ob.write(f"AS-SET..........: {as_sets_filter}\n")
        ob.write(f"Origin filters..: {origin_filters}\n")
        ob.write(f"AS-Path filters.: {aspath_filters}\n")

        # Loop with protocols and output each protocol section
        for protocol, protocol_data in self.data["protocols"].items():
            ob.write(self._protocol_as_text(protocol, protocol_data))

        ob.write("\n")

        return ob.getvalue()

    @staticmethod
    def _count_summary(section: Any, labels: Dict[str, str]) -> str:
        """
        Summarize how many items each source in a filter section contributes.

        Parameters
        ----------
        section : Any
            Filter section, indexed by source name, with sized values.
        labels : Dict[str, str]
            Source names to look for mapped to the label to display, in display order.

        Returns
        -------
        str
            Comma separated "<count> <label>" fragments, or "none" when no source is present.

        """

        parts = [f"{len(section[source])} {label}" for source, label in labels.items() if source in section]
        # Joining nothing gives an empty string, which would leave the field blank
        return ", ".join(parts) or "none"

    def _prefix_filter_summary(self, protocol: str) -> str:
        """
        Summarize the import prefix filters that apply to one protocol.

        Parameters
        ----------
        protocol : str
            Protocol key as used in ``self.data["protocols"]``.

        Returns
        -------
        str
            Comma separated "<count> <label>" fragments, or "none" when nothing applies to this protocol.

        """

        if "import_filter" not in self.data or "prefixes" not in self.data["import_filter"]:
            return "none"

        prefixes = self.data["import_filter"]["prefixes"]
        parts = []
        for source, label in (("irr", "from IRR"), ("static", "manual")):
            if source in prefixes and protocol in prefixes[source]:
                parts.append(f"{len(prefixes[source][protocol])} {label}")
        # A peer may have prefixes for another protocol only, show "none" instead of a blank field
        return ", ".join(parts) or "none"

    def _prefix_limit_source(self, protocol: str) -> str:
        """
        Return the suffix describing where the prefix limit of a protocol comes from.

        Parameters
        ----------
        protocol : str
            Protocol key as used in ``self.data["protocols"]``.

        Returns
        -------
        str
            " from PeeringDB", " manual" or an empty string when no source lists this protocol.

        """

        if "prefix_limit" not in self.data:
            return ""

        prefix_limit = self.data["prefix_limit"]
        if "peeringdb" in prefix_limit and protocol in prefix_limit["peeringdb"]:
            return " from PeeringDB"
        # Checked independently, so a PeeringDB entry for another protocol does not hide a static limit
        if "static" in prefix_limit and protocol in prefix_limit["static"]:
            return " manual"
        return ""

    def _flag_text(self, key: str) -> str:
        """
        Return the display text for a boolean peer flag.

        Parameters
        ----------
        key : str
            Key to look up in ``self.data``.

        Returns
        -------
        str
            "yes" in red when the key is present and truthy, otherwise "no".

        """

        if key in self.data and self.data[key]:
            return colored("yes", "red")
        return "no"

    def _protocol_as_text(self, protocol: str, protocol_data: Dict[str, Any]) -> str:
        """
        Return the text section for a single protocol of the peer.

        Parameters
        ----------
        protocol : str
            Protocol key, "ipv4" and "ipv6" are given a display name.
        protocol_data : Dict[str, Any]
            Protocol data, the "mode" and "status" entries are read, and "source_address" when the status has none.

        Returns
        -------
        str
            Text section for the protocol.

        """

        ob = io.StringIO()

        # Work out better protocol string, unknown protocols keep an empty name
        protocol_str = {"ipv4": "IPv4", "ipv6": "IPv6"}.get(protocol, "")

        ob.write(f"\n Protocol: {protocol_str}\n")

        protocol_status = protocol_data["status"]
        # Set the state and info with no color
        state = protocol_status["state"]
        info = protocol_status["info"]

        # NK - Update in peer_arg show too
        # Check how we're going to color entries based on their state and info
        if protocol_status["state"] == "down":
            state = colored(protocol_status["state"], "red")
            if "last_error" in protocol_status:
                info += " - " + colored(protocol_status["last_error"], "red")
        elif protocol_status["state"] == "up" and protocol_status["info"] == "established":
            state = colored(protocol_status["state"], "green")
            info = colored(protocol_status["info"], "green")

        ob.write(f" Mode..............: {protocol_data['mode']}\n")
        ob.write(f" State.............: {state} ({info}) since {protocol_status['since']}\n")
        ob.write(f" Local AS..........: {protocol_status['local_as']}\n")

        # Work out our source address, preferring the one in the status and only reading the fallback when needed
        if "source_address" in protocol_status:
            source_address = protocol_status["source_address"]
        else:
            source_address = protocol_data["source_address"]
        ob.write(f" Source IP.........: {source_address}\n")

        ob.write(f" Neighbor AS.......: {protocol_status['neighbor_as']}\n")
        ob.write(f" Neighbor IP.......: {protocol_status['neighbor_address']}\n")

        if "neighbor_id" in protocol_status:
            ob.write(f" Neighbor ID.......: {protocol_status['neighbor_id']}\n")

        # Check if we have an import limit
        if "import_limit" in protocol_status:
            import_limit = protocol_status["import_limit"]
            import_limit_action = protocol_status["import_limit_action"]
            prefix_limit_str = self._prefix_limit_source(protocol)
            ob.write(f" Import limit......: {import_limit}{prefix_limit_str} (action: {import_limit_action})\n")
        else:
            ob.write(" Import limit......: none\n")

        ob.write(f" Prefix filters....: {self._prefix_filter_summary(protocol)}\n")

        # Check if we have route information
        if "routes_imported" in protocol_status and "routes_exported" in protocol_status:
            routes_imported = protocol_status["routes_imported"]
            routes_exported = protocol_status["routes_exported"]
            ob.write(f" Prefixes..........: {routes_imported} imported, {routes_exported} exported\n")

        if "security" in self.data and self.data["security"]:
            ob.write(f" BGP security......: {', '.join(sorted(self.data['security']))}\n")

        ob.write(f" Quarantined.......: {self._flag_text('quarantine')}\n")
        ob.write(f" Graceful shutdown.: {self._flag_text('graceful_shutdown')}\n")

        # NOTE(review): this blank line is assumed to be written once per protocol, with the final newline
        # written once by as_text() - confirm against the expected command output
        ob.write("\n")

        return ob.getvalue()
class BirdPlanCmdlineBGPPeerShowPeerArg(BirdPlanCmdlinePluginBase):
    """Commandline plugin for the BirdPlan "bgp peer show <peer>" command."""

    def __init__(self) -> None:
        """Initialize the plugin and set its description and order."""

        super().__init__()

        # Plugin setup
        self.plugin_order = 30
        self.plugin_description = "birdplan bgp peer show <peer>"

    def register_parsers(self, args: Dict[str, Any]) -> None:
        """
        Register the "show" parser under the "bgp peer" subparsers.

        Parameters
        ----------
        args : Dict[str, Any]
            Method argument(s), the "plugins" entry is used to call the parent plugin.

        """

        # Get the subparsers of the "bgp peer" plugin, which we attach our parser to
        peer_subparsers = args["plugins"].call_plugin("birdplan.plugins.cmdline.bgp.peer", "get_subparsers", {})

        # CMD: bgp peer show <peer>
        show_parser = peer_subparsers.add_parser("show", help="BGP peer show commands")

        # The action is a constant which is kept out of the help output
        show_parser.add_argument(
            "--action",
            action="store_const",
            const="bgp_peer_show",
            default="bgp_peer_show",
            help=argparse.SUPPRESS,
        )

        # The peer is a single positional value, stored as a one item list due to nargs=1
        show_parser.add_argument(
            "peer",
            nargs=1,
            metavar="PEER",
            help="Peer to show (its BirdPlan name)",
        )

        # Set our internal subparser properties, there are no subparsers below this command
        self._subparsers = None
        self._subparser = show_parser

    def cmd_bgp_peer_show(self, args: Any) -> Any:
        """
        Handle the "bgp peer show <peer>" commandline action.

        Parameters
        ----------
        args : Dict[str, Any]
            Method argument(s), the "cmdline" entry is used.

        Returns
        -------
        BirdPlanCmdlineBGPPeerShowPeerArgResult
            Result wrapping the peer state returned by BirdPlan.

        Raises
        ------
        RuntimeError
            If the parser was not registered before this handler is called.

        """

        if not self._subparser:  # pragma: no cover
            raise RuntimeError()

        cmdline: BirdPlanCommandLine = args["cmdline"]
        parsed_args = cmdline.args

        # Both values are parsed into lists, we use the first item of each
        socket_path = parsed_args.bird_socket[0]
        peer_name = parsed_args.peer[0]

        # Suppress info output
        cmdline.birdplan.birdconf.birdconfig_globals.suppress_info = True

        # Load BirdPlan configuration using the cache
        cmdline.birdplan_load_config(ignore_irr_changes=True, ignore_peeringdb_changes=True, use_cached=True)

        # Try grab peer info
        peer_state: BirdPlanBGPPeerShow = cmdline.birdplan.state_bgp_peer_show(peer_name, bird_socket=socket_path)

        return BirdPlanCmdlineBGPPeerShowPeerArgResult(peer_state)