Coverage for flogin/plugin.py: 59%

229 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-03 22:51 +0000

1from __future__ import annotations 

2 

3import asyncio 

4import json 

5import logging 

6import os 

7from collections.abc import Awaitable, Callable, Coroutine, Iterable 

8from pathlib import Path 

9from typing import ( 

10 TYPE_CHECKING, 

11 Any, 

12 ClassVar, 

13 Generic, 

14 Literal, 

15 TypeVar, 

16 TypeVarTuple, 

17 cast, 

18 overload, 

19) 

20 

21from .default_events import get_default_events 

22from .errors import EnvNotSet, PluginNotInitialized 

23from .flow import FlowLauncherAPI, FlowSettings, PluginMetadata 

24from .jsonrpc import ( 

25 ErrorResponse, 

26 ExecuteResponse, 

27 JsonRPCClient, 

28 QueryResponse, 

29 Result, 

30) 

31from .search_handler import SearchHandler 

32from .settings import Settings 

33from .utils import ( 

34 MISSING, 

35 add_classmethod_alt, 

36 cached_property, 

37 coro_or_gen, 

38 decorator, 

39 func_with_self, 

40 setup_logging, 

41) 

42 

43if TYPE_CHECKING: 

44 import re 

45 from asyncio.streams import StreamReader, StreamWriter 

46 

47 from typing_extensions import TypeVar # noqa: TC004 

48 

49 from ._types.search_handlers import ( 

50 ConvertableToResult, 

51 SearchHandlerCallback, 

52 SearchHandlerCallbackReturns, 

53 SearchHandlerCallbackWithSelf, 

54 SearchHandlerCondition, 

55 ) 

56 from .query import Query 

57 

58 SettingsT = TypeVar("SettingsT", default=Settings, bound=Settings) 

59else: 

60 SettingsT = TypeVar("SettingsT") 

61 

62TS = TypeVarTuple("TS") 

63EventCallbackT = TypeVar( 

64 "EventCallbackT", bound=Callable[..., Coroutine[Any, Any, Any]] 

65) 

66log = logging.getLogger(__name__) 

67 

68__all__ = ("Plugin",) 

69 

70 

71class Plugin(Generic[SettingsT]): 

72 r"""This class represents your plugin. 

73 

74 Parameters 

75 ----------- 

76 settings_no_update: Optional[:class:`bool`] 

77 Whether or not to let flow update flogin's version of the settings. This can be useful when using a custom settings menu. Defaults to ``False`` 

78 ignore_cancellation_requests: Optional[:class:`bool`] 

79 Whether or not to ignore cancellation requests sent from flow. Defaults to ``False`` 

80 disable_log_override_files: Optional[:class:`bool`] 

81 Whether or not to disable the log override files. Defaults to ``False``. See :doc:`log-override-files` for more information on this. 

82 """ 

83 

84 __class_events__: ClassVar[list[str]] = [] 

85 __class_search_handlers__: ClassVar[list[SearchHandler]] = [] 

86 

87 def __init__(self, **options: Any) -> None: 

88 self.options = options 

89 self._metadata: PluginMetadata | None = None 

90 self._search_handlers: list[SearchHandler] = [] 

91 self._results: dict[str, Result] = {} 

92 self._settings_are_populated: bool = False 

93 self._last_query: Query | None = None 

94 

95 self._events: dict[str, Callable[..., Awaitable[Any]]] = get_default_events( 

96 self 

97 ) 

98 self.jsonrpc: JsonRPCClient = JsonRPCClient(self) 

99 

100 for event_name in self.__class_events__: 

101 self.register_event(getattr(self, event_name)) 

102 for handler in self.__class_search_handlers__: 

103 setattr(handler.callback, "owner", self) 

104 self.register_search_handler(handler) 

105 

106 self.check_for_log_override_files() 

107 

108 def check_for_log_override_files(self) -> bool | None: 

109 """None=No-Changes, True=In-Prod, False=In-Debug""" 

110 

111 if self.options.get("disable_log_override_files"): 

112 return 

113 

114 from .utils import _logging_formatter_status 

115 

116 dir = Path(os.getcwd()) 

117 

118 for _ in dir.glob("*.flogin.debug"): 

119 if _logging_formatter_status is None: 

120 setup_logging() 

121 log.info("Debug file found, logs are now enabled.") 

122 return False 

123 

124 for _ in dir.glob("*.flogin.prod"): 

125 if _logging_formatter_status is not None: 

126 log.info("Prod file found. Logs are being disabled") 

127 logger, handler = _logging_formatter_status 

128 handler.close() 

129 logger.removeHandler(handler) 

130 return True 

131 

132 @property 

133 def last_query(self) -> Query | None: 

134 """:class:`~flogin.query.Query` | ``None``: The last query request that flow sent. This is ``None`` if no query request has been sent yet.""" 

135 return self._last_query 

136 

137 @cached_property 

138 def api(self) -> FlowLauncherAPI: 

139 """:class:`~flogin.flow.api.FlowLauncherAPI`: An easy way to acess Flow Launcher's API""" 

140 

141 return FlowLauncherAPI(self.jsonrpc) 

142 

143 def _get_env(self, name: str, alternative: str | None = None) -> str: 

144 try: 

145 return os.environ[name] 

146 except KeyError: 

147 raise EnvNotSet(name, alternative) from None 

148 

149 @cached_property 

150 def flow_version(self) -> str: 

151 """:class:`str`: the flow version from environment variables. 

152 

153 .. versionadded:: 1.0.1 

154 

155 Raises 

156 ------ 

157 :class:`~flogin.errors.EnvNotSet` 

158 This is raised when the environment variable for this property is not set by flow or the plugin tester. 

159 """ 

160 

161 return self._get_env("FLOW_VERSION", "flow_version") 

162 

163 @cached_property 

164 def flow_application_dir(self) -> Path: 

165 """:class:`~pathlib.Path`: flow's application directory from environment variables. 

166 

167 .. versionadded:: 1.0.1 

168 

169 Raises 

170 ------ 

171 :class:`~flogin.errors.EnvNotSet` 

172 This is raised when the environment variable for this property is not set by flow or the plugin tester. 

173 """ 

174 

175 return Path(self._get_env("FLOW_APPLICATION_DIRECTORY", "flow_application_dir")) 

176 

177 @cached_property 

178 def flow_program_dir(self) -> Path: 

179 """:class:`~pathlib.Path`: flow's application program from environment variables. 

180 

181 .. versionadded:: 1.0.1 

182 

183 Raises 

184 ------ 

185 :class:`~flogin.errors.EnvNotSet` 

186 This is raised when the environment variable for this property is not set by flow or the plugin tester. 

187 """ 

188 

189 return Path(self._get_env("FLOW_PROGRAM_DIRECTORY", "flow_program_dir")) 

190 

191 @cached_property 

192 def settings(self) -> SettingsT: 

193 """:class:`~flogin.settings.Settings`: The plugin's settings set by the user""" 

194 

195 fp = os.path.join( 

196 "..", "..", "Settings", "Plugins", self.metadata.name, "Settings.json" 

197 ) 

198 with open(fp) as f: 

199 data = json.load(f) 

200 self._settings_are_populated = True 

201 log.debug("Settings filled from file: %r", data) 

202 return Settings(data, no_update=self.options.get("settings_no_update", False)) # type: ignore[reportReturnType] 

203 

204 async def _run_event( 

205 self, 

206 coro: Callable[..., Awaitable[Any]], 

207 event_name: str, 

208 args: Iterable[Any], 

209 kwargs: dict[str, Any], 

210 error_handler: Callable[[Exception], Coroutine[Any, Any, Any]] | str = MISSING, 

211 ) -> Any: 

212 try: 

213 return await coro(*args, **kwargs) 

214 except asyncio.CancelledError: 

215 pass 

216 except Exception as e: 

217 if error_handler is MISSING: 

218 error_handler = "on_error" 

219 if isinstance(error_handler, str): 

220 return await self._events[error_handler](event_name, e, *args, **kwargs) 

221 return await error_handler(e) 

222 

223 def _schedule_event( 

224 self, 

225 coro: Callable[..., Awaitable[Any]], 

226 event_name: str, 

227 args: Iterable[Any] = MISSING, 

228 kwargs: dict[str, Any] = MISSING, 

229 error_handler: Callable[[Exception], Coroutine[Any, Any, Any]] | str = MISSING, 

230 ) -> asyncio.Task[Any]: 

231 wrapped = self._run_event( 

232 coro, event_name, args or [], kwargs or {}, error_handler 

233 ) 

234 return asyncio.create_task(wrapped, name=f"flogin: {event_name}") 

235 

236 def dispatch( 

237 self, event: str, *args: Any, **kwargs: Any 

238 ) -> None | asyncio.Task[Any]: 

239 method = f"on_{event}" 

240 

241 # Special Event Cases 

242 replacements = { 

243 "on_initialize": "_initialize_wrapper", 

244 } 

245 method = replacements.get(method, method) 

246 

247 log.debug("Dispatching event %s", method) 

248 

249 event_callback = self._events.get(method) 

250 if event_callback: 

251 return self._schedule_event(event_callback, method, args, kwargs) 

252 

253 async def _coro_or_gen_to_results( 

254 self, coro: SearchHandlerCallbackReturns 

255 ) -> list[Result] | ErrorResponse: 

256 results: list[Result] = [] 

257 raw_results = await coro_or_gen(coro) 

258 

259 if raw_results is None: 

260 return results 

261 

262 if isinstance(raw_results, ErrorResponse): 

263 return raw_results 

264 if not isinstance(raw_results, list): 

265 raw_results = [raw_results] 

266 for raw_res in cast("list[ConvertableToResult]", raw_results): 

267 res = Result.from_anything(raw_res) 

268 self._results[res.slug] = res 

269 results.append(res) 

270 return results 

271 

272 async def _initialize_wrapper(self, arg: dict[str, Any]) -> ExecuteResponse: 

273 log.debug("Initialize: %r", arg) 

274 self._metadata = PluginMetadata(arg["currentPluginMetadata"], self.api) 

275 self.dispatch("initialization") 

276 return ExecuteResponse(hide=False) 

277 

278 async def process_context_menus( 

279 self, data: list[Any] 

280 ) -> QueryResponse | ErrorResponse: 

281 log.debug("Context Menu Handler: data=%r", data) 

282 

283 results: list[Result] 

284 

285 if not data: 

286 results = [] 

287 else: 

288 result = self._results.get(data[0]) 

289 

290 if result is not None: 

291 result.plugin = self 

292 task = self._schedule_event( 

293 self._coro_or_gen_to_results, 

294 event_name=f"ContextMenu-{result.slug}", 

295 args=[result.context_menu()], 

296 error_handler=lambda e: self._coro_or_gen_to_results( 

297 result.on_context_menu_error(e) 

298 ), 

299 ) 

300 results = await task 

301 else: 

302 results = [] 

303 

304 if isinstance(results, ErrorResponse): 

305 return results 

306 return QueryResponse(results, self.settings._get_updates()) 

307 

308 async def process_search_handlers( 

309 self, query: Query 

310 ) -> QueryResponse | ErrorResponse: 

311 results: list[Result] = [] 

312 for handler in self._search_handlers: 

313 handler.plugin = self 

314 if handler.condition(query): 

315 task = self._schedule_event( 

316 self._coro_or_gen_to_results, 

317 event_name=f"SearchHandler-{handler.name}", 

318 args=[handler.callback(query)], 

319 error_handler=lambda e: self._coro_or_gen_to_results( 

320 handler.on_error(query, e) 

321 ), 

322 ) 

323 results = await task 

324 break 

325 

326 if isinstance(results, ErrorResponse): 

327 return results 

328 return QueryResponse(results, self.settings._get_updates()) 

329 

330 async def _action_callback_wrapper( 

331 self, callback: Callable[[], Awaitable[ExecuteResponse | bool | None]] 

332 ) -> ExecuteResponse: 

333 value = await callback() 

334 if isinstance(value, bool): 

335 return ExecuteResponse(hide=value) 

336 if value is None: 

337 return ExecuteResponse() 

338 return value 

339 

340 def process_action(self, method: str) -> asyncio.Task[ExecuteResponse] | None: 

341 slug = method.removeprefix("flogin.action.") 

342 result = self._results.get(slug) 

343 if result is None: 

344 return 

345 

346 result.plugin = self 

347 return self._schedule_event( 

348 self._action_callback_wrapper, 

349 method, 

350 args=[result.callback], 

351 error_handler=result.on_error, 

352 ) 

353 

354 @property 

355 def metadata(self) -> PluginMetadata: 

356 """ 

357 Returns the plugin's metadata. 

358 

359 Raises 

360 -------- 

361 :class:`~flogin.errors.PluginNotInitialized` 

362 This gets raised if the plugin hasn't been initialized yet 

363 """ 

364 if self._metadata: 

365 return self._metadata 

366 raise PluginNotInitialized() 

367 

368 async def start(self) -> None: 

369 r"""|coro| 

370 

371 The default startup/setup method. This can be overriden for advanced startup behavior, but make sure to run ``await super().start()`` to actually start your plugin. 

372 """ 

373 

374 import aioconsole 

375 

376 get_standard_streams: Awaitable[tuple[StreamReader, StreamWriter]] = ( 

377 aioconsole.get_standard_streams() # type: ignore 

378 ) 

379 

380 reader, writer = await get_standard_streams 

381 

382 await self.jsonrpc.start_listening(reader, writer) 

383 

384 def run(self, *, setup_default_log_handler: bool = True) -> None: 

385 r"""The default runner. This runs the :func:`~flogin.plugin.Plugin.start` coroutine, and setups up logging. 

386 

387 Parameters 

388 -------- 

389 setup_default_log_handler: :class:`bool` 

390 Whether to setup the default log handler or not, defaults to `True`. 

391 """ 

392 

393 log_status = self.check_for_log_override_files() 

394 

395 if log_status is None and setup_default_log_handler: 

396 setup_logging() 

397 

398 try: 

399 asyncio.run(self.start()) 

400 except Exception as e: 

401 log.exception("A fatal error has occurred which crashed flogin", exc_info=e) 

402 

403 def register_search_handler(self, handler: SearchHandler[Any]) -> None: 

404 r"""Register a new search handler 

405 

406 See the :ref:`search handler section <search_handlers>` for more information about using search handlers. 

407 

408 Parameters 

409 ----------- 

410 handler: :class:`~flogin.search_handler.SearchHandler` 

411 The search handler to be registered 

412 """ 

413 

414 self._search_handlers.append(handler) 

415 log.debug("Registered search handler: %r", handler) 

416 

417 def register_search_handlers(self, *handlers: SearchHandler[Any]) -> None: 

418 r"""Register new search handlers 

419 

420 See the :ref:`search handler section <search_handlers>` for more information about using search handlers. 

421 

422 Parameters 

423 ----------- 

424 *handlers: list[:class:`~flogin.search_handler.SearchHandler`] 

425 The search handlers to be registered 

426 """ 

427 

428 for handler in handlers: 

429 self.register_search_handler(handler) 

430 

431 def register_event( 

432 self, callback: Callable[..., Coroutine[Any, Any, Any]], name: str | None = None 

433 ) -> None: 

434 """Registers an event to listen for. See the :func:`~flogin.plugin.Plugin.event` decorator for another method of registering events. 

435 

436 All events must be a :ref:`coroutine <coroutine>`. 

437 

438 .. NOTE:: 

439 See the :ref:`event reference <events>` to see what valid events there are. 

440 

441 Parameters 

442 ----------- 

443 callback: :ref:`coroutine <coroutine>` 

444 The :ref:`coroutine <coroutine>` to be executed with the event 

445 name: Optional[:class:`str`] 

446 The name of the event to be registered. Defaults to the callback's name. 

447 """ 

448 

449 self._events[name or callback.__name__] = callback 

450 

451 @classmethod 

452 def __event_classmethod_deco(cls, callback: EventCallbackT) -> EventCallbackT: 

453 cls.__class_events__.append(callback.__name__) 

454 return callback 

455 

456 @decorator 

457 @add_classmethod_alt(__event_classmethod_deco) 

458 def event(self, callback: EventCallbackT) -> EventCallbackT: 

459 """A decorator that registers an event to listen for. This decorator can be used with a plugin instance or as a classmethod. 

460 

461 All events must be a :ref:`coroutine <coroutine>`. 

462 

463 .. versionchanged:: 1.1.0 

464 The decorator can now be used as a classmethod 

465 

466 .. NOTE:: 

467 See the :ref:`event reference <events>` to see what valid events there are. 

468 

469 Example 

470 --------- 

471 

472 With a plugin instance: 

473 

474 .. code-block:: python3 

475 

476 from flogin.utils import print 

477 

478 @plugin.event 

479 async def on_initialization(): 

480 print('Ready!') 

481 

482 As a classmethod: 

483 

484 .. code-block:: python3 

485 

486 from flogin.utils import print 

487 

488 class MyPlugin(Plugin): 

489 @Plugin.event 

490 async def on_initialization(self): 

491 print('Ready!') 

492 """ 

493 

494 self.register_event(callback) 

495 return callback 

496 

497 @overload 

498 @classmethod 

499 def __handle_search_deco( 

500 cls, 

501 registrate: Callable[[SearchHandler], None], 

502 *, 

503 add_self: Literal[True], 

504 condition: SearchHandlerCondition | None = None, 

505 **kwargs: Any, 

506 ) -> Callable[[SearchHandlerCallbackWithSelf], SearchHandler]: ... 

507 

508 @overload 

509 @classmethod 

510 def __handle_search_deco( 

511 cls, 

512 registrate: Callable[[SearchHandler], None], 

513 *, 

514 add_self: Literal[False], 

515 condition: SearchHandlerCondition | None = None, 

516 **kwargs: Any, 

517 ) -> Callable[[SearchHandlerCallback], SearchHandler]: ... 

518 

519 @classmethod 

520 def __handle_search_deco( 

521 cls, 

522 registrate: Callable[[SearchHandler], None], 

523 *, 

524 add_self: bool, 

525 condition: SearchHandlerCondition | None = None, 

526 **kwargs: Any, 

527 ) -> ( 

528 Callable[[SearchHandlerCallbackWithSelf], SearchHandler] 

529 | Callable[[SearchHandlerCallback], SearchHandler] 

530 ): 

531 if condition is None: 

532 condition = SearchHandler._builtin_condition_kwarg_to_obj(**kwargs) 

533 

534 def inner( 

535 func: SearchHandlerCallbackWithSelf | SearchHandlerCallback, 

536 ) -> SearchHandler: 

537 handler = SearchHandler() 

538 if condition: 

539 setattr(handler, "condition", condition) 

540 if add_self: 

541 func = func_with_self(func) # type: ignore[reportArgumentType] 

542 setattr(handler, "callback", func) 

543 registrate(handler) 

544 return handler 

545 

546 return inner 

547 

548 @classmethod 

549 def __search_classmethod_deco( 

550 cls, 

551 condition: SearchHandlerCondition | None = None, 

552 *, 

553 text: str = MISSING, 

554 pattern: re.Pattern[str] | str = MISSING, 

555 keyword: str = MISSING, 

556 allowed_keywords: Iterable[str] = MISSING, 

557 disallowed_keywords: Iterable[str] = MISSING, 

558 ) -> Callable[[SearchHandlerCallbackWithSelf], SearchHandler]: 

559 return cls.__handle_search_deco( 

560 cls.__class_search_handlers__.append, 

561 add_self=True, 

562 condition=condition, 

563 text=text, 

564 pattern=pattern, 

565 keyword=keyword, 

566 allowed_keywords=allowed_keywords, 

567 disallowed_keywords=disallowed_keywords, 

568 ) 

569 

570 @decorator 

571 @add_classmethod_alt(__search_classmethod_deco) 

572 def search( 

573 self, 

574 condition: SearchHandlerCondition | None = None, 

575 *, 

576 text: str = MISSING, 

577 pattern: re.Pattern[str] | str = MISSING, 

578 keyword: str = MISSING, 

579 allowed_keywords: Iterable[str] = MISSING, 

580 disallowed_keywords: Iterable[str] = MISSING, 

581 ) -> Callable[[SearchHandlerCallback], SearchHandler]: 

582 """A decorator that registers a search handler. 

583 

584 All search handlers must be a :ref:`coroutine <coroutine>`. See the :ref:`search handler section <search_handlers>` for more information about using search handlers. 

585 

586 .. versionchanged:: 2.0.0 

587 The search decorator can now be used as a classmethod 

588 

589 .. NOTE:: 

590 This decorator can also be used as a classmethod 

591 

592 Parameters 

593 ---------- 

594 condition: Optional[:ref:`condition <condition_example>`] 

595 The condition to determine which queries this handler should run on. If given, this should be the only argument given. 

596 text: Optional[:class:`str`] 

597 A kwarg to quickly add a :class:`~flogin.conditions.PlainTextCondition`. If given, this should be the only argument given. 

598 pattern: Optional[:class:`re.Pattern` | :class:`str`] 

599 A kwarg to quickly add a :class:`~flogin.conditions.RegexCondition`. If given, this should be the only argument given. 

600 keyword: Optional[:class:`str`] 

601 A kwarg to quickly set the condition to a :class:`~flogin.conditions.KeywordCondition` condition with the ``keyword`` kwarg being the only allowed keyword. 

602 allowed_keywords: Optional[Iterable[:class:`str`]] 

603 A kwarg to quickly set the condition to a :class:`~flogin.conditions.KeywordCondition` condition with the kwarg being the list of allowed keywords. 

604 disallowed_keywords: Optional[Iterable[:class:`str`]] 

605 A kwarg to quickly set the condition to a :class:`~flogin.conditions.KeywordCondition` condition with the kwarg being the list of disallowed keywords. 

606 

607 Example 

608 --------- 

609 

610 With an instance: 

611 

612 .. code-block:: python3 

613 

614 @plugin.search() 

615 async def example_search_handler(data: Query): 

616 return "This is a result!" 

617 

618 As a classmethod: 

619 

620 .. code-block:: python3 

621 

622 class MyPlugin(Plugin): 

623 @Plugin.search() 

624 async def example_search_handler(self, data: Query): 

625 return "This is a result!" 

626 """ 

627 

628 return self.__handle_search_deco( 

629 self.register_search_handler, 

630 add_self=False, 

631 condition=condition, 

632 text=text, 

633 pattern=pattern, 

634 keyword=keyword, 

635 allowed_keywords=allowed_keywords, 

636 disallowed_keywords=disallowed_keywords, 

637 ) 

638 

639 def fetch_flow_settings(self) -> FlowSettings: 

640 """Fetches flow's settings from flow's config file 

641 

642 Returns 

643 -------- 

644 :class:`~flogin.flow.settings.FlowSettings` 

645 A dataclass containing all of flow's settings 

646 """ 

647 

648 path = os.path.join("..", "..", "Settings", "Settings.json") 

649 with open(path) as f: 

650 data = json.load(f) 

651 return FlowSettings(data)