diff --git a/src/wakepy/core/__init__.py b/src/wakepy/core/__init__.py index 9c68422e..69dd0e41 100644 --- a/src/wakepy/core/__init__.py +++ b/src/wakepy/core/__init__.py @@ -4,8 +4,8 @@ See the public Python API at: https://wakepy.readthedocs.io/ """ -from .activation import ActivationResult as ActivationResult -from .activation import MethodActivationResult as MethodActivationResult +from .activationresult import ActivationResult as ActivationResult +from .activationresult import MethodActivationResult as MethodActivationResult from .constants import BusType as BusType from .constants import ModeName as ModeName from .constants import PlatformName as PlatformName diff --git a/src/wakepy/core/activation.py b/src/wakepy/core/activation.py deleted file mode 100644 index 5966a568..00000000 --- a/src/wakepy/core/activation.py +++ /dev/null @@ -1,915 +0,0 @@ -"""This module defines functions which may be used in the activation and -deactivation of Modes (using Methods). - -Most important functions ------------------------- -activate_method(method:Method) -> MethodActivationResult - Activate a mode using a single Method -get_prioritized_methods - Prioritize of collection of Methods - -Most important classes ----------------------- -ActivationResult - This is something returned from mode activation task. Contains the summary - of all used methods, and whether the activation was successful or not. -MethodActivationResult - One level lower than ActivationResult. This is result from activation task - using a single Method. -""" - -from __future__ import annotations - -import datetime as dt -import sys -import typing -from dataclasses import InitVar, dataclass, field -from typing import List, Sequence, Set, Union - -from .constants import PlatformName -from .dbus import DBusAdapter -from .heartbeat import Heartbeat -from .method import Method, MethodError, MethodOutcome -from .platform import CURRENT_PLATFORM -from .strenum import StrEnum, auto - -if sys.version_info < (3, 8): # pragma: no-cover-if-py-gte-38 - from typing_extensions import Literal -else: # pragma: no-cover-if-py-lt-38 - from typing import Literal - -if typing.TYPE_CHECKING: - from typing import Optional, Tuple, Type - - from .method import MethodCls - -"""The strings in MethodsPriorityOrder are names of wakepy.Methods or the -asterisk ('*').""" -MethodsPriorityOrder = Sequence[Union[str, Set[str]]] - - -class StageName(StrEnum): - # These are stages which occur in order for each of the methods - # when using a Method for activation. - - NONE = auto() # No stage at all. - - # The stages in the activation process in order - PLATFORM_SUPPORT = auto() - REQUIREMENTS = auto() - ACTIVATION = auto() - - -StageNameValue = Literal["NONE", "PLATFORM_SUPPORT", "REQUIREMENTS", "ACTIVATION"] - - -@dataclass -class ActivationResult: - """The ActivationResult is responsible of keeping track on the possibly - successful (max 1), failed and unused methods and providing different views - on the results of the activation process. - - Parameters - --------- - results: - The MethodActivationResults to be used to fill the ActivationResult - modename: - Name of the Mode. Optional. - - Attributes - ---------- - modename: str | None - The name of the Mode. If the Mode did not have a name, the modename - is None. - success: bool - Tells is entering into a mode was successful. Note that this may be - faked with WAKEPY_FAKE_SUCCESS environment variable e.g. for testing - purposes. - real_success: bool - Tells is entering into a mode was successful. This - may not faked with WAKEPY_FAKE_SUCCESS environment variable. - failure: bool - Always opposite of `success`. Included for convenience. - active_method: str | None - The name of the the active (successful) method, if any. - - Methods - ------- - list_methods: - Get a list of the methods present in the activation process, and their - activation results. This is the higher-level interface. If you want - more control, use .query(). - query: - Lower level interface for getting the list of the methods present in - the activation process, and their activation results. If you want - easier access, use .list_methods(). - """ - - results: InitVar[Optional[List[MethodActivationResult]]] = None - # These are the retuls for each of the used wakepy.Methods, in the - # order the methods were tried (first = highest priority, last = - # lowest priority) - - modename: Optional[str] = None - """Name of the mode, if any.""" - - active_method: str | None = field(init=False) - """The name of the active (successful) method. If no methods are active, - this is None.""" - - success: bool = field(init=False) - """Tells is entering into a mode was successful. - - Note that this may be faked with WAKEPY_FAKE_SUCCESS environment - variable (for tests). See also: real_success. - """ - - real_success: bool = field(init=False) - """Tells is entering into a mode was successful. This - may not faked with WAKEPY_FAKE_SUCCESS environment variable. - """ - - failure: bool = field(init=False) - """Always opposite of `success`. Included for convenience.""" - - _method_results: List[MethodActivationResult] = field(init=False) - - def __post_init__( - self, - results: Optional[List[MethodActivationResult]] = None, - ) -> None: - self._method_results = results or [] - self.success = self._get_success() - self.failure = not self.success - self.real_success = self._get_real_success() - self.active_method = self._get_active_method() - - def list_methods( - self, - ignore_platform_fails: bool = True, - ignore_unused: bool = False, - ) -> list[MethodActivationResult]: - """Get a list of the methods present in the activation process, and - their activation results. This is the higher-level interface. If you - want more control, use .query(). The returned methods are in the order - as given in when initializing ActivationResult. If you did not create - the ActivationReult manually, the methods are in the priority order; - the highest priority methods (those which are/were tried first) are - listed first. - - Parameters - ---------- - ignore_platform_fails: - If True, ignores plaform support check fail. This is the default as - usually one is not interested in methods which are meant for other - platforms. If False, includes also platform fails. Default: True. - ignore_unused: - If True, ignores all unused / remaining methods. Default: False. - """ - - success_values = (True, False) if ignore_unused else (True, False, None) - - fail_stages = [StageName.REQUIREMENTS, StageName.ACTIVATION] - if not ignore_platform_fails: - fail_stages.insert(0, StageName.PLATFORM_SUPPORT) - - return self.query(success=success_values, fail_stages=fail_stages) - - def query( - self, - success: Sequence[bool | None] = (True, False, None), - fail_stages: Sequence[StageName | StageNameValue] = ( - StageName.PLATFORM_SUPPORT, - StageName.REQUIREMENTS, - StageName.ACTIVATION, - ), - ) -> list[MethodActivationResult]: - """Get a list of the methods present in the activation process, and - their activation results. This is the lower-level interface. If you - want easier access, use .list_methods(). The methods are in the order - as given in when initializing ActivationResult. If you did not create - the ActivationReult manually, the methods are in the priority order; - the highest priority methods (those which are/were tried first) are - listed first. - - Parameters - ---------- - success: - Controls what methods to include in the output. Options are: - True (success), False (failure) and None (method not used). If - `success = (True, False)`, returns only methods which did succeed - or failer (do not return unused methods). - fail_stages: - The fail stages to include in the output. The options are - "PLATFORM_SUPPORT", "REQUIREMENTS" and "ACTIVATION". - """ - out = [] - for res in self._method_results: - if res.success not in success: - continue - elif res.success is False and res.failure_stage not in fail_stages: - continue - elif res.success is False and res.method_name == WakepyFakeSuccess.name: - # The fake method is only listed if it was requested to be, - # used, and when it is not requested to be used, the - # res.success is False. - continue - out.append(res) - - return out - - def get_error_text(self) -> str: - """Gets information about a failure as text. In case the mode - activation was successful, returns an empty string.""" - - if self.success: - return "" - debug_info = str(self.query()) - modename = self.modename or "[unnamed mode]" - - return ( - f'Could not activate Mode "{modename}"!\n\nMethod usage results, in ' - f"order (highest priority first):\n{debug_info}" - ) - - def _get_success(self) -> bool: - for res in self._method_results: - if res.success: - return True - return False - - def _get_real_success(self) -> bool: - - for res in self._method_results: - if res.success and res.method_name != WakepyFakeSuccess.name: - return True - return False - - def _get_active_method(self) -> str | None: - methods = [res.method_name for res in self._method_results if res.success] - if not methods: - return None - elif len(methods) == 1: - return methods[0] - else: - raise ValueError( - "The ActivationResult cannot have more than one active methods! " - f"Active methods: {methods}" - ) - - -@dataclass -class MethodActivationResult: - """This class is a result from using a single Method to activate a mode.""" - - method_name: str - - # True: Using Method was successful - # False: Using Method failed - # None: Method is unused - success: bool | None - - # None if the method did not fail. Otherwise, the name of the stage where - # the method failed. - failure_stage: Optional[StageName] = None - - failure_reason: str = "" - - def __repr__(self) -> str: - error_at = " @" + self.failure_stage if self.failure_stage else "" - failure_reason = f', "{self.failure_reason}"' if self.success is False else "" - success_str = ( - "SUCCESS" if self.success else "FAIL" if self.success is False else "UNUSED" - ) - return f"({success_str}{error_at}, {self.method_name}{failure_reason})" - - -def activate_mode( - methods: list[Type[Method]], - dbus_adapter: Optional[DBusAdapter] = None, - methods_priority: Optional[MethodsPriorityOrder] = None, - modename: Optional[str] = None, -) -> Tuple[ActivationResult, Optional[Method], Optional[Heartbeat]]: - """Activates a mode defined by a collection of Methods. Only the first - Method which succeeds activation will be used, in order from highest - priority to lowest priority. - - The activation may be faked as to be successful by using the - WAKEPY_FAKE_SUCCESS environment variable. - - Parameters - ---------- - methods: - The list of Methods to be used for activating this Mode. - dbus_adapter: - Can be used to define a custom DBus adapter for processing DBus calls - in the .caniuse(), .enter_mode(), .heartbeat() and .exit_mode() of the - Method. Optional. - methods_priority: list[str | set[str]] - The priority order, which is a list of method names or asterisk - ('*'). The asterisk means "all rest methods" and may occur only - once in the priority order, and cannot be part of a set. All method - names must be unique and must be part of the `methods`. - modename: - Name of the Mode. Used for communication to user, logging and in - error messages (can be "any string" which makes sense to you). - Optional. - """ - check_methods_priority(methods_priority, methods) - - if not methods: - # Cannot activate anything as there are no methods. - return ActivationResult(modename=modename), None, None - - prioritized_methods = get_prioritized_methods(methods, methods_priority) - # The fake method is always checked first (WAKEPY_FAKE_SUCCESS) - prioritized_methods.insert(0, WakepyFakeSuccess) - - results = [] - - for methodcls in prioritized_methods: - method = methodcls(dbus_adapter=dbus_adapter) - methodresult, heartbeat = activate_method(method) - results.append(methodresult) - if methodresult.success: - break - else: - # Tried activate with all methods, but none of them succeed - return ActivationResult(results, modename=modename), None, None - - # Activation was succesful. - return ActivationResult(results, modename=modename), method, heartbeat - - -def check_methods_priority( - methods_priority: Optional[MethodsPriorityOrder], methods: List[MethodCls] -) -> None: - """Checks against `methods` that the `methods_priority` is valid. - - Parameters - ---------- - methods_priority: list[str | set[str]] - The priority order, which is a list of where items are method names, - sets of methods names or the asterisk ('*'). The asterisk means "all - rest methods" and may occur only once in the priority order, and cannot - be part of a set. All method names must be unique and must be part of - the `methods`. - methods: list[MethodCls] - The methods which the `methods_priority` is validated against. - - Raises - ------ - ValueError or TypeError if the `methods_priority` is not valid. - """ - if methods_priority is None: - return - - known_method_names = {m.name for m in methods} - known_method_names.add("*") - seen_method_names = set() - - for method_name, in_set in _iterate_methods_priority(methods_priority): - if not isinstance(method_name, str): - raise TypeError("methods_priority must be a list[str | set[str]]!") - - if in_set and method_name == "*": - raise ValueError( - "Asterisk (*) may not be a part of a set in methods_priority!" - ) - if method_name not in known_method_names: - raise ValueError( - f'Method "{method_name}" in methods_priority not in selected methods!' - ) - if method_name in seen_method_names: - if method_name != "*": - raise ValueError( - f'Duplicate method name "{method_name}" in methods_priority' - ) - else: - raise ValueError( - "The asterisk (*) can only occur once in methods_priority!" - ) - seen_method_names.add(method_name) - - -def _iterate_methods_priority( - methods_priority: Optional[MethodsPriorityOrder], -) -> typing.Iterator[Tuple[str, bool]]: - """Provides an iterator over the items in methods_priority. The items in - the iterator are (method_name, in_set) 2-tuples, where the method_name is - the method name (str) and the in_set is a boolean which is True if the - returned method_name is part of a set and False otherwise.""" - - if not methods_priority: - return - - for item in methods_priority: - if isinstance(item, set): - for method_name in item: - yield method_name, True - else: - yield item, False - - -def get_prioritized_methods_groups( - methods: List[MethodCls], methods_priority: Optional[MethodsPriorityOrder] -) -> List[Set[MethodCls]]: - """Prioritizes Methods in `methods` based on priority order defined by - `methods_priority`. This function does not validate the methods_priority in - any way; use `check_methods_priority` for validation of needed. - - Parameters - ---------- - methods: list[MethodCls] - The source list of methods. These methods are returned as prioritized - groups. - methods_priority: list[str | set[str]] - The names of the methods in `methods`. This specifies the priority - order; the order of method classes in the returned list. An asterisk - ('*') can be used to denote "all other methods". - - - Returns - ------- - method_groups: list[set[MethodCls]] - The prioritized methods. Each set in the output represents a group of - equal priority. All Methods from the input `methods` are always - included in the output - - - Example - ------- - Say there are methods MethodA, MethodB, MethodC, MethodD, MethodE, MethodF - with names "A", "B", "C", "D", "E", "F": - - >>> methods = [MethodA, MethodB, MethodC, MethodD, MethodE, MethodF] - >>> get_prioritized_methods_groups( - methods, - methods_priority=["A", "F", "*"] - ) - [ - {MethodA}, - {MethodF}, - {MethodB, MethodC, MethodD, MethodE}, - ] - - """ - - # Make this a list of sets just to make things simpler - methods_priority_sets: List[Set[str]] = [ - {item} if isinstance(item, str) else item for item in methods_priority or [] - ] - - method_dct = {m.name: m for m in methods} - - asterisk = {"*"} - asterisk_index = None - out: List[Set[MethodCls]] = [] - - for item in methods_priority_sets: - if item == asterisk: - # Save the location where to add the rest of the methods ('*') - asterisk_index = len(out) - else: # Item is a set but not `asterisk` - out.append({method_dct[name] for name in item}) - - out_flattened = {m for group in out for m in group} - rest_of_the_methods = {m for m in methods if m not in out_flattened} - - if rest_of_the_methods: - if asterisk_index is not None: - out.insert(asterisk_index, rest_of_the_methods) - else: - out.append(rest_of_the_methods) - - return out - - -def sort_methods_by_priority(methods: Set[MethodCls]) -> List[MethodCls]: - """Sorts Method classes by priority and returns a new sorted list with - Methods with highest priority first. - - The logic is: - (1) Any Methods supporting the CURRENT_PLATFORM are placed before any other - Methods (the others are not expected to work at all) - (2) Sort alphabetically by Method name, ignoring the case - """ - return sorted( - methods, - key=lambda m: ( - # Prioritize methods supporting CURRENT_PLATFORM over any others - 0 if CURRENT_PLATFORM in m.supported_platforms else 1, - m.name.lower() if m.name else "", - ), - ) - - -def get_prioritized_methods( - methods: List[MethodCls], - methods_priority: Optional[MethodsPriorityOrder] = None, -) -> List[MethodCls]: - """Take an unordered list of Methods and sort them by priority using the - methods_priority and automatic ordering. The methods_priority is used to - define groups of priority (sets of methods). The automatic ordering part is - used to order the methods *within* each priority group. In particular, all - methods supported by the current platform are placed first, and all - supported methods are then ordered alphabetically (ignoring case). - - Parameters - ---------- - methods: - The list of Methods to sort. - methods_priority: - Optional priority order, which is a list of method names (strings) or - sets of method names (sets of strings). An asterisk ('*') may be used - for "all the rest methods". None is same as ['*']. - - Returns - ------- - sorted_methods: - The input `methods` sorted with priority, highest priority first. - - Example - ------- - Assuming: Current platform is Linux. - - >>> methods = [LinuxA, LinuxB, LinuxC, MultiPlatformA, WindowsA] - >>> get_prioritized_methods( - >>> methods, - >>> methods_priority=[{"WinA", "LinuxB"}, "*"], - >>> ) - [LinuxB, WindowsA, LinuxA, LinuxC, MultiPlatformA] - - Explanation: - - WindowsA and LinuxB were given high priority, but since the current - platform is Linux, LinuxB was prioritized to be before WindowsA. - - The asterisk ('*') is converted to a set of rest of the methods: - {"LinuxA", "LinuxC", "MultiPlatformA"}, and those are then - automatically ordered. As all of them support Linux, the result is - just the methods sorted alphabetically. The asterisk in the end is - optional; it is added to the end of `methods_priority` if missing. - - """ - unordered_groups: List[Set[MethodCls]] = get_prioritized_methods_groups( - methods, methods_priority=methods_priority - ) - - ordered_groups: List[List[MethodCls]] = [ - sort_methods_by_priority(group) for group in unordered_groups - ] - - return [method for group in ordered_groups for method in group] - - -def activate_method(method: Method) -> Tuple[MethodActivationResult, Heartbeat | None]: - """Activates a mode defined by a single Method. - - Returns - ------- - result: - The result of the activation process. - heartbeat: - If the `method` has method.heartbeat() implemented, and activation - succeeds, this is a Heartbeat object. Otherwise, this is None. - """ - if method.is_unnamed(): - raise ValueError("Methods without a name may not be used to activate modes!") - - result = MethodActivationResult(success=False, method_name=method.name) - - if not get_platform_supported(method, platform=CURRENT_PLATFORM): - result.failure_stage = StageName.PLATFORM_SUPPORT - return result, None - - requirements_fail, err_message = caniuse_fails(method) - if requirements_fail: - result.failure_stage = StageName.REQUIREMENTS - result.failure_reason = err_message - return result, None - - success, err_message, heartbeat_call_time = try_enter_and_heartbeat(method) - if not success: - result.failure_stage = StageName.ACTIVATION - result.failure_reason = err_message - return result, None - - result.success = True - - if not heartbeat_call_time: - # Success, using just enter_mode(); no heartbeat() - return result, None - - heartbeat = Heartbeat(method, heartbeat_call_time) - heartbeat.start() - - return result, heartbeat - - -def deactivate_method(method: Method, heartbeat: Optional[Heartbeat] = None) -> None: - """Deactivates a mode defined by the `method`. - - Raises - ------ - MethodError (RuntimeError), if the deactivation was not successful. - """ - - heartbeat_stopped = heartbeat.stop() if heartbeat is not None else True - - if has_exit(method): - errortxt = ( - f"The exit_mode of '{method.__class__.__name__}' ({method.name}) was " - "unsuccessful! This should never happen, and could mean that the " - "implementation has a bug. Entering the mode has been successful, and " - "since exiting was not, your system might still be in the mode defined " - f"by the '{method.__class__.__name__}', or not. Suggesting submitting " - f"a bug report and rebooting for clearing the mode. " - ) - try: - # The Method.exit_mode() *should* always return None. However, it - # is not possible to control user created Methods implementation, - # so this is a safety net for users not having strict static type - # checking. - retval = method.exit_mode() # type: ignore[func-returns-value] - if retval is not None: - raise ValueError("exit_mode returned a value other than None!") - except Exception as e: - raise MethodError(errortxt + "Original error: " + str(e)) - - if heartbeat_stopped is not True: - raise MethodError( - f"The heartbeat of {method.__class__.__name__} ({method.name}) could not " - "be stopped! Suggesting submitting a bug report and rebooting for " - "clearing the mode. " - ) - - -def get_platform_supported(method: Method, platform: PlatformName) -> bool: - """Checks if method is supported by the platform - - Parameters - ---------- - method: Method - The method which platform support to check. - platform: - The platform to check against. - - Returns - ------- - is_supported: bool - If True, the platform is supported. Otherwise, False. - """ - return platform in method.supported_platforms - - -def caniuse_fails(method: Method) -> tuple[bool, str]: - """Check if the requirements of a Method are met or not. - - Returns - ------- - (fail, message): (bool, str) - If Method.caniuse() return True or None, the requirements check does - not fail. In this case, return(False, '') - - If Method.caniuse() return False, or a string, the requirements check - fails, and this function returns (True, message), where message is - either the string returned by .caniuse() or emptry string. - """ - - try: - canuse = method.caniuse() - except Exception as exc: - return True, str(exc) - - fail = False if (canuse is True or canuse is None) else True - message = "" if canuse in {True, False, None} else str(canuse) - - return fail, message - - -def try_enter_and_heartbeat(method: Method) -> Tuple[bool, str, Optional[dt.datetime]]: - """Try to use a Method to to activate a mode. First, with - method.enter_mode(), and then with the method.heartbeat() - - Returns - ------- - success, err_message, heartbeat_call_time - A three-tuple, where success is boolean and True if activating the mode - with the method was successful, otherwise False. The err_message is a - string which may be non-empty only when success is False. The - heartbeat_call_time is the datetime (in UTC) just before calling the - method.hearbeat(). - - Raises - ------ - RunTimeError: In the rare edge case where the `method` has both, - enter_mode() and heartbeat() defined, and the enter_mode() succeeds but the - heartbeat() fails, which causes exit_mode() to be called, and if this - exit_mode() also fails (as this leaves system uncertain state). All other - Exceptions are handled (returning success=False). - - Detailed explanation - -------------------- - These are the three possible statuses from attempts to use either - enter_mode() or heartbeat(): - - M: Missing implementation - F: Failed attempt - S: Succesful attempt - - There are total of 7 different outcomes (3*3 possibilities, minus two from - not checking heartbeat if enter_mode fails), marked as - {enter_mode_outcome}{heartbeat_outcome}; MS means enter_mode() was missing - implementation and using heartbeat() was successful. - - All the possible combinations which may occur are - - outcome What to do - ------- --------------------------------------------------------- - 1) F* Return Fail + enter_mode error message - - 2) MM Raise Exception -- the Method is faulty. - 3) MF Return Fail + heartbeat error message - 4) MS Return Success + heartbeat time - - 5) SM Return Success - 6) SF Return Fail + heartbeat error message + call exit_mode() - 7) SS Return Success + heartbeat time - - """ - enter_outcome, enter_errmessage = _try_enter_mode(method) - - if enter_outcome == MethodOutcome.FAILURE: # 1) F* - return False, enter_errmessage, None - - hb_outcome, hb_errmessage, hb_calltime = _try_heartbeat(method) - - method_name = f"Method {method.__class__.__name__} ({method.name})" - if enter_outcome == MethodOutcome.NOT_IMPLEMENTED: - if hb_outcome == MethodOutcome.NOT_IMPLEMENTED: - errmsg = ( - f"{method_name} is not properly defined! Missing implementation for " - "both, enter_mode() and heartbeat()!" - ) - return False, errmsg, None # 2) MM - elif hb_outcome == MethodOutcome.FAILURE: - return False, hb_errmessage, None # 3) MF - elif hb_outcome == MethodOutcome.SUCCESS: - return True, "", hb_calltime # 4) MS - - elif enter_outcome == MethodOutcome.SUCCESS: - if hb_outcome == MethodOutcome.NOT_IMPLEMENTED: # 5) SM - return True, "", None - elif hb_outcome == MethodOutcome.FAILURE: # 6) SF - _rollback_with_exit(method) - return False, hb_errmessage, None - elif hb_outcome == MethodOutcome.SUCCESS: # 7) SS - return True, "", hb_calltime - - raise RuntimeError( # pragma: no cover - "Should never end up here. Check the return values for the enter_mode() and " - f"heartbeat() of the {method_name}" - ) - - -def _try_enter_mode(method: Method) -> Tuple[MethodOutcome, str]: - """Calls the method.enter_mode(). This function catches any possible - Exceptions during the call.""" - - if not has_enter(method): - return MethodOutcome.NOT_IMPLEMENTED, "" - - outcome, err_message = _try_method_call(method, "enter_mode") - - return outcome, err_message - - -def _try_heartbeat(method: Method) -> Tuple[MethodOutcome, str, Optional[dt.datetime]]: - """Calls the method.heartbeat(). This function catches any possible - Exceptions during the call. - - Returns - ------- - heartbeat_call_time: dt.datetime - The UTC time just before the method.heartbeat() was called. - """ - if not has_heartbeat(method): - return MethodOutcome.NOT_IMPLEMENTED, "", None - - heartbeat_call_time = dt.datetime.now(dt.timezone.utc) - outcome, err_message = _try_method_call(method, "heartbeat") - - return outcome, err_message, heartbeat_call_time - - -def _try_method_call(method: Method, mthdname: str) -> Tuple[MethodOutcome, str]: - try: - method_to_call = getattr(method, mthdname) - retval = method_to_call() - if retval is not None: - raise ValueError( - f"The {mthdname} of {method.__class__.__name__} ({method.name}) " - f"returned an unsupported value {retval}. The only accepted return " - "value is None" - ) - outcome = MethodOutcome.SUCCESS - err_message = "" - except Exception as exc: - err_message = repr(exc) - outcome = MethodOutcome.FAILURE - return outcome, err_message - - -def _rollback_with_exit(method: Method) -> None: - """Roll back entering a mode by exiting it. - - Raises - ------ - RuntimeError, if exit_mode fails (returns something else than None) - - Notes - ----- - This function has the side effect of executing the calls in the - method.exit_mode. - """ - if not has_exit(method): - # Nothing to exit from. - return - - try: - # The Method.exit_mode() *should* always return None. However, it - # is not possible to control user created Methods implementation, - # so this is a safety net for users not having strict static type - # checking. - exit_outcome = method.exit_mode() # type: ignore[func-returns-value] - if exit_outcome is not None: - raise ValueError("exit_method did not return None") - except Exception as exc: - raise RuntimeError( - f"Entered {method.__class__.__name__} ({method.name}) but could not exit!" - + f" Original error: {str(exc)}" - ) from exc - - -class WakepyFakeSuccess(Method): - """This is a special fake method to be used with any mode. It can be used - in tests for faking wakepy mode activation. This way all IO and real - executable, library and dbus calls are prevented. To use this method (and - skip using any other methods), set WAKEPY_FAKE_SUCCESS environment variable - to a truthy value (e.g. "1", or "True"). - """ - - name = "WAKEPY_FAKE_SUCCESS" - mode = "_fake" - - environment_variable = name - - # All other values are considered to be truthy. Comparison is case - # insensitive - falsy_values = ("0", "no", "false") - - supported_platforms = (CURRENT_PLATFORM,) - - def enter_mode(self) -> None: - """Function which says if fake success should be enabled - - Fake success is controlled via WAKEPY_FAKE_SUCCESS environment - variable. If that variable is set to a truthy value,fake success is - activated. - - Falsy values: '0', 'no', 'false' (case ignored) - Truthy values: everything else - - Motivation: - ----------- - When running on CI system, wakepy might fail to acquire an inhibitor - lock just because there is no Desktop Environment running. In these - cases, it might be useful to just tell with an environment variable - that wakepy should fake the successful inhibition anyway. Faking the - success is done after every other method is tried (and failed). - """ - # The os.environ seems to be populated when os is imported -> delay the - # import until here. - import os - - if self.environment_variable not in os.environ: - raise RuntimeError(f"{self.environment_variable} not set.") - - val = os.environ[self.environment_variable] - if val.lower() in self.falsy_values: - raise RuntimeError( - f"{self.environment_variable} set to falsy value: {val}." - ) - - -def has_enter(method: Method) -> bool: - return type(method).enter_mode is not Method.enter_mode - - -def has_exit(method: Method) -> bool: - return type(method).exit_mode is not Method.exit_mode - - -def has_heartbeat(method: Method) -> bool: - return type(method).heartbeat is not Method.heartbeat diff --git a/src/wakepy/core/activationresult.py b/src/wakepy/core/activationresult.py new file mode 100644 index 00000000..ab6ce3ba --- /dev/null +++ b/src/wakepy/core/activationresult.py @@ -0,0 +1,244 @@ +"""This module defines the activation result classes. + + +Most important classes +---------------------- +ActivationResult + This is something returned from mode activation task. Contains the summary + of all used methods, and whether the activation was successful or not. +MethodActivationResult + One level lower than ActivationResult. This is result from activation task + using a single Method. +""" + +from __future__ import annotations + +import typing +from dataclasses import InitVar, dataclass, field +from typing import List, Sequence + +from .constants import WAKEPY_FAKE_SUCCESS, StageName, StageNameValue + +if typing.TYPE_CHECKING: + from typing import Optional + + +@dataclass +class ActivationResult: + """The ActivationResult is responsible of keeping track on the possibly + successful (max 1), failed and unused methods and providing different views + on the results of the activation process. + + Parameters + --------- + results: + The MethodActivationResults to be used to fill the ActivationResult + modename: + Name of the Mode. Optional. + + Attributes + ---------- + modename: str | None + The name of the Mode. If the Mode did not have a name, the modename + is None. + success: bool + Tells is entering into a mode was successful. Note that this may be + faked with WAKEPY_FAKE_SUCCESS environment variable e.g. for testing + purposes. + real_success: bool + Tells is entering into a mode was successful. This + may not faked with WAKEPY_FAKE_SUCCESS environment variable. + failure: bool + Always opposite of `success`. Included for convenience. + active_method: str | None + The name of the the active (successful) method, if any. + + Methods + ------- + list_methods: + Get a list of the methods present in the activation process, and their + activation results. This is the higher-level interface. If you want + more control, use .query(). + query: + Lower level interface for getting the list of the methods present in + the activation process, and their activation results. If you want + easier access, use .list_methods(). + """ + + results: InitVar[Optional[List[MethodActivationResult]]] = None + # These are the retuls for each of the used wakepy.Methods, in the + # order the methods were tried (first = highest priority, last = + # lowest priority) + + modename: Optional[str] = None + """Name of the mode, if any.""" + + active_method: str | None = field(init=False) + """The name of the active (successful) method. If no methods are active, + this is None.""" + + success: bool = field(init=False) + """Tells is entering into a mode was successful. + + Note that this may be faked with WAKEPY_FAKE_SUCCESS environment + variable (for tests). See also: real_success. + """ + + real_success: bool = field(init=False) + """Tells is entering into a mode was successful. This + may not faked with WAKEPY_FAKE_SUCCESS environment variable. + """ + + failure: bool = field(init=False) + """Always opposite of `success`. Included for convenience.""" + + _method_results: List[MethodActivationResult] = field(init=False) + + def __post_init__( + self, + results: Optional[List[MethodActivationResult]] = None, + ) -> None: + self._method_results = results or [] + self.success = self._get_success() + self.failure = not self.success + self.real_success = self._get_real_success() + self.active_method = self._get_active_method() + + def list_methods( + self, + ignore_platform_fails: bool = True, + ignore_unused: bool = False, + ) -> list[MethodActivationResult]: + """Get a list of the methods present in the activation process, and + their activation results. This is the higher-level interface. If you + want more control, use .query(). The returned methods are in the order + as given in when initializing ActivationResult. If you did not create + the ActivationReult manually, the methods are in the priority order; + the highest priority methods (those which are/were tried first) are + listed first. + + Parameters + ---------- + ignore_platform_fails: + If True, ignores plaform support check fail. This is the default as + usually one is not interested in methods which are meant for other + platforms. If False, includes also platform fails. Default: True. + ignore_unused: + If True, ignores all unused / remaining methods. Default: False. + """ + + success_values = (True, False) if ignore_unused else (True, False, None) + + fail_stages = [StageName.REQUIREMENTS, StageName.ACTIVATION] + if not ignore_platform_fails: + fail_stages.insert(0, StageName.PLATFORM_SUPPORT) + + return self.query(success=success_values, fail_stages=fail_stages) + + def query( + self, + success: Sequence[bool | None] = (True, False, None), + fail_stages: Sequence[StageName | StageNameValue] = ( + StageName.PLATFORM_SUPPORT, + StageName.REQUIREMENTS, + StageName.ACTIVATION, + ), + ) -> list[MethodActivationResult]: + """Get a list of the methods present in the activation process, and + their activation results. This is the lower-level interface. If you + want easier access, use .list_methods(). The methods are in the order + as given in when initializing ActivationResult. If you did not create + the ActivationReult manually, the methods are in the priority order; + the highest priority methods (those which are/were tried first) are + listed first. + + Parameters + ---------- + success: + Controls what methods to include in the output. Options are: + True (success), False (failure) and None (method not used). If + `success = (True, False)`, returns only methods which did succeed + or failer (do not return unused methods). + fail_stages: + The fail stages to include in the output. The options are + "PLATFORM_SUPPORT", "REQUIREMENTS" and "ACTIVATION". + """ + out = [] + for res in self._method_results: + if res.success not in success: + continue + elif res.success is False and res.failure_stage not in fail_stages: + continue + elif res.success is False and res.method_name == WAKEPY_FAKE_SUCCESS: + # The fake method is only listed if it was requested to be, + # used, and when it is not requested to be used, the + # res.success is False. + continue + out.append(res) + + return out + + def get_error_text(self) -> str: + """Gets information about a failure as text. In case the mode + activation was successful, returns an empty string.""" + + if self.success: + return "" + debug_info = str(self.query()) + modename = self.modename or "[unnamed mode]" + + return ( + f'Could not activate Mode "{modename}"!\n\nMethod usage results, in ' + f"order (highest priority first):\n{debug_info}" + ) + + def _get_success(self) -> bool: + for res in self._method_results: + if res.success: + return True + return False + + def _get_real_success(self) -> bool: + + for res in self._method_results: + if res.success and res.method_name != WAKEPY_FAKE_SUCCESS: + return True + return False + + def _get_active_method(self) -> str | None: + methods = [res.method_name for res in self._method_results if res.success] + if not methods: + return None + elif len(methods) == 1: + return methods[0] + else: + raise ValueError( + "The ActivationResult cannot have more than one active methods! " + f"Active methods: {methods}" + ) + + +@dataclass +class MethodActivationResult: + """This class is a result from using a single Method to activate a mode.""" + + method_name: str + + # True: Using Method was successful + # False: Using Method failed + # None: Method is unused + success: bool | None + + # None if the method did not fail. Otherwise, the name of the stage where + # the method failed. + failure_stage: Optional[StageName] = None + + failure_reason: str = "" + + def __repr__(self) -> str: + error_at = " @" + self.failure_stage if self.failure_stage else "" + failure_reason = f', "{self.failure_reason}"' if self.success is False else "" + success_str = ( + "SUCCESS" if self.success else "FAIL" if self.success is False else "UNUSED" + ) + return f"({success_str}{error_at}, {self.method_name}{failure_reason})" diff --git a/src/wakepy/core/constants.py b/src/wakepy/core/constants.py index 60682e1d..a9ffe72e 100644 --- a/src/wakepy/core/constants.py +++ b/src/wakepy/core/constants.py @@ -1,6 +1,7 @@ """Common terms and definitions used in many places""" import sys +from typing import List, Set, Tuple, TypeVar, Union from .strenum import StrEnum, auto @@ -9,6 +10,10 @@ else: # pragma: no-cover-if-py-lt-38 from typing import Literal +WAKEPY_FAKE_SUCCESS = "WAKEPY_FAKE_SUCCESS" +"""Name of the Wakepy fake success method and the environment variable used +to set it""" + class PlatformName(StrEnum): WINDOWS = auto() @@ -41,3 +46,23 @@ class BusType(StrEnum): BusTypeValue = Literal["SESSION", "SYSTEM"] + + +class StageName(StrEnum): + # These are stages which occur in order for each of the methods + # when using a Method for activation. + + NONE = auto() # No stage at all. + + # The stages in the activation process in order + PLATFORM_SUPPORT = auto() + REQUIREMENTS = auto() + ACTIVATION = auto() + + +StageNameValue = Literal["NONE", "PLATFORM_SUPPORT", "REQUIREMENTS", "ACTIVATION"] + +# Type annotations +T = TypeVar("T") +Collection = Union[List[T], Tuple[T, ...], Set[T]] +StrCollection = Collection[str] diff --git a/src/wakepy/core/heartbeat.py b/src/wakepy/core/heartbeat.py index 2101f294..88593c31 100644 --- a/src/wakepy/core/heartbeat.py +++ b/src/wakepy/core/heartbeat.py @@ -11,6 +11,7 @@ class Heartbeat: # TODO: This is just temporary dummy implementation. + # Will be created as part of https://github.com/fohrloop/wakepy/issues/109 def __init__( self, method: Method, heartbeat_call_time: Optional[dt.datetime] = None ): diff --git a/src/wakepy/core/method.py b/src/wakepy/core/method.py index 6d9a3d35..10234460 100644 --- a/src/wakepy/core/method.py +++ b/src/wakepy/core/method.py @@ -1,24 +1,22 @@ -"""This module defines the Method class and few functions for working with -methods +"""This module defines the Method class which is meant to be subclassed. Method * A class which is intended to be subclassed * The Methods are ways of entering wakepy Modes. - -General functions ------------------ -select_methods - Select Methods from a collection based on a white- or blacklist. - """ from __future__ import annotations +import datetime as dt import sys import typing from abc import ABC -from typing import cast +from typing import Type, cast +from .activationresult import MethodActivationResult +from .constants import PlatformName, StageName +from .heartbeat import Heartbeat +from .platform import CURRENT_PLATFORM from .registry import register_method from .strenum import StrEnum, auto @@ -29,39 +27,19 @@ if typing.TYPE_CHECKING: - from typing import Any, List, Optional, Set, Tuple, Type, TypeVar, Union + from typing import Any, Optional, Tuple from wakepy.core import DBusAdapter, DBusMethodCall from .constants import ModeName, PlatformName - MethodCls = Type["Method"] - T = TypeVar("T") - Collection = Union[List[T], Tuple[T, ...], Set[T]] - MethodClsCollection = Collection[MethodCls] - StrCollection = Collection[str] +MethodCls = Type["Method"] class MethodError(RuntimeError): """Occurred inside wakepy.core.method.Method""" -class EnterModeError(MethodError): - """Occurred during method.enter_mode()""" - - -class ExitModeError(MethodError): - """Occurred during method.exit_mode()""" - - -class HeartbeatCallError(MethodError): - """Occurred during method.heartbeat()""" - - -class MethodDefinitionError(RuntimeError): - """Any error which is part of the Method (subclass) definition.""" - - class MethodOutcome(StrEnum): NOT_IMPLEMENTED = auto() SUCCESS = auto() @@ -91,8 +69,8 @@ class Method(ABC): """ mode: ModeName | str - """The mode for the method. Each method may be connected to single mode. - Use None for methods which do not implement any mode.""" + """The mode for the method. Each Method subclass may be registered to a + single mode.""" supported_platforms: Tuple[PlatformName, ...] = tuple() """All the supported platforms. If a platform is not listed here, this @@ -274,54 +252,302 @@ def is_unnamed(cls) -> bool: return cls.name == unnamed -def select_methods( - methods: MethodClsCollection, - omit: Optional[StrCollection] = None, - use_only: Optional[StrCollection] = None, -) -> List[MethodCls]: - """Selects Methods from from `methods` using a blacklist (omit) or - whitelist (use_only). If `omit` and `use_only` are both None, will return - all the original methods. +def activate_method(method: Method) -> Tuple[MethodActivationResult, Heartbeat | None]: + """Activates a mode defined by a single Method. + + Returns + ------- + result: + The result of the activation process. + heartbeat: + If the `method` has method.heartbeat() implemented, and activation + succeeds, this is a Heartbeat object. Otherwise, this is None. + """ + if method.is_unnamed(): + raise ValueError("Methods without a name may not be used to activate modes!") + + result = MethodActivationResult(success=False, method_name=method.name) + + if not get_platform_supported(method, platform=CURRENT_PLATFORM): + result.failure_stage = StageName.PLATFORM_SUPPORT + return result, None + + requirements_fail, err_message = caniuse_fails(method) + if requirements_fail: + result.failure_stage = StageName.REQUIREMENTS + result.failure_reason = err_message + return result, None + + success, err_message, heartbeat_call_time = try_enter_and_heartbeat(method) + if not success: + result.failure_stage = StageName.ACTIVATION + result.failure_reason = err_message + return result, None + + result.success = True + + if not heartbeat_call_time: + # Success, using just enter_mode(); no heartbeat() + return result, None + + heartbeat = Heartbeat(method, heartbeat_call_time) + heartbeat.start() + + return result, heartbeat + + +def deactivate_method(method: Method, heartbeat: Optional[Heartbeat] = None) -> None: + """Deactivates a mode defined by method + + Raises + ------ + MethodError (RuntimeError), if the deactivation was not successful. + """ + + heartbeat_stopped = heartbeat.stop() if heartbeat is not None else True + + if has_exit(method): + errortxt = ( + f"The exit_mode of '{method.__class__.__name__}' ({method.name}) was " + "unsuccessful! This should never happen, and could mean that the " + "implementation has a bug. Entering the mode has been successful, and " + "since exiting was not, your system might still be in the mode defined " + f"by the '{method.__class__.__name__}', or not. Suggesting submitting " + f"a bug report and rebooting for clearing the mode. " + ) + try: + # The Method.exit_mode() *should* always return None. However, it + # is not possible to control user created Methods implementation, + # so this is a safety net for users not having strict static type + # checking. + retval = method.exit_mode() # type: ignore[func-returns-value] + if retval is not None: + raise ValueError("exit_mode returned a value other than None!") + except Exception as e: + raise MethodError(errortxt + "Original error: " + str(e)) + + if heartbeat_stopped is not True: + raise MethodError( + f"The heartbeat of {method.__class__.__name__} ({method.name}) could not " + "be stopped! Suggesting submitting a bug report and rebooting for " + "clearing the mode. " + ) + + +def get_platform_supported(method: Method, platform: PlatformName) -> bool: + """Checks if method is supported by the platform Parameters ---------- - methods: collection of Method classes - The collection of methods from which to make the selection. - omit: list, tuple or set of str or None - The names of Methods to remove from the `methods`; a "blacklist" - filter. Any Method in `omit` but not in `methods` will be silently - ignored. Cannot be used same time with `use_only`. Optional. - use_only: list, tuple or set of str - The names of Methods to select from the `methods`; a "whitelist" - filter. Means "use these and only these Methods". Any Methods in - `use_only` but not in `methods` will raise a ValueErrosr. Cannot - be used same time with `omit`. Optional. + method: Method + The method which platform support to check. + platform: + The platform to check against. + + Returns + ------- + is_supported: bool + If True, the platform is supported. Otherwise, False. + """ + return platform in method.supported_platforms + + +def caniuse_fails(method: Method) -> tuple[bool, str]: + """Check if the requirements of a Method are met or not. + + Returns + ------- + (fail, message): (bool, str) + If Method.caniuse() return True or None, the requirements check does + not fail. In this case, return(False, '') + + If Method.caniuse() return False, or a string, the requirements check + fails, and this function returns (True, message), where message is + either the string returned by .caniuse() or emptry string. + """ + + try: + canuse = method.caniuse() + except Exception as exc: + return True, str(exc) + + fail = False if (canuse is True or canuse is None) else True + message = "" if canuse in {True, False, None} else str(canuse) + + return fail, message + + +def try_enter_and_heartbeat(method: Method) -> Tuple[bool, str, Optional[dt.datetime]]: + """Try to use a Method to to activate a mode. First, with + method.enter_mode(), and then with the method.heartbeat() Returns ------- - methods: list[MethodCls] - The selected method classes. + success, err_message, heartbeat_call_time + A three-tuple, where success is boolean and True if activating the mode + with the method was successful, otherwise False. The err_message is a + string which may be non-empty only when success is False. The + heartbeat_call_time is the datetime (in UTC) just before calling the + method.hearbeat(). Raises ------ - ValueError if the input arguments (omit or use_only) are invalid. + RunTimeError: In the rare edge case where the `method` has both, + enter_mode() and heartbeat() defined, and the enter_mode() succeeds but the + heartbeat() fails, which causes exit_mode() to be called, and if this + exit_mode() also fails (as this leaves system uncertain state). All other + Exceptions are handled (returning success=False). + + Detailed explanation + -------------------- + These are the three possible statuses from attempts to use either + enter_mode() or heartbeat(): + + M: Missing implementation + F: Failed attempt + S: Succesful attempt + + There are total of 7 different outcomes (3*3 possibilities, minus two from + not checking heartbeat if enter_mode fails), marked as + {enter_mode_outcome}{heartbeat_outcome}; MS means enter_mode() was missing + implementation and using heartbeat() was successful. + + All the possible combinations which may occur are + + outcome What to do + ------- --------------------------------------------------------- + 1) F* Return Fail + enter_mode error message + + 2) MM Raise Exception -- the Method is faulty. + 3) MF Return Fail + heartbeat error message + 4) MS Return Success + heartbeat time + + 5) SM Return Success + 6) SF Return Fail + heartbeat error message + call exit_mode() + 7) SS Return Success + heartbeat time + """ + enter_outcome, enter_errmessage = _try_enter_mode(method) - if omit and use_only: - raise ValueError( - "Can only define omit (blacklist) or use_only (whitelist), not both!" - ) - elif omit is None and use_only is None: - selected_methods = list(methods) - elif omit is not None: - selected_methods = [m for m in methods if m.name not in omit] - elif use_only is not None: - selected_methods = [m for m in methods if m.name in use_only] - if not set(use_only).issubset(m.name for m in selected_methods): - missing = sorted(set(use_only) - set(m.name for m in selected_methods)) + if enter_outcome == MethodOutcome.FAILURE: # 1) F* + return False, enter_errmessage, None + + hb_outcome, hb_errmessage, hb_calltime = _try_heartbeat(method) + + method_name = f"Method {method.__class__.__name__} ({method.name})" + if enter_outcome == MethodOutcome.NOT_IMPLEMENTED: + if hb_outcome == MethodOutcome.NOT_IMPLEMENTED: + errmsg = ( + f"{method_name} is not properly defined! Missing implementation for " + "both, enter_mode() and heartbeat()!" + ) + return False, errmsg, None # 2) MM + elif hb_outcome == MethodOutcome.FAILURE: + return False, hb_errmessage, None # 3) MF + elif hb_outcome == MethodOutcome.SUCCESS: + return True, "", hb_calltime # 4) MS + + elif enter_outcome == MethodOutcome.SUCCESS: + if hb_outcome == MethodOutcome.NOT_IMPLEMENTED: # 5) SM + return True, "", None + elif hb_outcome == MethodOutcome.FAILURE: # 6) SF + _rollback_with_exit(method) + return False, hb_errmessage, None + elif hb_outcome == MethodOutcome.SUCCESS: # 7) SS + return True, "", hb_calltime + + raise RuntimeError( # pragma: no cover + "Should never end up here. Check the return values for the enter_mode() and " + f"heartbeat() of the {method_name}" + ) + + +def _try_enter_mode(method: Method) -> Tuple[MethodOutcome, str]: + """Calls the method.enter_mode(). This function catches any possible + Exceptions during the call.""" + + if not has_enter(method): + return MethodOutcome.NOT_IMPLEMENTED, "" + + outcome, err_message = _try_method_call(method, "enter_mode") + + return outcome, err_message + + +def _try_heartbeat(method: Method) -> Tuple[MethodOutcome, str, Optional[dt.datetime]]: + """Calls the method.heartbeat(). This function catches any possible + Exceptions during the call. + + Returns + ------- + heartbeat_call_time: dt.datetime + The UTC time just before the method.heartbeat() was called. + """ + if not has_heartbeat(method): + return MethodOutcome.NOT_IMPLEMENTED, "", None + + heartbeat_call_time = dt.datetime.now(dt.timezone.utc) + outcome, err_message = _try_method_call(method, "heartbeat") + + return outcome, err_message, heartbeat_call_time + + +def _try_method_call(method: Method, mthdname: str) -> Tuple[MethodOutcome, str]: + try: + method_to_call = getattr(method, mthdname) + retval = method_to_call() + if retval is not None: raise ValueError( - f"Methods {missing} in `use_only` are not part of `methods`!" + f"The {mthdname} of {method.__class__.__name__} ({method.name}) " + f"returned an unsupported value {retval}. The only accepted return " + "value is None" ) - else: # pragma: no cover - raise ValueError("Invalid `omit` and/or `use_only`!") - return selected_methods + outcome = MethodOutcome.SUCCESS + err_message = "" + except Exception as exc: + err_message = repr(exc) + outcome = MethodOutcome.FAILURE + return outcome, err_message + + +def _rollback_with_exit(method: Method) -> None: + """Roll back entering a mode by exiting it. + + Raises + ------ + RuntimeError, if exit_mode fails (returns something else than None) + + Notes + ----- + This function has the side effect of executing the calls in the + method.exit_mode. + """ + if not has_exit(method): + # Nothing to exit from. + return + + try: + # The Method.exit_mode() *should* always return None. However, it + # is not possible to control user created Methods implementation, + # so this is a safety net for users not having strict static type + # checking. + exit_outcome = method.exit_mode() # type: ignore[func-returns-value] + if exit_outcome is not None: + raise ValueError("exit_method did not return None") + except Exception as exc: + raise RuntimeError( + f"Entered {method.__class__.__name__} ({method.name}) but could not exit!" + + f" Original error: {str(exc)}" + ) from exc + + +def has_enter(method: Method) -> bool: + return type(method).enter_mode is not Method.enter_mode + + +def has_exit(method: Method) -> bool: + return type(method).exit_mode is not Method.exit_mode + + +def has_heartbeat(method: Method) -> bool: + return type(method).heartbeat is not Method.heartbeat diff --git a/src/wakepy/core/mode.py b/src/wakepy/core/mode.py index 075db488..2bf0f1da 100644 --- a/src/wakepy/core/mode.py +++ b/src/wakepy/core/mode.py @@ -1,25 +1,47 @@ +"""This module defines the Mode class and functions which may be used in the +activation and deactivation of Modes (using Methods). + +Most important functions +------------------------ +activate_method(method:Method) -> MethodActivationResult + Activate a mode using a single Method +get_prioritized_methods + Prioritize of collection of Methods +""" + from __future__ import annotations +import logging import typing import warnings +from typing import List, Sequence, Set, Union + +from wakepy.core.constants import WAKEPY_FAKE_SUCCESS -from .activation import ActivationResult, activate_mode, deactivate_method -from .dbus import get_dbus_adapter +from .activationresult import ActivationResult +from .dbus import DBusAdapter, get_dbus_adapter from .heartbeat import Heartbeat -from .method import select_methods -from .registry import get_methods_for_mode +from .method import Method, activate_method, deactivate_method +from .platform import CURRENT_PLATFORM +from .registry import get_method, get_methods_for_mode if typing.TYPE_CHECKING: from types import TracebackType - from typing import Callable, Literal, Optional, Type + from typing import Callable, List, Literal, Optional, Set, Tuple, Type, Union - from .activation import MethodsPriorityOrder - from .constants import ModeName + from .constants import Collection, ModeName, StrCollection from .dbus import DBusAdapter, DBusAdapterTypeSeq - from .method import Method, StrCollection + from .method import Method, MethodCls OnFail = Literal["error", "warn", "pass"] | Callable[[ActivationResult], None] + MethodClsCollection = Collection[MethodCls] + + +"""The strings in MethodsPriorityOrder are names of wakepy.Methods or the +asterisk ('*').""" +MethodsPriorityOrder = Sequence[Union[str, Set[str]]] + class ActivationError(RuntimeError): """Raised if activation is not successful and on-fail action is to raise @@ -47,56 +69,6 @@ class ModeExit(Exception): """ -class ModeController: - def __init__(self, dbus_adapter: Optional[DBusAdapter] = None): - self.dbus_adapter = dbus_adapter - self.active_method: Method | None = None - self.heartbeat: Heartbeat | None = None - - def activate( - self, - method_classes: list[Type[Method]], - methods_priority: Optional[MethodsPriorityOrder] = None, - modename: Optional[str] = None, - ) -> ActivationResult: - """Activates the mode with one of the methods in the input method - classes. The methods are used with descending priority; highest - priority first. - """ - result, active_method, heartbeat = activate_mode( - methods=method_classes, - methods_priority=methods_priority, - dbus_adapter=self.dbus_adapter, - modename=modename, - ) - self.active_method = active_method - self.heartbeat = heartbeat - return result - - def deactivate(self) -> bool: - """Deactivates the active mode, defined by the active Method, if any. - If there was no active method, does nothing. - - Returns - ------- - deactivated: - If there was no active method, returns False (nothing was done). - If there was an active method, and it was deactivated, returns True - - Raises - ------ - MethodError (RuntimeError) if there was active method but an error - occurred when trying to deactivate it.""" - - if not self.active_method: - return False - - deactivate_method(self.active_method, self.heartbeat) - self.active_method = None - self.heartbeat = None - return True - - class Mode: """A mode is something that is activated (entered in) and deactivated (exited from). Each Mode instance is created with a set of Method classes, @@ -177,12 +149,6 @@ class Mode: """The ``on_fail`` given when creating the :class:`Mode`. """ - dbus_adapter: DBusAdapter | None - r"""The DBus adapter used with ``Method``\ s which require DBus (if any). - """ - - _controller_class: Type[ModeController] = ModeController - def __init__( self, methods: list[Type[Method]], @@ -196,76 +162,19 @@ def __init__( This is also where the activation process related settings, such as the dbus adapter to be used, are defined. """ - - self.name = name self.method_classes = methods - self.methods_priority = methods_priority - self.controller: ModeController | None = None - self.activation_result = ActivationResult() self.active: bool = False + self.activation_result = ActivationResult() + self.name = name + self.methods_priority = methods_priority self.on_fail = on_fail - self._dbus_adapter_cls = dbus_adapter - - def __enter__(self) -> Mode: - """Called automatically when entering a with block and a instance of - Mode is used as the context expression. This tries to activate the - Mode using :attr:`~wakepy.Mode.method_classes`. - """ - - self.controller = self.controller or self._controller_class( - dbus_adapter=get_dbus_adapter(self._dbus_adapter_cls) - ) - self.activation_result = self.controller.activate( - self.method_classes, - methods_priority=self.methods_priority, - modename=self.name, - ) - self.active = self.activation_result.success - - if not self.active: - handle_activation_fail(self.on_fail, self.activation_result) - - return self - - def __exit__( - self, - exc_type: Optional[Type[BaseException]], - exception: Optional[BaseException], - traceback: Optional[TracebackType], - ) -> bool: - """Called when exiting the with block. - - If with block completed normally, called with `(None, None, None)` - If with block had an exception, called with `(exc_type, exc_value, - traceback)`, which is the same as `*sys.exc_info`. - - Will swallow any ModeExit exception. Other exceptions will be - re-raised. - """ - - # These are not used but are part of context manager protocol. - # make linters happy - _ = exc_type - _ = traceback - - if self.controller is None: - raise RuntimeError("Must __enter__ before __exit__!") - - self.controller.deactivate() - self.active = False - - if exception is None or isinstance(exception, ModeExit): - # Returning True means that the exception within the with block is - # swallowed. We skip only ModeExit which should simply exit the - # with block. - return True + self.active_method: Method | None = None + self.heartbeat: Heartbeat | None = None - # Other types of exceptions are not handled; ignoring them here and - # returning False will tell python to re-raise the exception. Can't - # return None as type-checkers will mark code after with block - # unreachable + self._dbus_adapter_cls = dbus_adapter + self._dbus_adapter: DBusAdapter | None = None - return False + self._logger = logging.getLogger(__name__) @classmethod def from_name( @@ -282,9 +191,11 @@ def from_name( Parameters ---------- - modename: - The name of the mode to create. Used for debugging, logging, - warning and error messages. Could be basically any string. + modename: str + The name of the mode to create. Must be an existing Mode name; + something that has used as Method.name attribute in a + :class:`~wakepy.core.method.Method` subclass. Examples: + "keep.running", "keep.presenting". methods: list, tuple or set of str The names of Methods to select from the mode defined with `modename`; a "whitelist" filter. Means "use these and only these @@ -330,6 +241,444 @@ def from_name( dbus_adapter=dbus_adapter, ) + def __enter__(self) -> Mode: + """Called automatically when entering a with block and a instance of + Mode is used as the context expression. This tries to activate the + Mode using :attr:`~wakepy.Mode.method_classes`. + """ + + self._activate( + self.method_classes, + methods_priority=self.methods_priority, + modename=self.name, + ) + + return self + + def __exit__( + self, + exc_type: Optional[Type[BaseException]], + exception: Optional[BaseException], + traceback: Optional[TracebackType], + ) -> bool: + """Called when exiting the with block. + + If with block completed normally, called with `(None, None, None)` + If with block had an exception, called with `(exc_type, exc_value, + traceback)`, which is the same as `*sys.exc_info`. + + Will swallow any ModeExit exception. Other exceptions will be + re-raised. + """ + + # These are not used but are part of context manager protocol. + # make linters happy + _ = exc_type + _ = traceback + + self._deactivate() + + if exception is None or isinstance(exception, ModeExit): + # Returning True means that the exception within the with block is + # swallowed. We skip only ModeExit which should simply exit the + # with block. + return True + + # Other types of exceptions are not handled; ignoring them here and + # returning False will tell python to re-raise the exception. Can't + # return None as type-checkers will mark code after with block + # unreachable + + return False + + def _activate( + self, + method_classes: list[Type[Method]], + methods_priority: Optional[MethodsPriorityOrder] = None, + modename: Optional[str] = None, + ) -> ActivationResult: + """Activates the mode with one of the methods in the input method + classes. The methods are used with descending priority; highest + priority first. + """ + if not self._dbus_adapter: + self._dbus_adapter = get_dbus_adapter(self._dbus_adapter_cls) + + self.activation_result, self.active_method, self.heartbeat = activate_mode( + methods=method_classes, + methods_priority=methods_priority, + dbus_adapter=self._dbus_adapter, + modename=modename, + ) + self.active = self.activation_result.success + + if not self.active: + handle_activation_fail(self.on_fail, self.activation_result) + + return self.activation_result + + def _deactivate(self) -> bool: + """Deactivates the active mode, defined by the active Method, if any. + If there was no active method, does nothing. + + Returns + ------- + deactivated: + If there was no active method, returns False (nothing was done). + If there was an active method, and it was deactivated, returns True + + Raises + ------ + MethodError (RuntimeError) if there was active method but an error + occurred when trying to deactivate it.""" + + if self.active: + if self.active_method is None: + raise RuntimeError( + f"Cannot deactivate mode: {str(self.name)}. The active_method is None! This should never happen." # noqa E501 + ) + deactivate_method(self.active_method, self.heartbeat) + deactivated = True + else: + deactivated = False + + self.active_method = None + self.heartbeat = None + self.active = False + + return deactivated + + +def select_methods( + methods: MethodClsCollection, + omit: Optional[StrCollection] = None, + use_only: Optional[StrCollection] = None, +) -> List[MethodCls]: + """Selects Methods from from `methods` using a blacklist (omit) or + whitelist (use_only). If `omit` and `use_only` are both None, will return + all the original methods. + + Parameters + ---------- + methods: collection of Method classes + The collection of methods from which to make the selection. + omit: list, tuple or set of str or None + The names of Methods to remove from the `methods`; a "blacklist" + filter. Any Method in `omit` but not in `methods` will be silently + ignored. Cannot be used same time with `use_only`. Optional. + use_only: list, tuple or set of str + The names of Methods to select from the `methods`; a "whitelist" + filter. Means "use these and only these Methods". Any Methods in + `use_only` but not in `methods` will raise a ValueErrosr. Cannot + be used same time with `omit`. Optional. + + Returns + ------- + methods: list[MethodCls] + The selected method classes. + + Raises + ------ + ValueError if the input arguments (omit or use_only) are invalid. + """ + + if omit and use_only: + raise ValueError( + "Can only define omit (blacklist) or use_only (whitelist), not both!" + ) + elif omit is None and use_only is None: + selected_methods = list(methods) + elif omit is not None: + selected_methods = [m for m in methods if m.name not in omit] + elif use_only is not None: + selected_methods = [m for m in methods if m.name in use_only] + if not set(use_only).issubset(m.name for m in selected_methods): + missing = sorted(set(use_only) - set(m.name for m in selected_methods)) + raise ValueError( + f"Methods {missing} in `use_only` are not part of `methods`!" + ) + else: # pragma: no cover + raise ValueError("Invalid `omit` and/or `use_only`!") + return selected_methods + + +def activate_mode( + methods: list[Type[Method]], + dbus_adapter: Optional[DBusAdapter] = None, + methods_priority: Optional[MethodsPriorityOrder] = None, + modename: Optional[str] = None, +) -> Tuple[ActivationResult, Optional[Method], Optional[Heartbeat]]: + """Activates a mode defined by a collection of Methods. Only the first + Method which succeeds activation will be used, in order from highest + priority to lowest priority. + + The activation may be faked as to be successful by using the + WAKEPY_FAKE_SUCCESS environment variable. + + Parameters + ---------- + methods: + The list of Methods to be used for activating this Mode. + dbus_adapter: + Can be used to define a custom DBus adapter for processing DBus calls + in the .caniuse(), .enter_mode(), .heartbeat() and .exit_mode() of the + Method. Optional. + methods_priority: list[str | set[str]] + The priority order, which is a list of method names or asterisk + ('*'). The asterisk means "all rest methods" and may occur only + once in the priority order, and cannot be part of a set. All method + names must be unique and must be part of the `methods`. + modename: + Name of the Mode. Used for communication to user, logging and in + error messages (can be "any string" which makes sense to you). + Optional. + """ + check_methods_priority(methods_priority, methods) + if not methods: + # Cannot activate anything as there are no methods. + return ActivationResult(modename=modename), None, None + + prioritized_methods = get_prioritized_methods(methods, methods_priority) + # The fake method is always checked first (WAKEPY_FAKE_SUCCESS) + prioritized_methods.insert(0, get_method(WAKEPY_FAKE_SUCCESS)) + + results = [] + + for methodcls in prioritized_methods: + method = methodcls(dbus_adapter=dbus_adapter) + methodresult, heartbeat = activate_method(method) + results.append(methodresult) + if methodresult.success: + break + else: + # Tried activate with all methods, but none of them succeed + return ActivationResult(results, modename=modename), None, None + + # Activation was succesful. + return ActivationResult(results, modename=modename), method, heartbeat + + +def check_methods_priority( + methods_priority: Optional[MethodsPriorityOrder], methods: List[MethodCls] +) -> None: + """Checks against `methods` that the `methods_priority` is valid. + + Parameters + ---------- + methods_priority: list[str | set[str]] + The priority order, which is a list of where items are method names, + sets of methods names or the asterisk ('*'). The asterisk means "all + rest methods" and may occur only once in the priority order, and cannot + be part of a set. All method names must be unique and must be part of + the `methods`. + methods: list[MethodCls] + The methods which the `methods_priority` is validated against. + + Raises + ------ + ValueError or TypeError if the `methods_priority` is not valid. + """ + if methods_priority is None: + return + + known_method_names = {m.name for m in methods} + known_method_names.add("*") + seen_method_names = set() + + for method_name, in_set in _iterate_methods_priority(methods_priority): + if not isinstance(method_name, str): + raise TypeError("methods_priority must be a list[str | set[str]]!") + + if in_set and method_name == "*": + raise ValueError( + "Asterisk (*) may not be a part of a set in methods_priority!" + ) + if method_name not in known_method_names: + raise ValueError( + f'Method "{method_name}" in methods_priority not in selected methods!' + ) + if method_name in seen_method_names: + if method_name != "*": + raise ValueError( + f'Duplicate method name "{method_name}" in methods_priority' + ) + else: + raise ValueError( + "The asterisk (*) can only occur once in methods_priority!" + ) + seen_method_names.add(method_name) + + +def _iterate_methods_priority( + methods_priority: Optional[MethodsPriorityOrder], +) -> typing.Iterator[Tuple[str, bool]]: + """Provides an iterator over the items in methods_priority. The items in + the iterator are (method_name, in_set) 2-tuples, where the method_name is + the method name (str) and the in_set is a boolean which is True if the + returned method_name is part of a set and False otherwise.""" + + if not methods_priority: + return + + for item in methods_priority: + if isinstance(item, set): + for method_name in item: + yield method_name, True + else: + yield item, False + + +def get_prioritized_methods_groups( + methods: List[MethodCls], methods_priority: Optional[MethodsPriorityOrder] +) -> List[Set[MethodCls]]: + """Prioritizes Methods in `methods` based on priority order defined by + `methods_priority`. This function does not validate the methods_priority in + any way; use `check_methods_priority` for validation of needed. + + Parameters + ---------- + methods: list[MethodCls] + The source list of methods. These methods are returned as prioritized + groups. + methods_priority: list[str | set[str]] + The names of the methods in `methods`. This specifies the priority + order; the order of method classes in the returned list. An asterisk + ('*') can be used to denote "all other methods". + + + Returns + ------- + method_groups: list[set[MethodCls]] + The prioritized methods. Each set in the output represents a group of + equal priority. All Methods from the input `methods` are always + included in the output + + + Example + ------- + Say there are methods MethodA, MethodB, MethodC, MethodD, MethodE, MethodF + with names "A", "B", "C", "D", "E", "F": + + >>> methods = [MethodA, MethodB, MethodC, MethodD, MethodE, MethodF] + >>> get_prioritized_methods_groups( + methods, + methods_priority=["A", "F", "*"] + ) + [ + {MethodA}, + {MethodF}, + {MethodB, MethodC, MethodD, MethodE}, + ] + + """ + + # Make this a list of sets just to make things simpler + methods_priority_sets: List[Set[str]] = [ + {item} if isinstance(item, str) else item for item in methods_priority or [] + ] + + method_dct = {m.name: m for m in methods} + + asterisk = {"*"} + asterisk_index = None + out: List[Set[MethodCls]] = [] + + for item in methods_priority_sets: + if item == asterisk: + # Save the location where to add the rest of the methods ('*') + asterisk_index = len(out) + else: # Item is a set but not `asterisk` + out.append({method_dct[name] for name in item}) + + out_flattened = {m for group in out for m in group} + rest_of_the_methods = {m for m in methods if m not in out_flattened} + + if rest_of_the_methods: + if asterisk_index is not None: + out.insert(asterisk_index, rest_of_the_methods) + else: + out.append(rest_of_the_methods) + + return out + + +def sort_methods_by_priority(methods: Set[MethodCls]) -> List[MethodCls]: + """Sorts Method classes by priority and returns a new sorted list with + Methods with highest priority first. + + The logic is: + (1) Any Methods supporting the CURRENT_PLATFORM are placed before any other + Methods (the others are not expected to work at all) + (2) Sort alphabetically by Method name, ignoring the case + """ + return sorted( + methods, + key=lambda m: ( + # Prioritize methods supporting CURRENT_PLATFORM over any others + 0 if CURRENT_PLATFORM in m.supported_platforms else 1, + m.name.lower() if m.name else "", + ), + ) + + +def get_prioritized_methods( + methods: List[MethodCls], + methods_priority: Optional[MethodsPriorityOrder] = None, +) -> List[MethodCls]: + """Take an unordered list of Methods and sort them by priority using the + methods_priority and automatic ordering. The methods_priority is used to + define groups of priority (sets of methods). The automatic ordering part is + used to order the methods *within* each priority group. In particular, all + methods supported by the current platform are placed first, and all + supported methods are then ordered alphabetically (ignoring case). + + Parameters + ---------- + methods: + The list of Methods to sort. + methods_priority: + Optional priority order, which is a list of method names (strings) or + sets of method names (sets of strings). An asterisk ('*') may be used + for "all the rest methods". None is same as ['*']. + + Returns + ------- + sorted_methods: + The input `methods` sorted with priority, highest priority first. + + Example + ------- + Assuming: Current platform is Linux. + + >>> methods = [LinuxA, LinuxB, LinuxC, MultiPlatformA, WindowsA] + >>> get_prioritized_methods( + >>> methods, + >>> methods_priority=[{"WinA", "LinuxB"}, "*"], + >>> ) + [LinuxB, WindowsA, LinuxA, LinuxC, MultiPlatformA] + + Explanation: + + WindowsA and LinuxB were given high priority, but since the current + platform is Linux, LinuxB was prioritized to be before WindowsA. + + The asterisk ('*') is converted to a set of rest of the methods: + {"LinuxA", "LinuxC", "MultiPlatformA"}, and those are then + automatically ordered. As all of them support Linux, the result is + just the methods sorted alphabetically. The asterisk in the end is + optional; it is added to the end of `methods_priority` if missing. + + """ + unordered_groups: List[Set[MethodCls]] = get_prioritized_methods_groups( + methods, methods_priority=methods_priority + ) + + ordered_groups: List[List[MethodCls]] = [ + sort_methods_by_priority(group) for group in unordered_groups + ] + + return [method for group in ordered_groups for method in group] + def handle_activation_fail(on_fail: OnFail, result: ActivationResult) -> None: if on_fail == "pass": diff --git a/src/wakepy/core/registry.py b/src/wakepy/core/registry.py index 5eb6466e..aea7b132 100644 --- a/src/wakepy/core/registry.py +++ b/src/wakepy/core/registry.py @@ -7,7 +7,7 @@ get_method Get a single method by name get_methods - Get multiple methods be name + Get multiple methods by name method_names_to_classes Convert multiple method names to Method classes get_methods_for_mode @@ -45,9 +45,17 @@ _method_registry: MethodRegistry = dict() -"""A name -> Method class mapping. Updated automatically; when python loads -a module with a subclass of Method, the Method class is added to this registry. +"""A registry of Methods and Modes. This is used for searching a Method base on +the name of the Method and optionally the name of the Mode the Method +implements. + +Updated automatically; when python loads a module with a subclass of Method, +the Method class is added to this registry. + +Data structure: The keys are names of Modes and values are MethodDicts. In +MethodDict, keys are names of methods, and values are Method classes. """ + logger = logging.getLogger(__name__) diff --git a/src/wakepy/methods/__init__.py b/src/wakepy/methods/__init__.py index 44ae5b78..caeea99e 100644 --- a/src/wakepy/methods/__init__.py +++ b/src/wakepy/methods/__init__.py @@ -26,6 +26,7 @@ # is that the Methods are registered into the method registry only if the class # definition is executed (if the module containing the Method class definition # is imported) +from . import _testing as _testing from . import freedesktop as freedesktop from . import gnome as gnome from . import macos as macos diff --git a/src/wakepy/methods/_testing.py b/src/wakepy/methods/_testing.py new file mode 100644 index 00000000..d4eab19b --- /dev/null +++ b/src/wakepy/methods/_testing.py @@ -0,0 +1,57 @@ +"""This module defines the WakepyFakeSuccess method, which can be used to fake +activation success. It is controlled with the WAKEPY_FAKE_SUCCESS environment +variable and meant to be used in CI pipelines / tests.""" + +from wakepy.core import CURRENT_PLATFORM, Method +from wakepy.core.constants import WAKEPY_FAKE_SUCCESS + + +class WakepyFakeSuccess(Method): + """This is a special fake method to be used with any mode. It can be used + in tests for faking wakepy mode activation. This way all IO and real + executable, library and dbus calls are prevented. To use this method (and + skip using any other methods), set WAKEPY_FAKE_SUCCESS environment variable + to a truthy value (e.g. "1", or "True"). + """ + + name = WAKEPY_FAKE_SUCCESS + mode = "_fake" + + environment_variable = name + + # All other values are considered to be truthy. Comparison is case + # insensitive + falsy_values = ("0", "no", "false") + + supported_platforms = (CURRENT_PLATFORM,) + + def enter_mode(self) -> None: + """Function which says if fake success should be enabled + + Fake success is controlled via WAKEPY_FAKE_SUCCESS environment + variable. If that variable is set to a truthy value,fake success is + activated. + + Falsy values: '0', 'no', 'false' (case ignored) + Truthy values: everything else + + Motivation: + ----------- + When running on CI system, wakepy might fail to acquire an inhibitor + lock just because there is no Desktop Environment running. In these + cases, it might be useful to just tell with an environment variable + that wakepy should fake the successful inhibition anyway. Faking the + success is done after every other method is tried (and failed). + """ + # The os.environ seems to be populated when os is imported -> delay the + # import until here. + import os + + if self.environment_variable not in os.environ: + raise RuntimeError(f"{self.environment_variable} not set.") + + val = os.environ[self.environment_variable] + if val.lower() in self.falsy_values: + raise RuntimeError( + f"{self.environment_variable} set to falsy value: {val}." + ) diff --git a/src/wakepy/modes/keep.py b/src/wakepy/modes/keep.py index 03c6d602..206afca6 100644 --- a/src/wakepy/modes/keep.py +++ b/src/wakepy/modes/keep.py @@ -8,10 +8,9 @@ if typing.TYPE_CHECKING: from typing import Optional, Type - from ..core.activation import MethodsPriorityOrder + from ..core.constants import StrCollection from ..core.dbus import DBusAdapter, DBusAdapterTypeSeq - from ..core.method import StrCollection - from ..core.mode import Mode, OnFail + from ..core.mode import MethodsPriorityOrder, Mode, OnFail def running( diff --git a/tests/conftest.py b/tests/conftest.py index 53724ebc..6ef29a03 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -3,6 +3,7 @@ import pytest +from wakepy import Method from wakepy.core.strenum import StrEnum if sys.version_info < (3, 8): # pragma: no-cover-if-py-gte-38 @@ -11,6 +12,16 @@ import typing +class TestMethod(Method): + __test__ = False # for pytest + mode = "_test" + + +@pytest.fixture +def method1(): + return TestMethod() + + @pytest.fixture def do_assert(): """Function to be used instead of assert statement.""" diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py index c97a21fa..63060ee1 100644 --- a/tests/unit/conftest.py +++ b/tests/unit/conftest.py @@ -1,6 +1,8 @@ import pytest from wakepy import DBusAdapter +from wakepy.core.registry import register_method +from wakepy.methods._testing import WakepyFakeSuccess class TestDBusAdapter(DBusAdapter): @@ -18,9 +20,15 @@ class TestUtils: """ @staticmethod - def empty_method_registry(monkeypatch): - # Make the method registry empty for duration of a test - monkeypatch.setattr("wakepy.core.registry._method_registry", dict()) + def empty_method_registry(monkeypatch, fake_success=False): + """ + Make the method registry empty for duration of a test. Optionally, keep + the WakepyFakeSuccess method in the registry. + """ + monkeypatch.setattr("wakepy.core.registry._method_registry", (dict())) + if fake_success: + register_method(WakepyFakeSuccess) + monkeypatch.setenv("WAKEPY_FAKE_SUCCESS", "yes") @pytest.fixture(scope="session") diff --git a/tests/unit/test_core/conftest.py b/tests/unit/test_core/conftest.py index 37855cd2..826cb504 100644 --- a/tests/unit/test_core/conftest.py +++ b/tests/unit/test_core/conftest.py @@ -1,6 +1,7 @@ import pytest from wakepy.core import DBusAddress, DBusMethod, Method, PlatformName +from wakepy.core.heartbeat import Heartbeat # B, D, E FIRST_MODE = "first_mode" @@ -13,6 +14,23 @@ class TestMethod(Method): mode = "_test" +@pytest.fixture +def heartbeat1(method1: Method): + """Well behaving Heartbeat instance""" + return Heartbeat(method1) + + +@pytest.fixture +def heartbeat2_bad(method1: Method): + """Bad Heartbeat instance. Returns a bad value.""" + + class BadHeartbeat(Heartbeat): + def stop(self): + return "Bad value" + + return BadHeartbeat(method1) + + @pytest.fixture(scope="function") def provide_methods_different_platforms(monkeypatch, testutils): testutils.empty_method_registry(monkeypatch) diff --git a/tests/unit/test_core/test_activation/test_activationresult.py b/tests/unit/test_core/test_activationresult.py similarity index 98% rename from tests/unit/test_core/test_activation/test_activationresult.py rename to tests/unit/test_core/test_activationresult.py index c29fd78b..9358c1a5 100644 --- a/tests/unit/test_core/test_activation/test_activationresult.py +++ b/tests/unit/test_core/test_activationresult.py @@ -6,11 +6,14 @@ import pytest from wakepy.core import ActivationResult, MethodActivationResult -from wakepy.core.activation import StageName, WakepyFakeSuccess +from wakepy.core.constants import StageName +from wakepy.core.registry import get_method if typing.TYPE_CHECKING: from typing import List +fake_success_cls = get_method("WAKEPY_FAKE_SUCCESS") + @pytest.fixture def mr_platform_support_fail() -> MethodActivationResult: @@ -53,7 +56,7 @@ def mr_wakepy_fake_notinuse() -> MethodActivationResult: return MethodActivationResult( success=False, failure_stage=StageName.ACTIVATION, - method_name=WakepyFakeSuccess.name, + method_name=fake_success_cls.name, ) @@ -62,7 +65,7 @@ def mr_wakepy_fake_success() -> MethodActivationResult: return MethodActivationResult( success=True, failure_stage=StageName.ACTIVATION, - method_name=WakepyFakeSuccess.name, + method_name=fake_success_cls.name, ) diff --git a/tests/unit/test_core/test_heartbeat.py b/tests/unit/test_core/test_heartbeat.py new file mode 100644 index 00000000..36b5b1a7 --- /dev/null +++ b/tests/unit/test_core/test_heartbeat.py @@ -0,0 +1,18 @@ +import pytest + +from wakepy import Method +from wakepy.core.heartbeat import Heartbeat + + +@pytest.fixture +def method0() -> Method: + return Method() + + +class TestHeartBeat: + + def test_heartbeat(self, method0): + + hb = Heartbeat(method0) + hb.start() + hb.stop() diff --git a/tests/unit/test_core/test_activation/__init__.py b/tests/unit/test_core/test_method/__init__.py similarity index 100% rename from tests/unit/test_core/test_activation/__init__.py rename to tests/unit/test_core/test_method/__init__.py diff --git a/tests/unit/test_core/test_activation/test_activation.py b/tests/unit/test_core/test_method/test_activation.py similarity index 80% rename from tests/unit/test_core/test_activation/test_activation.py rename to tests/unit/test_core/test_method/test_activation.py index 72d568e7..c729458e 100644 --- a/tests/unit/test_core/test_activation/test_activation.py +++ b/tests/unit/test_core/test_method/test_activation.py @@ -7,7 +7,6 @@ import os import re from contextlib import contextmanager -from unittest.mock import Mock import pytest import time_machine @@ -20,145 +19,19 @@ get_test_method_class, iterate_test_methods, ) -from wakepy.core import ( - DBusAdapter, - Method, - MethodActivationResult, - PlatformName, - get_methods, -) -from wakepy.core.activation import ( - StageName, - StageNameValue, - WakepyFakeSuccess, +from wakepy.core import Method, MethodActivationResult, PlatformName, get_methods +from wakepy.core.constants import WAKEPY_FAKE_SUCCESS, StageName, StageNameValue +from wakepy.core.heartbeat import Heartbeat +from wakepy.core.method import ( + MethodError, activate_method, - activate_mode, caniuse_fails, deactivate_method, get_platform_supported, try_enter_and_heartbeat, ) -from wakepy.core.heartbeat import Heartbeat -from wakepy.core.method import MethodError - - -@pytest.fixture -def heartbeat1(): - """Well behaving Heartbeat instance""" - heartbeat = Mock(spec_set=Heartbeat) - heartbeat.stop.return_value = True - return heartbeat - - -@pytest.fixture -def heartbeat2_bad(): - """Bad Heartbeat instance. Returns a bad value.""" - heartbeat = Mock(spec_set=Heartbeat) - heartbeat.stop.return_value = "Bad value" - return heartbeat - - -class TestActivateMode: - """tests for activate_mode""" - - @pytest.mark.usefixtures("mocks_for_test_activate_mode") - def test_activate_without_methods(self): - res, active_method, heartbeat = activate_mode([], None) - assert res.list_methods() == [] - assert res.success is False - assert active_method is None - assert heartbeat is None - - def test_activate_function_success(self, mocks_for_test_activate_mode): - """Here we test the activate_mode() function. It calls some - other functions which we do not care about as they're tested elsewhere. - That is we why monkeypatch those functions with fakes""" - - # Arrange - mocks = mocks_for_test_activate_mode - methodcls_fail = get_test_method_class(enter_mode=False) - methodcls_success = get_test_method_class(enter_mode=None) - - # Act - # Note: prioritize the failing first, so that the failing one will also - # be used. This also tests at that the prioritization is used at least - # somehow - result, active_method, heartbeat = activate_mode( - [methodcls_success, methodcls_fail], - dbus_adapter=mocks.dbus_adapter, - methods_priority=[ - methodcls_fail.name, - methodcls_success.name, - ], - ) - - # Assert - - # There is a successful method, so the activation succeeds. - assert result.success is True - - # The failing method is tried first because there is prioritization - # step which honors the `methods_priority` - assert [x.method_name for x in result.list_methods()] == [ - methodcls_fail.name, - methodcls_success.name, - ] - - assert isinstance(active_method, methodcls_success) - assert heartbeat is mocks.heartbeat - - def test_activate_function_failure(self, mocks_for_test_activate_mode): - # Arrange - mocks = mocks_for_test_activate_mode - methodcls_fail = get_test_method_class(enter_mode=False) - - # Act - result, active_method, heartbeat = activate_mode( - [methodcls_fail], - dbus_adapter=mocks.dbus_adapter, - ) - - # Assert - # The activation failed, so active_method and heartbeat is None - assert result.success is False - assert active_method is None - assert heartbeat is None - - @staticmethod - @pytest.fixture - def mocks_for_test_activate_mode(monkeypatch, heartbeat1): - """This is the test arrangement step for tests for the - `activate_mode` function""" - - mocks = Mock() - mocks.heartbeat = heartbeat1 - mocks.dbus_adapter = Mock(spec_set=DBusAdapter) - - def fake_activate_method(method): - try: - assert method.enter_mode() is None - success = True - except Exception: - success = False - - return ( - MethodActivationResult( - method_name=method.name, - success=True if success else False, - failure_stage=None if success else StageName.ACTIVATION, - ), - mocks.heartbeat, - ) - - monkeypatch.setattr( - "wakepy.core.activation.activate_method", fake_activate_method - ) - monkeypatch.setattr( - "wakepy.core.activation.check_methods_priority", - mocks.check_methods_priority, - ) - monkeypatch.setenv("WAKEPY_FAKE_SUCCESS", "0") - return mocks +from wakepy.core.platform import CURRENT_PLATFORM +from wakepy.core.registry import get_method class TestActivateMethod: @@ -177,19 +50,20 @@ def test_activate_method_method_without_name(self): ): activate_method(method) - def test_activate_method_method_without_platform_support(self, monkeypatch): - WindowsMethod = get_test_method_class( - supported_platforms=(PlatformName.WINDOWS,), + def test_activate_method_method_without_platform_support(self): + UnsupportedMethod = get_test_method_class( + supported_platforms=( + PlatformName.WINDOWS + if CURRENT_PLATFORM != PlatformName.WINDOWS + else PlatformName.LINUX + ), ) - winmethod = WindowsMethod() - monkeypatch.setattr( - "wakepy.core.activation.CURRENT_PLATFORM", PlatformName.LINUX - ) + unsupported_method = UnsupportedMethod() # The current platform is set to linux, so method supporting only linux # should fail. - res, heartbeat = activate_method(winmethod) + res, heartbeat = activate_method(unsupported_method) assert res.failure_stage == StageName.PLATFORM_SUPPORT assert res.success is False assert heartbeat is None @@ -637,6 +511,8 @@ def test_stagename(assert_strenum_values): class TestWakepyFakeSuccess: + wakepy_fake_success_cls = get_method(WAKEPY_FAKE_SUCCESS) + @contextmanager def wakepy_fake_value_set(self, monkeypatch, val): with monkeypatch.context() as mp: @@ -648,7 +524,7 @@ def wakepy_fake_value_set(self, monkeypatch, val): # These are the only "falsy" values for WAKEPY_FAKE_SUCCESS @pytest.mark.parametrize("val", ("0", "no", "NO", "False", "false", "FALSE")) def test_falsy_values(self, val, monkeypatch): - method = WakepyFakeSuccess() + method = self.wakepy_fake_success_cls() with self.wakepy_fake_value_set(monkeypatch, val), pytest.raises( RuntimeError, match=f"WAKEPY_FAKE_SUCCESS set to falsy value: {val}" @@ -657,13 +533,13 @@ def test_falsy_values(self, val, monkeypatch): @pytest.mark.parametrize("val", ("1", "yes", "True", "anystring")) def test_truthy_values(self, val, monkeypatch): - method = WakepyFakeSuccess() + method = self.wakepy_fake_success_cls() with self.wakepy_fake_value_set(monkeypatch, val): assert method.enter_mode() is None # type: ignore[func-returns-value] def test_without_the_env_var_set(self, monkeypatch): - method = WakepyFakeSuccess() + method = self.wakepy_fake_success_cls() if "WAKEPY_FAKE_SUCCESS" in os.environ: monkeypatch.delenv("WAKEPY_FAKE_SUCCESS") diff --git a/tests/unit/test_core/test_method.py b/tests/unit/test_core/test_method/test_method.py similarity index 73% rename from tests/unit/test_core/test_method.py rename to tests/unit/test_core/test_method/test_method.py index 1c228a92..3406dbba 100644 --- a/tests/unit/test_core/test_method.py +++ b/tests/unit/test_core/test_method/test_method.py @@ -6,9 +6,15 @@ import pytest from wakepy.core import DBusMethodCall -from wakepy.core.activation import has_enter, has_exit, has_heartbeat -from wakepy.core.method import Method, MethodOutcome, MethodOutcomeValue, select_methods -from wakepy.core.registry import MethodRegistryError, get_method, get_methods +from wakepy.core.method import ( + Method, + MethodOutcome, + MethodOutcomeValue, + has_enter, + has_exit, + has_heartbeat, +) +from wakepy.core.registry import MethodRegistryError, get_method if sys.version_info < (3, 8): # pragma: no-cover-if-py-gte-38 import typing_extensions as typing @@ -111,42 +117,6 @@ class SomeMethod(TestMethod): # type: ignore # noqa:F811 name = somename -@pytest.mark.usefixtures("provide_methods_a_f") -def test_select_methods(): - (MethodB, MethodD, MethodE) = get_methods(["B", "D", "E"]) - - methods = [MethodB, MethodD, MethodE] - - # These can also be filtered with a blacklist - assert select_methods(methods, omit=["B"]) == [MethodD, MethodE] - assert select_methods(methods, omit=["B", "E"]) == [MethodD] - # Extra 'omit' methods do not matter - assert select_methods(methods, omit=["B", "E", "foo", "bar"]) == [ - MethodD, - ] - - # These can be filtered with a whitelist - assert select_methods(methods, use_only=["B", "E"]) == [MethodB, MethodE] - - # If a whitelist contains extra methods, raise exception - with pytest.raises( - ValueError, - match=re.escape( - "Methods ['bar', 'foo'] in `use_only` are not part of `methods`!" - ), - ): - select_methods(methods, use_only=["foo", "bar"]) - - # Cannot provide both: omit and use_only - with pytest.raises( - ValueError, - match=re.escape( - "Can only define omit (blacklist) or use_only (whitelist), not both!" - ), - ): - select_methods(methods, use_only=["B"], omit=["E"]) - - def test_method_defaults(): """tests the Method enter_mode, exit_mode and heartbeat defaults""" m = Method() diff --git a/tests/unit/test_core/test_mode.py b/tests/unit/test_core/test_mode.py deleted file mode 100644 index e2641632..00000000 --- a/tests/unit/test_core/test_mode.py +++ /dev/null @@ -1,274 +0,0 @@ -from __future__ import annotations - -import re -import typing -import warnings -from unittest.mock import Mock, call - -import pytest - -from tests.unit.test_core.testmethods import get_test_method_class -from wakepy import ActivationError, ActivationResult -from wakepy.core.dbus import DBusAdapter -from wakepy.core.heartbeat import Heartbeat -from wakepy.core.mode import Mode, ModeController, ModeExit, handle_activation_fail - -if typing.TYPE_CHECKING: - from typing import Tuple, Type - - -def mocks_for_test_mode(): - # Setup test - mocks = Mock() - - mocks.dbus_adapter_cls = Mock(spec_set=type(DBusAdapter)) - mocks.dbus_adapter_cls.return_value = Mock(spec_set=DBusAdapter) - - mocks.mode_controller_cls = Mock() - mocks.mode_controller_cls.return_value = Mock(spec_set=ModeController) - - mocks.methods_priority = Mock() - - result = Mock(spec_set=ActivationResult) - methods = [Mock() for _ in range(3)] - - # Record calls in a "mock manager" - mocks.activation_result = result - mocks.methods = methods - return mocks - - -def get_mocks_and_testmode() -> Tuple[Mock, Type[Mode]]: - # Setup mocks - mocks = mocks_for_test_mode() - - class TestMode(Mode): - _controller_class = mocks.controller_class - - return mocks, TestMode - - -def test_mode_contextmanager_protocol(): - """Test that the Mode fulfills the context manager protocol; i.e. it is - possible to use instances of Mode in a with statement like this: - - with Mode() as m: - ... - - Test also that the ModeController.activate() and .deactivate() - are called as expected. and that the `m` is the return value of the - manager.activate() - """ - - # Setup mocks - mocks, TestMode = get_mocks_and_testmode() - - # starting point: No mock calls - assert mocks.mock_calls == [] - - mode = TestMode( - mocks.methods, - methods_priority=mocks.methods_priority, - dbus_adapter=mocks.dbus_adapter_cls, - name="TestMode", - ) - - # No calls during init - assert len(mocks.mock_calls) == 0 - - # Test that the context manager protocol works - with mode as m: - # The second call - # We have also called activate - assert len(mocks.mock_calls) == 3 - - # We have also created a ModeController instance - assert mocks.mock_calls[1] == call.controller_class( - dbus_adapter=mocks.dbus_adapter_cls.return_value - ) - # And called ModeController.activate - assert mocks.mock_calls[2] == call.controller_class().activate( - mocks.methods, methods_priority=mocks.methods_priority, modename="TestMode" - ) - # The __enter__ returns the Mode - assert m is mode - - # When activating, the .active is set to activation_result.success - assert m.active is m.activation_result.success - - # The m.activation_result contains the value from the - # ModeController.activate() call - assert ( - m.activation_result - == mocks.controller_class.return_value.activate.return_value - ) - - # After exiting the mode, Mode.active is set to False - assert m.active is False - - # If we get here, the __exit__ works without errors - # ModeController.deactivate() is called during __exit__ - assert len(mocks.mock_calls) == 4 - assert mocks.mock_calls[3] == call.controller_class().deactivate() - - -def test_mode_exits(): - mocks, TestMode = get_mocks_and_testmode() - - # Normal exit - with TestMode( - mocks.methods, - methods_priority=mocks.methods_priority, - dbus_adapter=mocks.dbus_adapter_cls, - ) as mode: - testval = 1 - - assert testval == 1 - - _assert_context_manager_used_correctly(mocks, mode) - - -def test_mode_exits_with_modeexit(): - mocks, TestMode = get_mocks_and_testmode() - - # Exit with ModeExit - with TestMode( - mocks.methods, - methods_priority=mocks.methods_priority, - dbus_adapter=mocks.dbus_adapter_cls, - ) as mode: - testval = 2 - raise ModeExit - testval = 0 # type: ignore # (never hit) - - assert testval == 2 - - _assert_context_manager_used_correctly(mocks, mode) - - -def test_mode_exits_with_modeexit_with_args(): - mocks, TestMode = get_mocks_and_testmode() - - # Exit with ModeExit with args - with TestMode( - mocks.methods, - methods_priority=mocks.methods_priority, - dbus_adapter=mocks.dbus_adapter_cls, - ) as mode: - testval = 3 - raise ModeExit("FOOO") - testval = 0 # type: ignore # (never hit) - - assert testval == 3 - - _assert_context_manager_used_correctly(mocks, mode) - - -def test_mode_exits_with_other_exception(): - mocks, TestMode = get_mocks_and_testmode() - - # Other exceptions are passed through - class MyException(Exception): ... - - with pytest.raises(MyException): - with TestMode( - mocks.methods, - methods_priority=mocks.methods_priority, - dbus_adapter=mocks.dbus_adapter_cls, - ) as mode: - testval = 4 - raise MyException - testval = 0 # type: ignore # (never hit) - - assert testval == 4 - - _assert_context_manager_used_correctly(mocks, mode) - - -def test_exiting_before_enter(): - - mocks, TestMode = get_mocks_and_testmode() - mode = TestMode( - mocks.methods, - methods_priority=mocks.methods_priority, - dbus_adapter=mocks.dbus_adapter_cls, - ) - with pytest.raises(RuntimeError, match="Must __enter__ before __exit__!"): - mode.__exit__(None, None, None) - - -def _assert_context_manager_used_correctly(mocks, mode): - assert mocks.mock_calls.copy() == [ - call.dbus_adapter_cls(), - call.controller_class(dbus_adapter=mocks.dbus_adapter_cls.return_value), - call.controller_class().activate( - mocks.methods, methods_priority=mocks.methods_priority, modename=mode.name - ), - call.controller_class().deactivate(), - ] - - -class TestHandleActivationFail: - """Tests for handle_activation_fail""" - - @staticmethod - @pytest.fixture - def result1(): - return ActivationResult(modename="testmode") - - @staticmethod - @pytest.fixture - def error_text_match(result1): - return re.escape(result1.get_error_text()) - - def test_pass(self, result1): - with warnings.catch_warnings(): - warnings.simplefilter("error") - handle_activation_fail(on_fail="pass", result=result1) - - def test_warn(self, result1, error_text_match): - with pytest.warns(UserWarning, match=error_text_match): - handle_activation_fail(on_fail="warn", result=result1) - - def test_error(self, result1, error_text_match): - with pytest.raises(ActivationError, match=error_text_match): - handle_activation_fail(on_fail="error", result=result1) - - def test_callable(self, result1): - mock = Mock() - with warnings.catch_warnings(): - warnings.simplefilter("error") - handle_activation_fail(on_fail=mock, result=result1) - mock.assert_called_once_with(result1) - - def test_bad_on_fail_value(self, result1): - with pytest.raises(ValueError, match="on_fail must be one of"): - handle_activation_fail( - on_fail="foo", # type: ignore - result=result1, - ) - - -def test_modecontroller(monkeypatch, do_assert): - # Disable fake success here, because we want to use method_cls for the - # activation (and not WakepyFakeSuccess) - monkeypatch.setenv("WAKEPY_FAKE_SUCCESS", "0") - - method_cls = get_test_method_class(enter_mode=None, heartbeat=None, exit_mode=None) - controller = ModeController(Mock(spec_set=DBusAdapter)) - - # When controller was created, it has not active method or heartbeat - do_assert(controller.active_method is None) - do_assert(controller.heartbeat is None) - - controller.activate([method_cls]) - do_assert(isinstance(controller.active_method, method_cls)) - do_assert(isinstance(controller.heartbeat, Heartbeat)) - - retval = controller.deactivate() - assert retval is True - assert controller.active_method is None - assert controller.heartbeat is None - - # Calling a deactivate for mode which is not activated will return False - assert controller.deactivate() is False diff --git a/tests/unit/test_core/test_mode/__init__.py b/tests/unit/test_core/test_mode/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/unit/test_core/test_mode/test_mode.py b/tests/unit/test_core/test_mode/test_mode.py new file mode 100644 index 00000000..799bb183 --- /dev/null +++ b/tests/unit/test_core/test_mode/test_mode.py @@ -0,0 +1,379 @@ +from __future__ import annotations + +import copy +import re +import typing +import warnings +from unittest.mock import Mock + +import pytest + +from tests.unit.test_core.testmethods import get_test_method_class +from wakepy import ActivationError, ActivationResult, Method, Mode +from wakepy.core.activationresult import MethodActivationResult +from wakepy.core.constants import StageName +from wakepy.core.dbus import DBusAdapter +from wakepy.core.mode import ( + ModeExit, + activate_mode, + handle_activation_fail, + select_methods, +) +from wakepy.core.registry import get_methods + +if typing.TYPE_CHECKING: + from typing import List, Type + + +@pytest.fixture +def dbus_adapter_cls(): + class TestDbusAdapter(DBusAdapter): ... + + return TestDbusAdapter + + +@pytest.fixture +def testmode_cls(): + class TestMode(Mode): ... + + return TestMode + + +@pytest.fixture +def methods_abc(monkeypatch, testutils) -> List[Type[Method]]: + """This fixture creates three methods, which belong to a given mode.""" + testutils.empty_method_registry(monkeypatch, fake_success=True) + + class MethodA(Method): + name = "MethodA" + mode = "foo" + + class MethodB(Method): + name = "MethodB" + mode = "foo" + + class MethodC(Method): + name = "MethodC" + mode = "foo" + + return [MethodA, MethodB, MethodC] + + +@pytest.fixture +def methods_priority0(): + return ["*"] + + +@pytest.fixture +def mode0( + methods_abc: List[Type[Method]], + testmode_cls: Type[Mode], + methods_priority0: List[str], +): + return testmode_cls( + methods_abc, + dbus_adapter=None, + methods_priority=methods_priority0, + name="TestMode", + ) + + +@pytest.fixture +def mode1_with_dbus( + methods_abc: List[Type[Method]], + testmode_cls: Type[Mode], + methods_priority0: List[str], + dbus_adapter_cls: Type[DBusAdapter], +): + return testmode_cls( + methods_abc, + methods_priority=methods_priority0, + dbus_adapter=dbus_adapter_cls, + name="TestMode1", + ) + + +def test_mode_contextmanager_protocol( + mode0: Mode, +): + """Test that the Mode fulfills the context manager protocol""" + flag_end_of_with_block = False + + # Test that the context manager protocol works + with mode0 as m: + + # The __enter__ returns the Mode + assert m is mode0 + # We have activated the Mode + assert mode0.active + # There is an ActivationResult available in .activation_result + assert isinstance(m.activation_result, ActivationResult) + # The active method is also available + assert isinstance(mode0.active_method, Method) + + activation_result = copy.deepcopy(m.activation_result) + flag_end_of_with_block = True + + # reached the end of the with block + assert flag_end_of_with_block + + # After exiting the mode, Mode.active is set to False + assert m.active is False + # The active_method is set to None + assert m.active_method is None + # The activation result is still there (not removed during deactivation) + assert activation_result == m.activation_result + + +class TestModeActivateDeactivate: + """Tests for Mode._activate and Mode._deactivate""" + + def test_activate_twice( + self, + mode1_with_dbus: Mode, + ): + # Run twice. The dbus adapter instance is created on the first time + # and reused the second time. + with mode1_with_dbus: + ... + with mode1_with_dbus: + ... + + def test_runtime_error_if_deactivating_without_active_mode( + self, + mode0: Mode, + ): + # Try to deactivate a mode when there's no active_method. Needed for + # test coverage. A situation like this is unlikely to happen ever. + with pytest.raises( + RuntimeError, + match="Cannot deactivate mode: TestMode. The active_method is None! This should never happen", # noqa: E501 + ): + with mode0: + # Setting active method + mode0.active_method = None + + +class TestExitModeWithException: + """Test cases when a Mode is exited with an Exception""" + + def test_mode_exits_with_modeexit(self, mode0: Mode): + with mode0: + testval = 2 + raise ModeExit + testval = 0 # type: ignore # (never hit) + + assert testval == 2 + + def test_mode_exits_with_modeexit_with_args(self, mode0: Mode): + with mode0: + testval = 3 + raise ModeExit("FOOO") + testval = 0 # type: ignore # (never hit) + + assert testval == 3 + + def test_mode_exits_with_other_exception(self, mode0: Mode): + # Other exceptions are passed through + class MyException(Exception): ... + + with pytest.raises(MyException): + with mode0: + testval = 4 + raise MyException + testval = 0 # type: ignore # (never hit) + + assert testval == 4 + + +class TestHandleActivationFail: + """Tests for handle_activation_fail""" + + @staticmethod + @pytest.fixture + def result1(): + return ActivationResult(modename="testmode") + + @staticmethod + @pytest.fixture + def error_text_match(result1): + return re.escape(result1.get_error_text()) + + def test_pass(self, result1): + with warnings.catch_warnings(): + warnings.simplefilter("error") + handle_activation_fail(on_fail="pass", result=result1) + + def test_warn(self, result1, error_text_match): + with pytest.warns(UserWarning, match=error_text_match): + handle_activation_fail(on_fail="warn", result=result1) + + def test_error(self, result1, error_text_match): + with pytest.raises(ActivationError, match=error_text_match): + handle_activation_fail(on_fail="error", result=result1) + + def test_callable(self, result1): + mock = Mock() + with warnings.catch_warnings(): + warnings.simplefilter("error") + handle_activation_fail(on_fail=mock, result=result1) + mock.assert_called_once_with(result1) + + def test_bad_on_fail_value(self, result1): + with pytest.raises(ValueError, match="on_fail must be one of"): + handle_activation_fail( + on_fail="foo", # type: ignore + result=result1, + ) + + +@pytest.mark.usefixtures("provide_methods_a_f") +class TestSelectMethods: + + def test_filter_with_blacklist(self): + + (MethodB, MethodD, MethodE) = get_methods(["B", "D", "E"]) + methods = [MethodB, MethodD, MethodE] + assert select_methods(methods, omit=["B"]) == [MethodD, MethodE] + assert select_methods(methods, omit=["B", "E"]) == [MethodD] + + def test_extra_omit_does_not_matter(self): + (MethodB, MethodD, MethodE) = get_methods(["B", "D", "E"]) + methods = [MethodB, MethodD, MethodE] + # Extra 'omit' methods do not matter + assert select_methods(methods, omit=["B", "E", "foo", "bar"]) == [ + MethodD, + ] + + def test_filter_with_a_whitelist(self): + (MethodB, MethodD, MethodE) = get_methods(["B", "D", "E"]) + methods = [MethodB, MethodD, MethodE] + assert select_methods(methods, use_only=["B", "E"]) == [MethodB, MethodE] + + def test_whitelist_extras_causes_exception(self): + + (MethodB, MethodD, MethodE) = get_methods(["B", "D", "E"]) + methods = [MethodB, MethodD, MethodE] + + # If a whitelist contains extra methods, raise exception + with pytest.raises( + ValueError, + match=re.escape( + "Methods ['bar', 'foo'] in `use_only` are not part of `methods`!" + ), + ): + select_methods(methods, use_only=["foo", "bar"]) + + def test_cannot_provide_omit_and_use_only(self): + + (MethodB, MethodD, MethodE) = get_methods(["B", "D", "E"]) + methods = [MethodB, MethodD, MethodE] + # Cannot provide both: omit and use_only + with pytest.raises( + ValueError, + match=re.escape( + "Can only define omit (blacklist) or use_only (whitelist), not both!" + ), + ): + select_methods(methods, use_only=["B"], omit=["E"]) + + +class TestActivateMode: + """tests for activate_mode""" + + @pytest.mark.usefixtures("mocks_for_test_activate_mode") + def test_activate_without_methods(self): + res, active_method, heartbeat = activate_mode([], None) + assert res.list_methods() == [] + assert res.success is False + assert active_method is None + assert heartbeat is None + + def test_activate_function_success(self, mocks_for_test_activate_mode): + """Here we test the activate_mode() function. It calls some + other functions which we do not care about as they're tested elsewhere. + That is we why monkeypatch those functions with fakes""" + + # Arrange + mocks = mocks_for_test_activate_mode + methodcls_fail = get_test_method_class(enter_mode=False) + methodcls_success = get_test_method_class(enter_mode=None) + + # Act + # Note: prioritize the failing first, so that the failing one will also + # be used. This also tests at that the prioritization is used at least + # somehow + result, active_method, heartbeat = activate_mode( + [methodcls_success, methodcls_fail], + dbus_adapter=mocks.dbus_adapter, + methods_priority=[ + methodcls_fail.name, + methodcls_success.name, + ], + ) + + # Assert + + # There is a successful method, so the activation succeeds. + assert result.success is True + + # The failing method is tried first because there is prioritization + # step which honors the `methods_priority` + assert [x.method_name for x in result.list_methods()] == [ + methodcls_fail.name, + methodcls_success.name, + ] + + assert isinstance(active_method, methodcls_success) + assert heartbeat is mocks.heartbeat + + def test_activate_function_failure(self, mocks_for_test_activate_mode): + # Arrange + mocks = mocks_for_test_activate_mode + methodcls_fail = get_test_method_class(enter_mode=False) + + # Act + result, active_method, heartbeat = activate_mode( + [methodcls_fail], + dbus_adapter=mocks.dbus_adapter, + ) + + # Assert + # The activation failed, so active_method and heartbeat is None + assert result.success is False + assert active_method is None + assert heartbeat is None + + @staticmethod + @pytest.fixture + def mocks_for_test_activate_mode(monkeypatch, heartbeat1): + """This is the test arrangement step for tests for the + `activate_mode` function""" + + mocks = Mock() + mocks.heartbeat = heartbeat1 + mocks.dbus_adapter = Mock(spec_set=DBusAdapter) + + def fake_activate_method(method): + try: + assert method.enter_mode() is None + success = True + except Exception: + success = False + + return ( + MethodActivationResult( + method_name=method.name, + success=True if success else False, + failure_stage=None if success else StageName.ACTIVATION, + ), + mocks.heartbeat, + ) + + monkeypatch.setattr("wakepy.core.mode.activate_method", fake_activate_method) + monkeypatch.setattr( + "wakepy.core.mode.check_methods_priority", + mocks.check_methods_priority, + ) + monkeypatch.setenv("WAKEPY_FAKE_SUCCESS", "0") + return mocks diff --git a/tests/unit/test_core/test_activation/test_prioritization.py b/tests/unit/test_core/test_mode/test_prioritization.py similarity index 95% rename from tests/unit/test_core/test_activation/test_prioritization.py rename to tests/unit/test_core/test_mode/test_prioritization.py index 1966d5d7..ebdf136f 100644 --- a/tests/unit/test_core/test_activation/test_prioritization.py +++ b/tests/unit/test_core/test_mode/test_prioritization.py @@ -3,7 +3,7 @@ import pytest from wakepy.core import PlatformName -from wakepy.core.activation import ( +from wakepy.core.mode import ( check_methods_priority, get_prioritized_methods, get_prioritized_methods_groups, @@ -164,13 +164,13 @@ def test_sort_methods_by_priority(monkeypatch): ["WinA", "WinB", "WinC", "LinuxA", "LinuxB", "LinuxC", "multiA"] ) - monkeypatch.setattr("wakepy.core.activation.CURRENT_PLATFORM", PlatformName.LINUX) + monkeypatch.setattr("wakepy.core.mode.CURRENT_PLATFORM", PlatformName.LINUX) # Expecting to see Linux methods prioritized, and then by method name assert sort_methods_by_priority( {WindowsA, WindowsB, WindowsC, LinuxA, LinuxB, LinuxC, MultiPlatformA} ) == [LinuxA, LinuxB, LinuxC, MultiPlatformA, WindowsA, WindowsB, WindowsC] - monkeypatch.setattr("wakepy.core.activation.CURRENT_PLATFORM", PlatformName.WINDOWS) + monkeypatch.setattr("wakepy.core.mode.CURRENT_PLATFORM", PlatformName.WINDOWS) # Expecting to see windows methods prioritized, and then by method name assert sort_methods_by_priority( {WindowsA, WindowsB, WindowsC, LinuxA, LinuxB, LinuxC, MultiPlatformA} @@ -183,7 +183,7 @@ def test_get_prioritized_methods(monkeypatch): ["WinA", "WinB", "WinC", "LinuxA", "LinuxB", "LinuxC", "multiA"] ) - monkeypatch.setattr("wakepy.core.activation.CURRENT_PLATFORM", PlatformName.LINUX) + monkeypatch.setattr("wakepy.core.mode.CURRENT_PLATFORM", PlatformName.LINUX) assert get_prioritized_methods( [ @@ -239,7 +239,7 @@ def test_get_prioritized_methods(monkeypatch): ], ) == [LinuxA, LinuxB, LinuxC, MultiPlatformA, WindowsA, WindowsB] - monkeypatch.setattr("wakepy.core.activation.CURRENT_PLATFORM", PlatformName.WINDOWS) + monkeypatch.setattr("wakepy.core.mode.CURRENT_PLATFORM", PlatformName.WINDOWS) # No user-defined order -> Just alphabetical, but current platform # (Windows) first. assert get_prioritized_methods( diff --git a/tests/unit/test_modes.py b/tests/unit/test_modes.py index 99818951..38686e09 100644 --- a/tests/unit/test_modes.py +++ b/tests/unit/test_modes.py @@ -8,66 +8,94 @@ @pytest.mark.parametrize( - "input_args", + "name_prefix, function_under_test, modename", [ - dict( - name_prefix="running", - function_under_test=keep.running, - modename=ModeName.KEEP_RUNNING, + ( + "running", + keep.running, + ModeName.KEEP_RUNNING, ), - dict( - name_prefix="presenting", - function_under_test=keep.presenting, - modename=ModeName.KEEP_PRESENTING, + ( + "presenting", + keep.presenting, + ModeName.KEEP_PRESENTING, ), ], ) -def test_keep_running_mode_creation(input_args, monkeypatch, testutils): - """Simple test for keep.running and keep.presenting. Tests that all input - arguments for the functions are passed to the Mode.__init__ - """ +class TestKeepRunninAndPresenting: + """Tests common for keep.running and keep.presenting functions. The + `function_under_test` is either the keep.running or keep.presenting + function""" - testutils.empty_method_registry(monkeypatch) - - name_prefix = input_args["name_prefix"] - function_under_test = input_args["function_under_test"] - modename = input_args["modename"] - - class MethodA(Method): - name = f"{name_prefix}A" - mode = modename - - class MethodB(Method): - name = f"{name_prefix}B" - mode = modename - - class MethodC(Method): - name = f"{name_prefix}C" - mode = modename - - mode = function_under_test() - # All the methods for the mode are selected automatically - assert set(mode.method_classes) == {MethodA, MethodB, MethodC} - - # Case: Test "omit" parameter - mode = function_under_test(omit=[f"{name_prefix}A"]) - assert set(mode.method_classes) == {MethodB, MethodC} - - # Case: Test "methods" parameter - mode = function_under_test(methods=[f"{name_prefix}A", f"{name_prefix}B"]) - assert set(mode.method_classes) == {MethodB, MethodA} - - # Case: Test "methods_priority" parameter - methods_priority = [f"{name_prefix}A", f"{name_prefix}B"] - mode = function_under_test(methods_priority=methods_priority) - assert mode.methods_priority == methods_priority - assert set(mode.method_classes) == {MethodB, MethodA, MethodC} - - # Case: Test "dbus_adapter" parameter - class MyDBusAdapter(DBusAdapter): ... - - mode = function_under_test(dbus_adapter=MyDBusAdapter) - assert mode._dbus_adapter_cls == MyDBusAdapter + @staticmethod + @pytest.fixture + def methods(name_prefix, modename, monkeypatch, testutils): + """This fixture creates three methods, which belong to a given mode.""" + + testutils.empty_method_registry(monkeypatch) + + class MethodA(Method): + name = f"{name_prefix}A" + mode = modename + + class MethodB(Method): + name = f"{name_prefix}B" + mode = modename + + class MethodC(Method): + name = f"{name_prefix}C" + mode = modename + + return dict( + MethodA=MethodA, + MethodB=MethodB, + MethodC=MethodC, + ) + + def test_all_modes_are_selected_automatically(self, function_under_test, methods): + """Simple test for keep.running and keep.presenting. Tests that all + input arguments for the functions are passed to the Mode.__init__ + """ + + mode = function_under_test() + # All the methods for the mode are selected automatically + assert set(mode.method_classes) == { + methods["MethodA"], + methods["MethodB"], + methods["MethodC"], + } + + def test_omit_parameter(self, name_prefix, function_under_test, methods): + # Case: Test "omit" parameter + mode = function_under_test(omit=[f"{name_prefix}A"]) + assert set(mode.method_classes) == {methods["MethodB"], methods["MethodC"]} + + def test_methods_parameter(self, name_prefix, function_under_test, methods): + # Case: Test "methods" parameter + mode = function_under_test(methods=[f"{name_prefix}A", f"{name_prefix}B"]) + assert set(mode.method_classes) == {methods["MethodA"], methods["MethodB"]} + + def test_methods_priority_parameter( + self, name_prefix, function_under_test, methods + ): + # Case: Test "methods_priority" parameter + methods_priority = [f"{name_prefix}A", f"{name_prefix}B"] + mode = function_under_test(methods_priority=methods_priority) + assert mode.methods_priority == methods_priority + assert set(mode.method_classes) == { + methods["MethodA"], + methods["MethodB"], + methods["MethodC"], + } + + def test_methods_dbus_adapter_parameter(self, function_under_test, methods): + # Case: Test "dbus_adapter" parameter + class MyDBusAdapter(DBusAdapter): ... + + mode = function_under_test( + dbus_adapter=MyDBusAdapter, methods=[x.name for x in methods.values()] + ) + assert mode._dbus_adapter_cls == MyDBusAdapter def test_keep_running_with_fake_success(monkeypatch, fake_dbus_adapter):