Coverage for flogin/utils.py: 100%

129 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-03 22:51 +0000

1from __future__ import annotations 

2 

3import logging 

4import logging.handlers 

5from collections.abc import ( 

6 AsyncGenerator, 

7 AsyncIterable, 

8 Awaitable, 

9 Callable, 

10 Coroutine, 

11) 

12from functools import update_wrapper, wraps 

13from inspect import isasyncgen, iscoroutine 

14from inspect import signature as _signature 

15from inspect import stack as _stack 

16from typing import ( 

17 TYPE_CHECKING, 

18 Any, 

19 Concatenate, 

20 Generic, 

21 Literal, 

22 NamedTuple, 

23 ParamSpec, 

24 TypeVar, 

25 overload, 

26) 

27 

28Coro = TypeVar("Coro", bound=Callable[..., Coroutine[Any, Any, Any]]) 

29AGenT = TypeVar("AGenT", bound=Callable[..., AsyncGenerator[Any, Any]]) 

30T = TypeVar("T") 

31 

32 

33class _cached_property(Generic[T]): 

34 def __init__(self, function: Callable[..., T]) -> None: 

35 self.function = function 

36 self.__doc__ = getattr(function, "__doc__") 

37 

38 def __get__(self, instance: object | None, owner: type[object]) -> Any: 

39 if instance is None: 

40 return self 

41 

42 value = self.function(instance) 

43 setattr(instance, self.function.__name__, value) 

44 

45 return value 

46 

47 

48if TYPE_CHECKING: 

49 from functools import cached_property as cached_property 

50else: 

51 cached_property = _cached_property 

52 

53__all__ = ("MISSING", "coro_or_gen", "print", "setup_logging") 

54 

55 

56def copy_doc(original: Callable[..., Any]) -> Callable[[T], T]: 

57 def decorator(overridden: T) -> T: 

58 overridden.__doc__ = original.__doc__ 

59 setattr(overridden, "__sigature__", _signature(original)) 

60 return overridden 

61 

62 return decorator 

63 

64 

65class _MissingSentinel: 

66 """A type safe sentinel used in the library to represent something as missing. Used to distinguish from ``None`` values.""" 

67 

68 def __bool__(self) -> bool: 

69 return False 

70 

71 def __eq__(self, other: Any) -> bool: 

72 return False 

73 

74 def __repr__(self) -> str: 

75 return "..." 

76 

77 

78MISSING: Any = _MissingSentinel() 

79 

80_logging_formatter_status: tuple[logging.Logger, logging.Handler] | None = None 

81 

82 

83def setup_logging( 

84 *, 

85 formatter: logging.Formatter | None = None, 

86 handler: logging.Handler | None = None, 

87 logger: logging.Logger | None = None, 

88) -> tuple[logging.Logger, logging.Handler]: 

89 r"""Sets up flogin's default logger. 

90 

91 .. versionchanged:: 2.0.0 

92 :func:`setup_logging` now returns tuple[:class:`logging.Logger`, :class:`logging.Handler`] 

93 

94 Parameters 

95 ---------- 

96 formatter: Optional[:class:`logging.Formatter`] 

97 The formatter to use, incase you don't want to use the default file formatter. 

98 handler: Optional[:class:`logging.Handler`] 

99 The handler object that should be added to the logger. Defaults to :class:`logging.handlers.RotatingFileHandler` with the following arguments: 

100 

101 .. code-block:: py3 

102 

103 filename="flogin.log", maxBytes=1000000, encoding="UTF-8", backupCount=1 

104 

105 .. versionadded:: 2.0.0 

106 logger: Optional[:class:`logging.Logger`] 

107 The logger object that the handler/formatter should be added to. 

108 

109 .. versionadded:: 2.0.0 

110 

111 Returns 

112 ------- 

113 tuple[:class:`logging.Logger`, :class:`logging.Handler`] 

114 The logger and handler used to setup the logs. 

115 """ 

116 

117 level = logging.DEBUG 

118 

119 if handler is None: 

120 handler = logging.handlers.RotatingFileHandler( 

121 filename="flogin.log", maxBytes=1000000, encoding="UTF-8", backupCount=1 

122 ) 

123 

124 if formatter is None: 

125 dt_fmt = "%Y-%m-%d %H:%M:%S" 

126 formatter = logging.Formatter( 

127 "[{asctime}] [{levelname:<8}] {name}: {message}", dt_fmt, style="{" 

128 ) 

129 

130 if logger is None: 

131 logger = logging.getLogger() 

132 

133 handler.setFormatter(formatter) 

134 logger.setLevel(level) 

135 logger.addHandler(handler) 

136 

137 global _logging_formatter_status 

138 _logging_formatter_status = logger, handler 

139 return _logging_formatter_status 

140 

141 

142async def coro_or_gen(coro: Awaitable[T] | AsyncIterable[T]) -> list[T] | T: 

143 """|coro| 

144 

145 Executes an AsyncIterable or a Coroutine, and returns the result 

146 

147 Parameters 

148 ----------- 

149 coro: :class:`typing.Awaitable` | :class:`typing.AsyncIterable` 

150 The coroutine or asynciterable to be ran 

151 

152 Raises 

153 -------- 

154 TypeError 

155 Neither a :class:`typing.Coroutine` or an :class:`typing.AsyncIterable` was passed 

156 

157 Returns 

158 -------- 

159 Any 

160 Whatever was given from the :class:`typing.Coroutine` or :class:`typing.AsyncIterable`. 

161 """ 

162 

163 if iscoroutine(coro): 

164 return await coro 

165 if isasyncgen(coro): 

166 return [item async for item in coro] 

167 raise TypeError(f"Not a coro or gen: {coro!r}") 

168 

169 

170ReleaseLevel = Literal["alpha", "beta", "candidate", "final"] 

171 

172 

173class VersionInfo(NamedTuple): 

174 major: int 

175 minor: int 

176 micro: int 

177 releaselevel: ReleaseLevel 

178 

179 @classmethod 

180 def _from_str(cls, txt: str) -> VersionInfo: 

181 raw_major, raw_minor, raw_micro_w_rel = txt.split(".") 

182 

183 rlevel_shorthands: dict[str, ReleaseLevel] = { 

184 "a": "alpha", 

185 "b": "beta", 

186 "c": "candidate", 

187 } 

188 release_level = rlevel_shorthands.get(raw_micro_w_rel[-1], "final") 

189 

190 if release_level != "final": 

191 raw_micro = raw_micro_w_rel.removesuffix(raw_micro_w_rel[-1]) 

192 else: 

193 raw_micro = raw_micro_w_rel 

194 

195 try: 

196 major = int(raw_major) 

197 except ValueError: 

198 raise ValueError( 

199 f"Invalid major version, {raw_major!r} is not a valid integer" 

200 ) from None 

201 try: 

202 minor = int(raw_minor) 

203 except ValueError: 

204 raise ValueError( 

205 f"Invalid minor version, {raw_minor!r} is not a valid integer" 

206 ) from None 

207 try: 

208 micro = int(raw_micro) 

209 except ValueError: 

210 raise ValueError( 

211 f"Invalid micro version, {raw_micro!r} is not a valid integer" 

212 ) from None 

213 

214 return cls(major=major, minor=minor, micro=micro, releaselevel=release_level) 

215 

216 

217OwnerT = TypeVar("OwnerT") 

218# Instance signature 

219P = ParamSpec("P") 

220ReturnT = TypeVar("ReturnT") 

221InstanceMethodT = Callable[Concatenate[OwnerT, P], ReturnT] 

222# classmethod signature 

223PC = ParamSpec("PC") 

224ReturnCT = TypeVar("ReturnCT") 

225ClassMethodT = Callable[Concatenate[type[OwnerT], P], ReturnT] 

226 

227 

228class InstanceOrClassmethod(Generic[OwnerT, P, ReturnT, PC, ReturnCT]): 

229 def __init__( 

230 self, 

231 instance_func: InstanceMethodT[OwnerT, P, ReturnT], 

232 classmethod_func: ClassMethodT[OwnerT, PC, ReturnCT], 

233 ) -> None: 

234 self.__instance_func__: InstanceMethodT[OwnerT, P, ReturnT] = instance_func 

235 self.__classmethod_func__: ClassMethodT[OwnerT, PC, ReturnCT] = getattr( 

236 classmethod_func, "__func__", classmethod_func 

237 ) 

238 

239 self.__doc__ = self.__instance_func__.__doc__ 

240 

241 @overload 

242 def __get__( 

243 self, instance: None, owner: type[OwnerT] 

244 ) -> Callable[PC, ReturnCT]: ... 

245 

246 @overload 

247 def __get__( 

248 self, instance: OwnerT, owner: type[OwnerT] 

249 ) -> Callable[P, ReturnT]: ... 

250 

251 def __get__(self, instance: OwnerT | None, owner: type[OwnerT]) -> Any: 

252 @wraps(self.__instance_func__) 

253 def wrapper(*args: Any, **kwargs: Any) -> ReturnCT | ReturnT: 

254 if instance is not None: 

255 return self.__instance_func__(instance, *args, **kwargs) 

256 return self.__classmethod_func__(owner, *args, **kwargs) 

257 

258 return wrapper 

259 

260 

261def add_classmethod_alt( 

262 classmethod_func: ClassMethodT[OwnerT, PC, ReturnCT], 

263) -> Callable[ 

264 [InstanceMethodT[OwnerT, P, ReturnT]], 

265 InstanceOrClassmethod[OwnerT, P, ReturnT, PC, ReturnCT], 

266]: 

267 def decorator( 

268 instance_func: InstanceMethodT[OwnerT, P, ReturnT], 

269 ) -> InstanceOrClassmethod[OwnerT, P, ReturnT, PC, ReturnCT]: 

270 return InstanceOrClassmethod(instance_func, classmethod_func) 

271 

272 return decorator 

273 

274 

275def print(*values: object, sep: str = MISSING, name: str = MISSING) -> None: 

276 r"""A function that acts similar to the `builtin print function <https://docs.python.org/3/library/functions.html#print>`__, but uses the `logging <https://docs.python.org/3/library/logging.html#module-logging>`__ module instead. 

277 

278 This helper function is provided to easily "print" text without having to setup a logging object, because the builtin print function does not work as expected due to the jsonrpc pipes. 

279 

280 .. versionadded:: 1.1.0 

281 .. versionchanged:: 2.0.0 

282 The default log name now defaults to the filepath of the file that called the function opposed to ``printing``. 

283 

284 Parameters 

285 ----------- 

286 \*values: :class:`object` 

287 A list of values to print 

288 sep: Optional[:class:`str`] 

289 The character that is used as the seperator between the values. Defaults to a space. 

290 name: Optional[:class:`str`] 

291 The name of the logger. Defaults to the filepath of the file the function is called from. 

292 

293 .. versionadded:: 2.0.0 

294 """ 

295 

296 if sep is MISSING: 

297 sep = " " 

298 if name is MISSING: 

299 name = _stack()[1].filename 

300 

301 logging.getLogger(name).info(sep.join(str(val) for val in values)) 

302 

303 

304def decorator(deco: T) -> T: 

305 setattr(deco, "__is_decorator__", True) 

306 return deco 

307 

308 

309class func_with_self(Generic[P, ReturnT, OwnerT]): 

310 def __init__(self, func: Callable[Concatenate[OwnerT, P], ReturnT]) -> None: 

311 self.func = func 

312 self.owner: OwnerT | None = None 

313 

314 update_wrapper(wrapped=func, wrapper=self) 

315 

316 def __call__(self, *args: P.args, **kwargs: P.kwargs) -> ReturnT: 

317 if self.owner is None: 

318 raise RuntimeError("Owner has not been set") 

319 

320 return self.func(self.owner, *args, **kwargs)