Coverage for birdplan/cmdline.py: 62%

177 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-04-23 03:27 +0000

1# 

2# SPDX-License-Identifier: GPL-3.0-or-later 

3# 

4# Copyright (c) 2019-2024, AllWorldIT 

5# 

6# This program is free software: you can redistribute it and/or modify 

7# it under the terms of the GNU General Public License as published by 

8# the Free Software Foundation, either version 3 of the License, or 

9# (at your option) any later version. 

10# 

11# This program is distributed in the hope that it will be useful, 

12# but WITHOUT ANY WARRANTY; without even the implied warranty of 

13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

14# GNU General Public License for more details. 

15# 

16# You should have received a copy of the GNU General Public License 

17# along with this program. If not, see <http://www.gnu.org/licenses/>. 

18 

19"""BirdPlan commandline interface.""" 

20 

21import argparse 

22import copy 

23import json 

24import logging 

25import logging.handlers 

26import os 

27import sys 

28from typing import Any, Callable, Dict, List, Literal, NoReturn, Optional 

29 

30from . import BirdPlan 

31from .console.colors import colored 

32from .exceptions import BirdPlanError, BirdPlanErrorUsage 

33from .plugin import PluginCollection 

34from .version import __version__ 

35 

# Public names of this module. BirdPlanCommandlineResult is listed because it is the return type of
# BirdPlanCommandLine.run().
__all__ = ["ColorFormatter", "BirdPlanArgumentParser", "BirdPlanCommandlineResult", "BirdPlanCommandLine"]

37 

38 

# Defaults

# Use the configuration file under /etc/bird/ when it exists, otherwise fall back to /etc/bird.conf
BIRD_CONFIG_FILE = "/etc/bird/bird.conf" if os.path.exists("/etc/bird/bird.conf") else "/etc/bird.conf"
BIRD_SOCKET = "/run/bird/bird.ctl"
BIRDPLAN_FILE = "/etc/birdplan/birdplan.yaml"
BIRDPLAN_STATE_FILE = "/var/lib/birdplan/birdplan.state"
BIRDPLAN_MONITOR_FILE = "/var/lib/birdplan/monitor.json"


# Numeric log level used for trace output; 5 sorts below logging.DEBUG (10)
TRACE_LOG_LEVEL = 5

51 

52 

class ColorFormatter(logging.Formatter):
    """
    Log formatter that can colorize the level name and the message.

    It currently...
      * Provides the level name, colored per level when colors are enabled, as the ``levelcolor`` format field.
      * Uses a record's ``color_message`` attribute (for example supplied with ``extra={"color_message": ...}`` on the log
        call) in place of the plain text message when colors are enabled.
    """

    # Maps a numeric log level to a function that wraps the level name in that level's color
    level_name_colors: Dict[int, Callable[[str], str]] = {
        TRACE_LOG_LEVEL: lambda level_name: colored(str(level_name), "blue"),
        logging.DEBUG: lambda level_name: colored(str(level_name), "cyan"),
        logging.INFO: lambda level_name: colored(str(level_name), "green"),
        logging.WARNING: lambda level_name: colored(str(level_name), "yellow"),
        logging.ERROR: lambda level_name: colored(str(level_name), "red"),
        logging.CRITICAL: lambda level_name: colored(str(level_name), "bright_red"),
    }

    def __init__(
        self,
        fmt: str | None = None,
        datefmt: str | None = None,
        style: Literal["%", "{", "$"] = "%",
        use_colors: bool | None = None,
    ):
        """
        Create the formatter.

        Arguments
        ---------
        fmt : str
            Format string, passed on to logging.Formatter.
        datefmt : str
            Date format string, passed on to logging.Formatter.
        style : str
            Format style, passed on to logging.Formatter.
        use_colors : bool
            Force colors on or off. Any non-bool value (the default is None) enables colors only when sys.stdout is a TTY.
        """

        # An explicit bool wins, otherwise detect whether stdout is a terminal
        self.use_colors = use_colors if isinstance(use_colors, bool) else sys.stdout.isatty()
        super().__init__(fmt=fmt, datefmt=datefmt, style=style)

    def color_level_name(self, level_name: str, level_no: int) -> str:
        """Return level_name colored for level_no, or as a plain string when the level has no color."""

        try:
            painter = self.level_name_colors[level_no]
        except KeyError:
            # No color registered for this level, hand the name back uncolored
            return str(level_name)  # pragma: no cover
        return painter(level_name)

    def should_use_colors(self) -> bool:
        """Return if we should use colors or not; this implementation always answers True."""
        return True  # pragma: no cover

    def formatMessage(self, record: logging.LogRecord) -> str:  # noqa: N802
        """Format a record, adding the ``levelcolor`` field and applying any color message."""

        # Work on a shallow copy so attributes are only set on the copy
        clone = copy.copy(record)
        attrs = clone.__dict__
        level_text = clone.levelname
        # Padding is computed from the plain level name, before any color control codes are added
        padding = " " * (8 - len(level_text))
        if self.use_colors:
            level_text = self.color_level_name(level_text, clone.levelno)
            # A color_message on the record replaces the plain text message
            if "color_message" in attrs:
                clone.msg = attrs["color_message"]
                attrs["message"] = clone.getMessage()
        # Expose the (possibly colored) padded level name to the format string
        attrs["levelcolor"] = level_text + padding
        return super().formatMessage(clone)

133 

134 

class BirdPlanArgumentParser(argparse.ArgumentParser):
    """ArgumentParser subclass whose error handler raises BirdPlanErrorUsage."""

    def error(self, message: str) -> NoReturn:
        """
        Report a parsing error by raising an exception.

        Argument
        --------
        message : str
            Error message.

        Raises
        ------
        BirdPlanErrorUsage
            Always; built from the message and this parser instance.

        """
        usage_error = BirdPlanErrorUsage(message, self)
        raise usage_error

149 

150 

class BirdPlanCommandlineResult:  # pylint: disable=too-few-public-methods
    """Container for the outcome of a BirdPlan commandline action, with text and JSON renderings."""

    _data: Any
    _has_console_output: bool

    def __init__(self, data: Any, has_console_output: bool = True) -> None:
        """
        Store the result data.

        Parameters
        ----------
        data : Any
            Result data.
        has_console_output : bool
            Whether the result has console output, defaults to True.
        """

        self._has_console_output = has_console_output
        self._data = data

    def as_console(self) -> str:
        """
        Render the data for the console; this is the same rendering as as_text().

        Returns
        -------
        str
            Data as console output.
        """

        console_text = self.as_text()
        return console_text

    def as_text(self) -> str:
        """
        Render the data as text.

        Returns
        -------
        str
            The data formatted as a string.
        """

        return "{}".format(self.data)

    def as_json(self) -> str:
        """
        Render the data as JSON.

        Returns
        -------
        str
            JSON object with "status" set to "success" and the data under "data".
        """

        payload = {"status": "success", "data": self.data}
        return json.dumps(payload)

    @property
    def data(self) -> Any:
        """
        Raw result data.

        Returns
        -------
        Any
            The data exactly as it was passed to the constructor.
        """

        return self._data

    @property
    def has_console_output(self) -> bool:
        """
        Whether or not the data has console output.

        Returns
        -------
        bool
            The has_console_output value passed to the constructor.
        """

        return self._has_console_output

229 

230 

class BirdPlanCommandLine:
    """
    BirdPlan commandline handling class.

    Builds the argument parser, lets the commandline plugins register their sub-commands, dispatches the selected action
    to the first plugin implementing it and, when running on a console, prints the result.
    """

    _is_console: bool
    _args: argparse.Namespace
    _argparser: BirdPlanArgumentParser
    _birdplan: BirdPlan

    def __init__(self, test_mode: bool = False, is_console: bool = False) -> None:
        """
        Instantiate object.

        Parameters
        ----------
        test_mode : bool
            Passed on to the BirdPlan instance.
        is_console : bool
            Set to True when called from the commandline; enables the banner, logging setup and result printing.

        """

        # Show a sensible program name in usage output when started from a __main__.py
        prog: Optional[str] = None
        if sys.argv[0].endswith("__main__.py"):
            prog = "python -m birdplan"

        self._is_console = is_console
        self._args = argparse.Namespace()
        self._argparser = BirdPlanArgumentParser(add_help=False, prog=prog)
        self._birdplan = BirdPlan(test_mode=test_mode)

    def run(  # noqa: CFQ001 # pylint: disable=too-many-branches,too-many-locals,too-many-statements
        self, raw_args: Optional[List[str]] = None
    ) -> BirdPlanCommandlineResult:
        """
        Run BirdPlan from command line.

        Parameters
        ----------
        raw_args : Optional[List[str]]
            Commandline arguments to parse, passed to the argument parser's parse_args().

        Returns
        -------
        BirdPlanCommandlineResult
            The version when --version was requested, otherwise whatever the plugin command returned.

        Raises
        ------
        BirdPlanErrorUsage
            If the arguments cannot be parsed or no action was specified.
        BirdPlanError
            If no plugin handles the requested action.

        """

        # Check if we have one commandline argument, if we do and if it is --version, return our version
        # without printing the banner or building the parser
        if raw_args and len(raw_args) == 1 and raw_args[0] == "--version":
            return self._output_result(BirdPlanCommandlineResult(__version__))

        # If this is the console, display our version
        if self.is_console:
            print(f"BirdPlan v{__version__} - Copyright © 2019-2024, AllWorldIT.\n", file=sys.stderr)

        # Add main commandline arguments
        optional_group = self.argparser.add_argument_group("Optional arguments")
        optional_group.add_argument("-h", "--help", action="help", help="Show this help message and exit")
        optional_group.add_argument("-v", "--verbose", action="store_true", help="Display verbose logging")
        optional_group.add_argument("--version", action="store_true", help="Display version and exit")

        # Input and output file settings
        optional_group.add_argument(
            "-b",
            "--bird-socket",
            nargs=1,
            metavar="BIRD_SOCKET",
            default=[BIRD_SOCKET],
            help=f"BirdPlan needs access to the Bird control socket for some commandline functionality (default: {BIRD_SOCKET})",
        )
        optional_group.add_argument(
            "-i",
            "--birdplan-file",
            nargs=1,
            metavar="BIRDPLAN_FILE",
            default=[BIRDPLAN_FILE],
            help=f"BirdPlan file to process (default: {BIRDPLAN_FILE})",
        )
        # The default is [None] rather than the path so birdplan_load_config() can tell whether a value was supplied
        optional_group.add_argument(
            "-s",
            "--birdplan-state-file",
            nargs=1,
            metavar="BIRDPLAN_STATE_FILE",
            default=[None],
            help=f"BirdPlan state file to use (default: {BIRDPLAN_STATE_FILE})",
        )
        optional_group.add_argument(
            "-n",
            "--no-write-state",
            action="store_true",
            default=False,
            help="Disable writing state file",
        )
        optional_group.add_argument(
            "-j",
            "--json",
            action="store_true",
            default=False,
            help="Output in JSON",
        )

        # Add subparsers
        subparsers = self.argparser.add_subparsers()

        # Load the commandline plugins
        plugins = PluginCollection(["birdplan.plugins.cmdline"])

        # Register commandline parsers
        plugins.call_if_exists("register_parsers", {"root_parser": subparsers, "plugins": plugins})

        # Parse commandline args
        self._args = self.argparser.parse_args(raw_args)

        # Honour --version when it was combined with other options; the shortcut at the top only covers the case
        # where it is the sole argument
        if self.args.version:
            return self._output_result(BirdPlanCommandlineResult(__version__))

        # Setup logging
        if self.is_console:
            self._setup_logging()

        # Make sure we have an action
        if "action" not in self.args:
            raise BirdPlanErrorUsage("No action specified", self.argparser)

        # Generate the command line option method name
        method_name = f"cmd_{self.args.action}"

        # Grab the first plugin which has this method
        plugin_name = plugins.get_first(method_name)
        if not plugin_name:
            raise BirdPlanError("Failed to find plugin to handle command line options")

        # Grab the result from the command
        result = plugins.call_plugin(plugin_name, method_name, {"cmdline": self})

        return self._output_result(result)

    def _output_result(self, result: BirdPlanCommandlineResult) -> BirdPlanCommandlineResult:
        """
        Print a result when running on a console and hand it back unchanged.

        Parameters
        ----------
        result : BirdPlanCommandlineResult
            Result to output.

        Returns
        -------
        BirdPlanCommandlineResult
            The same result object that was passed in.

        """

        # Check if we're on a console
        if self.is_console:
            # Check if we should output json
            if self.is_json:
                print(result.as_json())
            # Else if we have console output, then output that
            elif result.has_console_output:
                print(result.as_console())

        return result

    def birdplan_load_config(self, **kwargs: Any) -> None:
        """
        Load BirdPlan configuration.

        The plan file and state file come from the parsed commandline arguments; the state file falls back to
        BIRDPLAN_STATE_FILE when none was given. All keyword arguments are passed on unchanged to BirdPlan.load().

        Parameters
        ----------
        ignore_irr_changes : bool
            Optional parameter to ignore IRR lookups during configuration load.

        ignore_peeringdb_changes : bool
            Optional parameter to ignore peering DB lookups during configuration load.

        use_cached : bool
            Optional parameter to use cached values from state during configuration load.

        """

        # Set the state file, using the default when none was specified
        state_file: str = self.args.birdplan_state_file[0] or BIRDPLAN_STATE_FILE

        # Try load configuration
        self.birdplan.load(
            plan_file=self.args.birdplan_file[0],
            state_file=state_file,
            **kwargs,
        )

    def birdplan_commit_state(self) -> None:
        """Commit BirdPlan state, unless --no-write-state was given."""

        # Check if we need to skip writing the state
        if self.args.no_write_state:
            return

        self.birdplan.commit_state()

    def _setup_logging(self) -> None:
        """Set up logging on the root logger: level from --verbose and a single stderr handler using ColorFormatter."""

        # Setup logger and level
        logger = logging.getLogger()
        logger.setLevel(logging.DEBUG if self.args.verbose else logging.INFO)

        # Remove all existing handlers; iterate over a copy as the list is modified while we loop
        for handler in logger.handlers[:]:
            logger.removeHandler(handler)

        # Setup console handler
        console_handler = logging.StreamHandler(sys.stderr)
        # Build log format
        log_format = ""
        # Only add a timestamp if we're not running under systemd
        if "INVOCATION_ID" not in os.environ:
            log_format += "%(asctime)s "
        log_format += "%(levelcolor)s %(message)s"
        # Use a better format for messages
        console_handler.setFormatter(ColorFormatter(log_format, "[%Y-%m-%d %H:%M:%S]"))
        logger.addHandler(console_handler)

    @property
    def args(self) -> argparse.Namespace:
        """Return our commandline arguments; an empty namespace until run() has parsed them."""
        return self._args

    @property
    def argparser(self) -> argparse.ArgumentParser:
        """Return our ArgumentParser instance."""
        return self._argparser

    @property
    def birdplan(self) -> BirdPlan:
        """Return our BirdPlan instance."""
        return self._birdplan

    @property
    def is_console(self) -> bool:
        """
        Property indicating True or False if we're being called from the commandline.

        Returns
        -------
        bool
            Indicates if we were called from the commandline.

        """
        return self._is_console

    @is_console.setter
    def is_console(self, is_console: bool) -> None:
        """
        Set property indicating we're running from a console.

        Parameters
        ----------
        is_console : bool
            Set to True indicating if we were called from the commandline.

        """
        self._is_console = is_console

    @property
    def is_json(self) -> bool:
        """
        Property indicating that we should output in JSON on the commandline.

        Returns
        -------
        bool
            Indicates if we should output in JSON from the commandline.

        """

        # Prefer the parsed arguments, so that options handed to run() are honoured even when they differ from sys.argv
        if getattr(self._args, "json", False):
            return True
        # Fall back to the raw process arguments; this also works when parsing has not happened or has failed, which is
        # when errors need to be reported in the right format
        return "--json" in sys.argv or "-j" in sys.argv

482 

483 

# Main entry point from the commandline
def main() -> None:
    """
    Entry point function for the commandline.

    Runs BirdPlanCommandLine with the process arguments. Errors are printed as a JSON object on stdout when JSON output
    was requested, otherwise as an "ERROR: ..." line on stderr. The process exits with status 2 for usage errors and
    status 1 for other BirdPlan errors.
    """
    birdplan_cmdline = BirdPlanCommandLine(is_console=True)

    try:
        birdplan_cmdline.run(sys.argv[1:])

    # The usage handler must come before the generic one: except clauses are tried in order, so if BirdPlanErrorUsage
    # derives from BirdPlanError (TODO confirm in .exceptions) it would never be reached when listed second.
    except BirdPlanErrorUsage as exception:
        if birdplan_cmdline.is_json:
            print(json.dumps({"status": "error", "message": exception.message}))
        else:
            print(f"ERROR: {exception}", file=sys.stderr)
        sys.exit(2)

    except BirdPlanError as exception:
        if birdplan_cmdline.is_json:
            print(json.dumps({"status": "error", "message": str(exception)}))
        else:
            print(f"ERROR: {exception}", file=sys.stderr)
        sys.exit(1)