Coverage for birdplan/cmdline.py: 62%
177 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-23 03:27 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-23 03:27 +0000
1#
2# SPDX-License-Identifier: GPL-3.0-or-later
3#
4# Copyright (c) 2019-2024, AllWorldIT
5#
6# This program is free software: you can redistribute it and/or modify
7# it under the terms of the GNU General Public License as published by
8# the Free Software Foundation, either version 3 of the License, or
9# (at your option) any later version.
10#
11# This program is distributed in the hope that it will be useful,
12# but WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14# GNU General Public License for more details.
15#
16# You should have received a copy of the GNU General Public License
17# along with this program. If not, see <http://www.gnu.org/licenses/>.
19"""BirdPlan commandline interface."""
21import argparse
22import copy
23import json
24import logging
25import logging.handlers
26import os
27import sys
28from typing import Any, Callable, Dict, List, Literal, NoReturn, Optional
30from . import BirdPlan
31from .console.colors import colored
32from .exceptions import BirdPlanError, BirdPlanErrorUsage
33from .plugin import PluginCollection
34from .version import __version__
36__all__ = ["ColorFormatter", "BirdPlanArgumentParser", "BirdPlanCommandLine"]
39# Defaults
40if os.path.exists("/etc/bird/bird.conf"):
41 BIRD_CONFIG_FILE = "/etc/bird/bird.conf"
42else:
43 BIRD_CONFIG_FILE = "/etc/bird.conf"
44BIRD_SOCKET = "/run/bird/bird.ctl"
45BIRDPLAN_FILE = "/etc/birdplan/birdplan.yaml"
46BIRDPLAN_STATE_FILE = "/var/lib/birdplan/birdplan.state"
47BIRDPLAN_MONITOR_FILE = "/var/lib/birdplan/monitor.json"
50TRACE_LOG_LEVEL = 5
53class ColorFormatter(logging.Formatter):
54 """
55 A custom log formatter class that.
57 It currently...
58 * Outputs the LOG_LEVEL with an appropriate color.
59 * If a log call includes an `extras={"color_message": ...}` it will be used for formatting the output, instead of the plain
60 text message.
61 """
63 level_name_colors: Dict[int, Callable[[str], str]] = {
64 TRACE_LOG_LEVEL: lambda level_name: colored(str(level_name), "blue"),
65 logging.DEBUG: lambda level_name: colored(str(level_name), "cyan"),
66 logging.INFO: lambda level_name: colored(str(level_name), "green"),
67 logging.WARNING: lambda level_name: colored(str(level_name), "yellow"),
68 logging.ERROR: lambda level_name: colored(str(level_name), "red"),
69 logging.CRITICAL: lambda level_name: colored(str(level_name), "bright_red"),
70 }
72 def __init__(
73 self,
74 fmt: str | None = None,
75 datefmt: str | None = None,
76 style: Literal["%", "{", "$"] = "%",
77 use_colors: bool | None = None,
78 ):
79 """
80 Color log formatter class.
82 Arguments
83 ---------
84 fmt : str
85 Format string.
86 datefmt : str
87 Date format string.
88 style : str
89 Format style.
90 use_colors : bool
91 Use colors or not.
92 """
94 if isinstance(use_colors, bool):
95 self.use_colors = use_colors
96 else:
97 self.use_colors = sys.stdout.isatty()
98 super().__init__(fmt=fmt, datefmt=datefmt, style=style)
100 def color_level_name(self, level_name: str, level_no: int) -> str:
101 """Get colored level name from level_no."""
103 def default(level_name: str) -> str:
104 return str(level_name) # pragma: no cover
106 func = self.level_name_colors.get(level_no, default)
107 return func(level_name)
109 def should_use_colors(self) -> bool:
110 """Return if we should use colors or not."""
111 return True # pragma: no cover
113 def formatMessage(self, record: logging.LogRecord) -> str: # noqa: N802
114 """Format a message from a record."""
116 # Copy record
117 recordcopy = copy.copy(record)
118 # Grab level name
119 levelname = recordcopy.levelname
120 # Add padding before color control codes
121 seperator = " " * (8 - len(recordcopy.levelname))
122 # Check if we're using color or not
123 if self.use_colors:
124 # If we are get the levelname in color
125 levelname = self.color_level_name(levelname, recordcopy.levelno)
126 # If a color_message is present, use it instead
127 if "color_message" in recordcopy.__dict__:
128 recordcopy.msg = recordcopy.__dict__["color_message"]
129 recordcopy.__dict__["message"] = recordcopy.getMessage()
130 # Set the record levelcolor
131 recordcopy.__dict__["levelcolor"] = levelname + seperator
132 return super().formatMessage(recordcopy)
135class BirdPlanArgumentParser(argparse.ArgumentParser):
136 """ArgumentParser override class to output errors slightly better."""
138 def error(self, message: str) -> NoReturn:
139 """
140 Slightly better error message handler for ArgumentParser.
142 Argument
143 --------
144 message : str
145 Error message.
147 """
148 raise BirdPlanErrorUsage(message, self)
151class BirdPlanCommandlineResult: # pylint: disable=too-few-public-methods
152 """BirdPlan commandline result class."""
154 _data: Any
155 _has_console_output: bool
157 def __init__(self, data: Any, has_console_output: bool = True) -> None:
158 """Initialize object."""
160 self._data = data
161 self._has_console_output = has_console_output
163 def as_console(self) -> str:
164 """
165 Return data as console output.
167 Returns
168 -------
169 str
170 Data as console output.
171 """
173 return self.as_text()
175 def as_text(self) -> str:
176 """
177 Return data as text.
179 Returns
180 -------
181 str
182 Data as text.
183 """
185 return f"{self.data}"
187 def as_json(self) -> str:
188 """
189 Return data as JSON.
191 Parameters
192 ----------
193 data : Any
194 Output data.
196 Returns
197 -------
198 str
199 Return data as JSON.
200 """
202 return json.dumps({"status": "success", "data": self.data})
204 @property
205 def data(self) -> Any:
206 """
207 Return raw data.
209 Returns
210 -------
211 Any
212 Return data in its raw form.
213 """
215 return self._data
217 @property
218 def has_console_output(self) -> bool:
219 """
220 Return whether or not data has console output.
222 Returns
223 -------
224 bool
225 Return whether or not data has console output.
226 """
228 return self._has_console_output
231class BirdPlanCommandLine:
232 """BirdPlan commandline handling class."""
234 _is_console: bool
235 _args: argparse.Namespace
236 _argparser: BirdPlanArgumentParser
237 _birdplan: BirdPlan
239 def __init__(self, test_mode: bool = False, is_console: bool = False) -> None:
240 """Instantiate object."""
242 prog: Optional[str] = None
243 if sys.argv[0].endswith("__main__.py"):
244 prog = "python -m birdplan"
246 self._is_console = is_console
247 self._args = argparse.Namespace()
248 self._argparser = BirdPlanArgumentParser(add_help=False, prog=prog)
249 self._birdplan = BirdPlan(test_mode=test_mode)
251 def run( # noqa: CFQ001 # pylint: disable=too-many-branches,too-many-locals,too-many-statements
252 self, raw_args: Optional[List[str]] = None
253 ) -> BirdPlanCommandlineResult:
254 """Run BirdPlan from command line."""
256 # Check if we have one commandline argument, if we do and if it is --version, return our version
257 if raw_args and len(raw_args) == 1 and raw_args[0] == "--version":
258 result: BirdPlanCommandlineResult = BirdPlanCommandlineResult(__version__)
259 # Check if we're on a console
260 if self.is_console:
261 # Check if we should output json
262 if self.is_json:
263 print(result.as_json())
264 # Else if we have console output, then output that
265 elif result.has_console_output:
266 print(result.as_console())
268 return result
270 # If this is the console, display our version
271 if self.is_console:
272 print(f"BirdPlan v{__version__} - Copyright © 2019-2024, AllWorldIT.\n", file=sys.stderr)
274 # Add main commandline arguments
275 optional_group = self.argparser.add_argument_group("Optional arguments")
276 optional_group.add_argument("-h", "--help", action="help", help="Show this help message and exit")
277 optional_group.add_argument("-v", "--verbose", action="store_true", help="Display verbose logging")
278 optional_group.add_argument("--version", action="store_true", help="Display version and exit")
280 # Input and output file settings
281 optional_group.add_argument(
282 "-b",
283 "--bird-socket",
284 nargs=1,
285 metavar="BIRD_SOCKET",
286 default=[BIRD_SOCKET],
287 help=f"BirdPlan needs access to the Bird control socket for some commandline functionality (default: {BIRD_SOCKET})",
288 )
289 optional_group.add_argument(
290 "-i",
291 "--birdplan-file",
292 nargs=1,
293 metavar="BIRDPLAN_FILE",
294 default=[BIRDPLAN_FILE],
295 help=f"BirdPlan file to process (default: {BIRDPLAN_FILE})",
296 )
297 optional_group.add_argument(
298 "-s",
299 "--birdplan-state-file",
300 nargs=1,
301 metavar="BIRDPLAN_STATE_FILE",
302 default=[None],
303 help=f"BirdPlan state file to use (default: {BIRDPLAN_STATE_FILE})",
304 )
305 optional_group.add_argument(
306 "-n",
307 "--no-write-state",
308 action="store_true",
309 default=False,
310 help="Disable writing state file",
311 )
312 optional_group.add_argument(
313 "-j",
314 "--json",
315 action="store_true",
316 default=False,
317 help="Output in JSON",
318 )
320 # Add subparsers
321 subparsers = self.argparser.add_subparsers()
323 # configure
324 plugins = PluginCollection(["birdplan.plugins.cmdline"])
326 # Register commandline parsers
327 plugins.call_if_exists("register_parsers", {"root_parser": subparsers, "plugins": plugins})
329 # Parse commandline args
330 self._args = self.argparser.parse_args(raw_args)
332 # Setup logging
333 if self.is_console:
334 self._setup_logging()
336 # Make sure we have an action
337 if "action" not in self.args:
338 raise BirdPlanErrorUsage("No action specified", self.argparser)
340 # Generate the command line option method name
341 method_name = f"cmd_{self.args.action}"
343 # Grab the first plugin which has this method
344 plugin_name = plugins.get_first(method_name)
345 if not plugin_name:
346 raise BirdPlanError("Failed to find plugin to handle command line options")
348 # Grab the result from the command
349 result = plugins.call_plugin(plugin_name, method_name, {"cmdline": self})
351 # Check if we're on a console
352 if self.is_console:
353 # Check if we should output json
354 if self.is_json:
355 print(result.as_json())
356 # Else if we have console output, then output that
357 elif result.has_console_output:
358 print(result.as_console())
360 return result
362 def birdplan_load_config(self, **kwargs: Any) -> None:
363 """
364 Load BirdPlan configuration.
366 Parameters
367 ----------
368 ignore_irr_changes : bool
369 Optional parameter to ignore IRR lookups during configuration load.
371 ignore_peeringdb_changes : bool
372 Optional parameter to ignore peering DB lookups during configuraiton load.
374 use_cached : bool
375 Optional parameter to use cached values from state during configuration load.
377 """
379 # Set the state file
380 state_file: Optional[str] = None
381 if self.args.birdplan_state_file[0]:
382 state_file = self.args.birdplan_state_file[0]
383 else:
384 state_file = BIRDPLAN_STATE_FILE
386 # Try load configuration
387 self.birdplan.load(
388 plan_file=self.args.birdplan_file[0],
389 state_file=state_file,
390 **kwargs,
391 )
393 def birdplan_commit_state(self) -> None:
394 """Commit BirdPlan state."""
396 # Check if we need to skip writing the state
397 if self.args.no_write_state:
398 return
400 self.birdplan.commit_state()
402 def _setup_logging(self) -> None:
403 """Set up logging."""
405 # Setup logger and level
406 logger = logging.getLogger()
407 if self.args.verbose:
408 logger.setLevel(logging.DEBUG)
409 else:
410 logger.setLevel(logging.INFO)
412 # Remove all existing handlers
413 for handler in logger.handlers[:]:
414 logger.removeHandler(handler)
416 # Setup console handler
417 console_handler = logging.StreamHandler(sys.stderr)
418 # Build log format
419 log_format = ""
420 # Only add a timestamp if we're not running under systemd
421 if "INVOCATION_ID" not in os.environ:
422 log_format += "%(asctime)s "
423 log_format += "%(levelcolor)s %(message)s"
424 # Use a better format for messages
425 console_handler.setFormatter(ColorFormatter(log_format, "[%Y-%m-%d %H:%M:%S]"))
426 logger.addHandler(console_handler)
428 @property
429 def args(self) -> argparse.Namespace:
430 """Return our commandline arguments."""
431 return self._args
433 @property
434 def argparser(self) -> argparse.ArgumentParser:
435 """Return our ArgumentParser instance."""
436 return self._argparser
438 @property
439 def birdplan(self) -> BirdPlan:
440 """Return our BirdPlan instance."""
441 return self._birdplan
443 @property
444 def is_console(self) -> bool:
445 """
446 Property indicating True or False if we're being called from the commandline.
448 Returns
449 -------
450 bool indicating if we were called from the commandline.
452 """
453 return self._is_console
455 @is_console.setter
456 def is_console(self, is_console: bool) -> None:
457 """
458 Set property indicating we're running from a console.
460 Parameters
461 ----------
462 is_console : bool
463 Set to True indicating if we were called from the commandline.
465 """
466 self._is_console = is_console
468 @property
469 def is_json(self) -> bool:
470 """
471 Property indicating that we should output in JSON on the commandline.
473 Returns
474 -------
475 bool : indicating if we should output in JSON from the commandline.
477 """
479 if ("--json" in sys.argv) or ("-j" in sys.argv):
480 return True
481 return False
484# Main entry point from the commandline
485def main() -> None:
486 """Entry point function for the commandline."""
487 birdplan_cmdline = BirdPlanCommandLine(is_console=True)
489 try:
490 birdplan_cmdline.run(sys.argv[1:])
491 except BirdPlanError as exception:
492 if birdplan_cmdline.is_json:
493 print(json.dumps({"status": "error", "message": str(exception)}))
494 else:
495 print(f"ERROR: {exception}", file=sys.stderr)
496 sys.exit(1)
498 except BirdPlanErrorUsage as exception:
499 if birdplan_cmdline.is_json:
500 print(json.dumps({"status": "error", "message": exception.message}))
501 else:
502 print(f"ERROR: {exception}", file=sys.stderr)
503 sys.exit(2)