Coverage for birdplan/plugins/cmdline/ospf/interface/show.py: 30%
103 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-23 03:27 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-23 03:27 +0000
1#
2# SPDX-License-Identifier: GPL-3.0-or-later
3#
4# Copyright (c) 2019-2024, AllWorldIT
5#
6# This program is free software: you can redistribute it and/or modify
7# it under the terms of the GNU General Public License as published by
8# the Free Software Foundation, either version 3 of the License, or
9# (at your option) any later version.
10#
11# This program is distributed in the hope that it will be useful,
12# but WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14# GNU General Public License for more details.
15#
16# You should have received a copy of the GNU General Public License
17# along with this program. If not, see <http://www.gnu.org/licenses/>.
19"""BirdPlan commandline options for OSPF interface show."""
21import argparse
22import io
23from typing import Any, Dict, List
25from ..... import BirdPlanOSPFInterfaceStatus
26from .....cmdline import BirdPlanCommandLine, BirdPlanCommandlineResult
27from .....console.colors import colored
28from ...cmdline_plugin import BirdPlanCmdlinePluginBase
30__all__ = ["BirdPlanCmdlineOSPFInterfaceShow"]
class BirdPlanCmdlineOSPFInterfaceShowResult(BirdPlanCommandlineResult):
    """
    BirdPlan OSPF interface show result class.

    The text rendering reads three top-level keys from ``self.data``:
    ``overrides``, ``current`` and ``pending``.
    """

    # NOTE(review): the leading whitespace inside the string literals written by
    # this class is reproduced exactly as it appears in the reviewed listing,
    # which looks whitespace-collapsed - confirm the indent widths against the
    # repository copy before merging.

    def as_text(self) -> str:
        """
        Return data in text format.

        The output has three parts: the configured interface overrides, the
        per-area interface status (current value compared to pending value),
        and a colour legend.

        Returns
        -------
        str
            Return data in text format.

        """

        ob = io.StringIO()

        self._as_text_overrides(ob)
        self._as_text_status(ob)
        self._as_text_legend(ob)

        return ob.getvalue()

    def _as_text_overrides(self, ob: io.StringIO) -> None:
        """
        Write the "OSPF interface overrides" section.

        Parameters
        ----------
        ob : io.StringIO
            Output buffer to write to.

        """

        ob.write("OSPF interface overrides:\n")
        ob.write("-------------------------\n")

        override_areas = self.data["overrides"].get("areas") or {}

        # Loop with sorted override area list
        for area_name, area in sorted(override_areas.items()):
            interfaces = area.get("interfaces")
            # Skip if we have no interfaces
            if not interfaces:
                continue
            # Print out override
            ob.write(" Area: " + colored(area_name, "cyan") + "\n")
            for interface_name, interface in interfaces.items():
                ob.write(" Interface: " + colored(interface_name, "cyan") + "\n")
                # Check if we have a cost override
                if "cost" in interface:
                    ob.write(f" - Cost.......: {interface['cost']}\n")
                # Check if we have an ECMP weight override
                if "ecmp_weight" in interface:
                    ob.write(f" - ECMP Weight: {interface['ecmp_weight']}\n")
            # NOTE(review): nesting reconstructed from a whitespace-collapsed
            # listing - confirm this separator is written once per area.
            ob.write("\n")

        # If we have no override areas at all, just print out --none--
        if not override_areas:
            ob.write("--none--\n")
            ob.write("\n")

    def _as_text_status(self, ob: io.StringIO) -> None:
        """
        Write the "OSPF interface status" section.

        An interface with no pending cost and no pending ECMP weight is shown
        as removed; otherwise the current and pending values are shown.

        Parameters
        ----------
        ob : io.StringIO
            Output buffer to write to.

        """

        current_areas = self.data["current"]["areas"]
        pending_areas = self.data["pending"]["areas"]

        # Work out the sorted, unique list of interfaces per area we know about
        interfaces_all: Dict[str, List[str]] = {}
        for area_name in sorted(set(current_areas) | set(pending_areas)):
            # An area may be missing on either side, or may have no "interfaces" key
            interface_names = set(current_areas.get(area_name, {}).get("interfaces", {}))
            interface_names |= set(pending_areas.get(area_name, {}).get("interfaces", {}))
            interfaces_all[area_name] = sorted(interface_names)

        ob.write("OSPF interface status:\n")
        ob.write("----------------------\n")

        # Loop with areas; the dict was populated in sorted area order above
        for area_name, interface_list in interfaces_all.items():
            # Create area section
            ob.write(" Area: " + colored(area_name, "cyan") + "\n")

            # Use guarded lookups here: an area can exist on both sides while only
            # one side has interfaces, which would raise KeyError on direct indexing.
            current_interfaces = current_areas.get(area_name, {}).get("interfaces", {})
            pending_interfaces = pending_areas.get(area_name, {}).get("interfaces", {})

            # Loop with sorted interface list
            for interface in interface_list:
                # Create interface section
                ob.write(" Interface: " + colored(interface, "cyan") + "\n")

                # Check if we have pending values
                pending_cost = None
                pending_ecmp_weight = None
                if interface in pending_interfaces:
                    pending_interface = pending_interfaces[interface]
                    pending_cost = pending_interface["cost"]
                    pending_ecmp_weight = pending_interface["ecmp_weight"]

                # Short circuit if we don't have anything pending
                if pending_cost is None and pending_ecmp_weight is None:
                    ob.write(" " + colored("-removed-", "magenta") + "\n")
                    continue

                # Grab current values, None when not currently configured
                current_interface = current_interfaces.get(interface, {})
                current_cost = current_interface.get("cost")
                current_ecmp_weight = current_interface.get("ecmp_weight")

                cost_str = self._as_text_change(current_cost, pending_cost)
                ecmp_weight_str = self._as_text_change(current_ecmp_weight, pending_ecmp_weight)

                ob.write(f" - Cost.......: {cost_str}\n")
                ob.write(f" - ECMP Weight: {ecmp_weight_str}\n")
            # Separate areas
            # NOTE(review): nesting reconstructed from a whitespace-collapsed
            # listing - confirm this separator is written once per area.
            ob.write("\n")

    @staticmethod
    def _as_text_legend(ob: io.StringIO) -> None:
        """
        Write the colour legend followed by a trailing blank line.

        Parameters
        ----------
        ob : io.StringIO
            Output buffer to write to.

        """

        ob.write(colored("NEW", "green") + " - New interface configuration\n")
        ob.write(colored("CHANGED", "yellow") + " - Pending interface configuration\n")
        ob.write("\n")

    @staticmethod
    def _as_text_change(current: Any, pending: Any) -> str:
        """
        Format a current to pending value transition.

        Parameters
        ----------
        current : Any
            Current value, or None when there is no current value.
        pending : Any
            Pending value, or None when there is no pending value.

        Returns
        -------
        str
            The pending value in green when either side is None, otherwise
            "current → pending" with the pending value in yellow if it differs.

        """

        if current is None or pending is None:
            return colored(f"{pending}", "green")
        if current != pending:
            return f"{current} → " + colored(f"{pending}", "yellow")
        return f"{current} → {pending}"
class BirdPlanCmdlineOSPFInterfaceShow(BirdPlanCmdlinePluginBase):
    """BirdPlan "ospf interface show" command."""

    def __init__(self) -> None:
        """Initialize object and set the plugin description and order."""

        super().__init__()

        # Plugin setup
        self.plugin_order = 30
        self.plugin_description = "birdplan ospf interface show"

    def register_parsers(self, args: Dict[str, Any]) -> None:
        """
        Register the "show" parser under the "ospf interface" subparsers.

        Parameters
        ----------
        args : Dict[str, Any]
            Method argument(s); the "plugins" entry is used to look up the
            parent subparsers.

        """

        # Ask the parent "ospf interface" plugin for its subparsers
        parent_subparsers = args["plugins"].call_plugin(
            "birdplan.plugins.cmdline.ospf.interface",
            "get_subparsers",
            {},
        )

        # CMD: ospf interface show
        show_parser = parent_subparsers.add_parser("show", help="Show OSPF interfaces")

        # Hidden option whose constant value names the action for this command
        show_parser.add_argument(
            "--action",
            action="store_const",
            const="ospf_interface_show",
            default="ospf_interface_show",
            help=argparse.SUPPRESS,
        )

        # Both positionals are optional and are declared in this order
        positionals = (
            ("area", "AREA", "Optional area in which the interface is in"),
            ("interface", "IFACE", "Optional interface"),
        )
        for dest, metavar, help_text in positionals:
            show_parser.add_argument(dest, nargs="?", metavar=metavar, help=help_text)

        # Set our internal subparser properties
        self._subparser = show_parser
        self._subparsers = None

    def cmd_ospf_interface_show(self, args: Any) -> Any:
        """
        Commandline handler for "ospf interface show" action.

        Parameters
        ----------
        args : Any
            Method argument(s); the "cmdline" entry is the command line object
            used to load the configuration and query the status.

        Returns
        -------
        Any
            A BirdPlanCmdlineOSPFInterfaceShowResult wrapping the OSPF
            interface status.

        Raises
        ------
        RuntimeError
            If the subparser has not been set.

        """

        if not self._subparser:  # pragma: no cover
            raise RuntimeError()

        cmdline: BirdPlanCommandLine = args["cmdline"]

        # Suppress info output
        cmdline.birdplan.birdconf.birdconfig_globals.suppress_info = True

        # Load BirdPlan configuration using the cache
        cmdline.birdplan_load_config(
            ignore_irr_changes=True,
            ignore_peeringdb_changes=True,
            use_cached=True,
        )

        # Grab the OSPF interface status and wrap it in our result class
        status: BirdPlanOSPFInterfaceStatus = cmdline.birdplan.state_ospf_interface_status()
        return BirdPlanCmdlineOSPFInterfaceShowResult(status)