Coverage for src/birdplan/cmdline.py: 68%

156 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-03-04 10:19 +0000

1# 

2# SPDX-License-Identifier: GPL-3.0-or-later 

3# 

4# Copyright (c) 2019-2025, AllWorldIT 

5# 

6# This program is free software: you can redistribute it and/or modify 

7# it under the terms of the GNU General Public License as published by 

8# the Free Software Foundation, either version 3 of the License, or 

9# (at your option) any later version. 

10# 

11# This program is distributed in the hope that it will be useful, 

12# but WITHOUT ANY WARRANTY; without even the implied warranty of 

13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

14# GNU General Public License for more details. 

15# 

16# You should have received a copy of the GNU General Public License 

17# along with this program. If not, see <http://www.gnu.org/licenses/>. 

18 

19"""BirdPlan commandline interface.""" 

20 

21import argparse 

22import copy 

23import json 

24import logging 

25import logging.handlers 

26import os 

27import pathlib 

28import sys 

29from collections.abc import Callable 

30from typing import Any, ClassVar, Literal, NoReturn 

31 

32from . import BirdPlan 

33from .console.colors import colored 

34from .exceptions import BirdPlanError, BirdPlanUsageError 

35from .plugin import PluginCollection 

36from .version import __version__ 

37 

38__all__ = ["BirdPlanArgumentParser", "BirdPlanCommandLine", "ColorFormatter"] 

39 

40 

41# Defaults 

42BIRD_CONFIG_FILE = "/etc/bird/bird.conf" if pathlib.Path("/etc/bird/bird.conf").exists() else "/etc/bird.conf" 

43BIRD_SOCKET = "/run/bird/bird.ctl" 

44BIRDPLAN_FILE = "/etc/birdplan/birdplan.yaml" 

45BIRDPLAN_STATE_FILE = "/var/lib/birdplan/birdplan.state" 

46BIRDPLAN_MONITOR_FILE = "/var/lib/birdplan/monitor.json" 

47 

48 

49TRACE_LOG_LEVEL = 5 

50 

51 

52class ColorFormatter(logging.Formatter): 

53 """ 

54 A custom log formatter class that. 

55 

56 It currently... 

57 * Outputs the LOG_LEVEL with an appropriate color. 

58 * If a log call includes an `extras={"color_message": ...}` it will be used for formatting the output, instead of the plain 

59 text message. 

60 """ 

61 

62 level_name_colors: ClassVar[dict[int, Callable[[str], str]]] = { 

63 TRACE_LOG_LEVEL: lambda level_name: colored(str(level_name), "blue"), 

64 logging.DEBUG: lambda level_name: colored(str(level_name), "cyan"), 

65 logging.INFO: lambda level_name: colored(str(level_name), "green"), 

66 logging.WARNING: lambda level_name: colored(str(level_name), "yellow"), 

67 logging.ERROR: lambda level_name: colored(str(level_name), "red"), 

68 logging.CRITICAL: lambda level_name: colored(str(level_name), "bright_red"), 

69 } 

70 

71 def __init__( 

72 self, 

73 fmt: str | None = None, 

74 datefmt: str | None = None, 

75 style: Literal["%", "{", "$"] = "%", 

76 use_colors: bool | None = None, 

77 ) -> None: 

78 """ 

79 Color log formatter class. 

80 

81 Arguments: 

82 --------- 

83 fmt : str 

84 Format string. 

85 datefmt : str 

86 Date format string. 

87 style : str 

88 Format style. 

89 use_colors : bool 

90 Use colors or not. 

91 

92 """ 

93 

94 if isinstance(use_colors, bool): 

95 self.use_colors = use_colors 

96 else: 

97 self.use_colors = sys.stdout.isatty() 

98 super().__init__(fmt=fmt, datefmt=datefmt, style=style) 

99 

100 def color_level_name(self, level_name: str, level_no: int) -> str: 

101 """Get colored level name from level_no.""" 

102 

103 def default(level_name: str) -> str: 

104 return str(level_name) # pragma: no cover 

105 

106 func = self.level_name_colors.get(level_no, default) 

107 return func(level_name) 

108 

109 def should_use_colors(self) -> bool: 

110 """Return if we should use colors or not.""" 

111 return True # pragma: no cover 

112 

113 def formatMessage(self, record: logging.LogRecord) -> str: # noqa: N802 

114 """Format a message from a record.""" 

115 

116 # Copy record 

117 recordcopy = copy.copy(record) 

118 # Grab level name 

119 levelname = recordcopy.levelname 

120 # Add padding before color control codes 

121 seperator = " " * (8 - len(recordcopy.levelname)) 

122 # Check if we're using color or not 

123 if self.use_colors: 

124 # If we are get the levelname in color 

125 levelname = self.color_level_name(levelname, recordcopy.levelno) 

126 # If a color_message is present, use it instead 

127 if "color_message" in recordcopy.__dict__: 

128 recordcopy.msg = recordcopy.__dict__["color_message"] 

129 recordcopy.__dict__["message"] = recordcopy.getMessage() 

130 # Set the record levelcolor 

131 recordcopy.__dict__["levelcolor"] = levelname + seperator 

132 return super().formatMessage(recordcopy) 

133 

134 

135class BirdPlanArgumentParser(argparse.ArgumentParser): 

136 """ArgumentParser override class to output errors slightly better.""" 

137 

138 def error(self, message: str) -> NoReturn: 

139 """ 

140 Slightly better error message handler for ArgumentParser. 

141 

142 Argument 

143 -------- 

144 message : str 

145 Error message. 

146 

147 """ 

148 raise BirdPlanUsageError(message, self) 

149 

150 

151class BirdPlanCommandlineResult: # pylint: disable=too-few-public-methods 

152 """BirdPlan commandline result class.""" 

153 

154 _data: Any 

155 _has_console_output: bool 

156 

157 def __init__(self, data: Any, has_console_output: bool = True) -> None: # noqa: ANN401,FBT001,FBT002 

158 """Initialize object.""" 

159 

160 self._data = data 

161 self._has_console_output = has_console_output 

162 

163 def as_console(self) -> str: 

164 """ 

165 Return data as console output. 

166 

167 Returns 

168 ------- 

169 str 

170 Data as console output. 

171 

172 """ 

173 

174 return self.as_text() 

175 

176 def as_text(self) -> str: 

177 """ 

178 Return data as text. 

179 

180 Returns 

181 ------- 

182 str 

183 Data as text. 

184 

185 """ 

186 

187 return f"{self.data}" 

188 

189 def as_json(self) -> str: 

190 """ 

191 Return data as JSON. 

192 

193 Parameters 

194 ---------- 

195 data : Any 

196 Output data. 

197 

198 Returns 

199 ------- 

200 str 

201 Return data as JSON. 

202 

203 """ 

204 

205 return json.dumps({"status": "success", "data": self.data}) 

206 

207 @property 

208 def data(self) -> Any: # noqa: ANN401 

209 """ 

210 Return raw data. 

211 

212 Returns 

213 ------- 

214 Any 

215 Return data in its raw form. 

216 

217 """ 

218 

219 return self._data 

220 

221 @property 

222 def has_console_output(self) -> bool: 

223 """ 

224 Return whether or not data has console output. 

225 

226 Returns 

227 ------- 

228 bool 

229 Return whether or not data has console output. 

230 

231 """ 

232 

233 return self._has_console_output 

234 

235 

236class BirdPlanCommandLine: 

237 """BirdPlan commandline handling class.""" 

238 

239 _is_console: bool 

240 _args: argparse.Namespace 

241 _argparser: BirdPlanArgumentParser 

242 _birdplan: BirdPlan 

243 

244 def __init__(self, test_mode: bool = False, is_console: bool = False) -> None: # noqa: FBT001,FBT002 

245 """Instantiate object.""" 

246 

247 prog: str | None = None 

248 if sys.argv[0].endswith("__main__.py"): 

249 prog = "python -m birdplan" 

250 

251 self._is_console = is_console 

252 self._args = argparse.Namespace() 

253 self._argparser = BirdPlanArgumentParser(add_help=False, prog=prog) 

254 self._birdplan = BirdPlan(test_mode=test_mode) 

255 

256 def run( # pylint: disable=too-many-branches,too-many-locals,too-many-statements 

257 self, raw_args: list[str] | None = None 

258 ) -> BirdPlanCommandlineResult: 

259 """Run BirdPlan from command line.""" 

260 

261 # Check if we have one commandline argument, if we do and if it is --version, return our version 

262 if raw_args and len(raw_args) == 1 and raw_args[0] == "--version": 

263 result: BirdPlanCommandlineResult = BirdPlanCommandlineResult(__version__) 

264 

265 return result 

266 

267 # If this is the console, display our version 

268 if self.is_console: 

269 pass 

270 

271 # Add main commandline arguments 

272 optional_group = self.argparser.add_argument_group("Optional arguments") 

273 optional_group.add_argument("-h", "--help", action="help", help="Show this help message and exit") 

274 optional_group.add_argument("-v", "--verbose", action="store_true", help="Display verbose logging") 

275 optional_group.add_argument("--version", action="store_true", help="Display version and exit") 

276 

277 # Input and output file settings 

278 optional_group.add_argument( 

279 "-b", 

280 "--bird-socket", 

281 nargs=1, 

282 metavar="BIRD_SOCKET", 

283 default=[BIRD_SOCKET], 

284 help=f"BirdPlan needs access to the Bird control socket for some commandline functionality (default: {BIRD_SOCKET})", 

285 ) 

286 optional_group.add_argument( 

287 "-i", 

288 "--birdplan-file", 

289 nargs=1, 

290 metavar="BIRDPLAN_FILE", 

291 default=[BIRDPLAN_FILE], 

292 help=f"BirdPlan file to process (default: {BIRDPLAN_FILE})", 

293 ) 

294 optional_group.add_argument( 

295 "-s", 

296 "--birdplan-state-file", 

297 nargs=1, 

298 metavar="BIRDPLAN_STATE_FILE", 

299 default=[None], 

300 help=f"BirdPlan state file to use (default: {BIRDPLAN_STATE_FILE})", 

301 ) 

302 optional_group.add_argument( 

303 "-n", 

304 "--no-write-state", 

305 action="store_true", 

306 default=False, 

307 help="Disable writing state file", 

308 ) 

309 optional_group.add_argument( 

310 "-j", 

311 "--json", 

312 action="store_true", 

313 default=False, 

314 help="Output in JSON", 

315 ) 

316 

317 # Add subparsers 

318 subparsers = self.argparser.add_subparsers() 

319 

320 # configure 

321 plugins = PluginCollection(["birdplan.plugins.cmdline"]) 

322 

323 # Register commandline parsers 

324 plugins.call_if_exists("register_parsers", {"root_parser": subparsers, "plugins": plugins}) 

325 

326 # Parse commandline args 

327 self._args = self.argparser.parse_args(raw_args) 

328 

329 # Setup logging 

330 if self.is_console: 

331 self._setup_logging() 

332 

333 # Make sure we have an action 

334 if "action" not in self.args: 

335 raise BirdPlanUsageError("No action specified", self.argparser) 

336 

337 # Generate the command line option method name 

338 method_name = f"cmd_{self.args.action}" 

339 

340 # Grab the first plugin which has this method 

341 plugin_name = plugins.get_first(method_name) 

342 if not plugin_name: 

343 raise BirdPlanError("Failed to find plugin to handle command line options") 

344 

345 # Grab the result from the command 

346 result: BirdPlanCommandlineResult = plugins.call_plugin(plugin_name, method_name, {"cmdline": self}) 

347 

348 return result 

349 

350 def birdplan_load_config(self, **kwargs: dict[str, Any]) -> None: # noqa: D417 

351 """ 

352 Load BirdPlan configuration. 

353 

354 Parameters 

355 ---------- 

356 ignore_irr_changes : bool 

357 Optional parameter to ignore IRR lookups during configuration load. 

358 

359 ignore_peeringdb_changes : bool 

360 Optional parameter to ignore peering DB lookups during configuraiton load. 

361 

362 use_cached : bool 

363 Optional parameter to use cached values from state during configuration load. 

364 

365 """ 

366 

367 # Set the state file 

368 state_file: str | None = None 

369 state_file = self.args.birdplan_state_file[0] if self.args.birdplan_state_file[0] else BIRDPLAN_STATE_FILE 

370 

371 # Try load configuration 

372 self.birdplan.load( 

373 plan_file=self.args.birdplan_file[0], 

374 state_file=state_file, 

375 **kwargs, 

376 ) 

377 

378 def birdplan_commit_state(self) -> None: 

379 """Commit BirdPlan state.""" 

380 

381 # Check if we need to skip writing the state 

382 if self.args.no_write_state: 

383 return 

384 

385 self.birdplan.commit_state() 

386 

387 def _setup_logging(self) -> None: 

388 """Set up logging.""" 

389 

390 # Setup logger and level 

391 logger = logging.getLogger() 

392 if self.args.verbose: 

393 logger.setLevel(logging.DEBUG) 

394 else: 

395 logger.setLevel(logging.INFO) 

396 

397 # Remove all existing handlers 

398 for handler in logger.handlers[:]: 

399 logger.removeHandler(handler) 

400 

401 # Setup console handler 

402 console_handler = logging.StreamHandler(sys.stderr) 

403 # Build log format 

404 log_format = "" 

405 # Only add a timestamp if we're not running under systemd 

406 if "INVOCATION_ID" not in os.environ: 

407 log_format += "%(asctime)s " 

408 log_format += "%(levelcolor)s %(message)s" 

409 # Use a better format for messages 

410 console_handler.setFormatter(ColorFormatter(log_format, "[%Y-%m-%d %H:%M:%S]")) 

411 logger.addHandler(console_handler) 

412 

413 @property 

414 def args(self) -> argparse.Namespace: 

415 """Return our commandline arguments.""" 

416 return self._args 

417 

418 @property 

419 def argparser(self) -> argparse.ArgumentParser: 

420 """Return our ArgumentParser instance.""" 

421 return self._argparser 

422 

423 @property 

424 def birdplan(self) -> BirdPlan: 

425 """Return our BirdPlan instance.""" 

426 return self._birdplan 

427 

428 @property 

429 def is_console(self) -> bool: 

430 """ 

431 Property indicating True or False if we're being called from the commandline. 

432 

433 Returns 

434 ------- 

435 bool indicating if we were called from the commandline. 

436 

437 """ 

438 return self._is_console 

439 

440 @is_console.setter 

441 def is_console(self, is_console: bool) -> None: 

442 """ 

443 Set property indicating we're running from a console. 

444 

445 Parameters 

446 ---------- 

447 is_console : bool 

448 Set to True indicating if we were called from the commandline. 

449 

450 """ 

451 self._is_console = is_console 

452 

453 @property 

454 def is_json(self) -> bool: 

455 """ 

456 Property indicating that we should output in JSON on the commandline. 

457 

458 Returns 

459 ------- 

460 bool : indicating if we should output in JSON from the commandline. 

461 

462 """ 

463 

464 return bool("--json" in sys.argv or "-j" in sys.argv) 

465 

466 

467# Main entry point from the commandline 

468def main() -> None: 

469 """Entry point function for the commandline.""" 

470 birdplan_cmdline = BirdPlanCommandLine(is_console=True) 

471 

472 try: 

473 birdplan_cmdline.run(sys.argv[1:]) 

474 except BirdPlanError: 

475 if birdplan_cmdline.is_json: 

476 pass 

477 else: 

478 sys.exit(1)