-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Project cleanup, added Requirement.py
- Removed Math.py and tests - Added Requirement.py and tests
- Loading branch information
1 parent
300ff6c
commit 1dd988b
Showing
10 changed files
with
364 additions
and
149 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,121 @@ | ||
# ------------------------------------------------------------------------------- | ||
# | | | ||
# | Copyright (c) 2024 Scientific Software Engineering Center at Georgia Tech | | ||
# | Distributed under the MIT License. | | ||
# | | | ||
# ------------------------------------------------------------------------------- | ||
"""Contains functionality to process items in parallel and/or sequentially.""" | ||
|
||
from enum import auto, Enum | ||
from typing import Callable, cast, Optional, TypeVar | ||
|
||
from dbrownell_Common import ExecuteTasks # type: ignore[import-untyped] | ||
from dbrownell_Common.Streams.DoneManager import DoneManager # type: ignore[import-untyped] | ||
from dbrownell_Common.Streams.StreamDecorator import StreamDecorator # type: ignore[import-untyped] | ||
|
||
|
||
# ---------------------------------------------------------------------- | ||
class ExecutionStyle(Enum): | ||
"""Controls the way in which a Requirement, Query, and Module can be processed.""" | ||
|
||
Sequential = auto() | ||
Parallel = auto() | ||
|
||
|
||
# ---------------------------------------------------------------------- | ||
ItemType = TypeVar("ItemType") | ||
OutputType = TypeVar("OutputType") | ||
|
||
|
||
def ParallelSequentialProcessor( | ||
items: list[ItemType], | ||
calculate_result_func: Callable[[ItemType], tuple[int, OutputType]], | ||
dm: Optional[DoneManager] = None, | ||
*, | ||
max_num_threads: Optional[int] = None, | ||
) -> list[OutputType]: | ||
if dm is None: | ||
with DoneManager.Create(StreamDecorator(None), "", line_prefix="") as dm: | ||
return _Impl( | ||
dm, | ||
items, | ||
calculate_result_func, | ||
max_num_threads, | ||
) | ||
|
||
return _Impl( | ||
dm, | ||
items, | ||
calculate_result_func, | ||
max_num_threads, | ||
) | ||
|
||
|
||
# ---------------------------------------------------------------------- | ||
# ---------------------------------------------------------------------- | ||
# ---------------------------------------------------------------------- | ||
def _Impl( | ||
dm: DoneManager, | ||
items: list[ItemType], | ||
calculate_result_func: Callable[[ItemType], tuple[int, OutputType]], | ||
max_num_threads: Optional[int], | ||
) -> list[OutputType]: | ||
# Divide the items into those that can be run in parallel and those that must be run sequentially | ||
parallel: list[tuple[int, ItemType]] = [] | ||
sequential: list[tuple[int, ItemType]] = [] | ||
|
||
for index, item in enumerate(items): | ||
execution_style = item.style # type: ignore | ||
|
||
if execution_style == ExecutionStyle.Parallel: | ||
parallel.append((index, item)) | ||
elif execution_style == ExecutionStyle.Sequential: | ||
sequential.append((index, item)) | ||
else: | ||
assert False, execution_style # pragma: no cover | ||
|
||
if len(parallel) == 1: | ||
sequential.append(parallel[0]) | ||
parallel = [] | ||
|
||
# Calculate the results | ||
results: list[Optional[OutputType]] = [None] * len(items) | ||
|
||
# ---------------------------------------------------------------------- | ||
def Execute( | ||
results_index: int, | ||
item: ItemType, | ||
) -> ExecuteTasks.TransformResultComplete: | ||
return_code, result = calculate_result_func(item) | ||
|
||
assert results[results_index] is None | ||
results[results_index] = result | ||
|
||
return ExecuteTasks.TransformResultComplete(None, return_code) | ||
|
||
# ---------------------------------------------------------------------- | ||
|
||
if parallel: | ||
ExecuteTasks.TransformTasks( | ||
dm, | ||
"Processing", | ||
[ | ||
ExecuteTasks.TaskData(item.name, (results_index, item)) # type: ignore | ||
for results_index, item in parallel | ||
], | ||
lambda context, status: Execute(*context), | ||
max_num_threads=max_num_threads, | ||
) | ||
|
||
for sequential_index, (results_index, item) in enumerate(sequential): | ||
with dm.Nested( | ||
"Processing '{}' ({} of {})...".format( | ||
item.name, # type: ignore | ||
sequential_index + 1 + len(parallel), | ||
len(items), | ||
), | ||
): | ||
Execute(results_index, item) | ||
|
||
assert not any(result is None for result in results), results | ||
return cast(list[OutputType], results) |
Empty file.
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,94 @@ | ||
# ------------------------------------------------------------------------------- | ||
# | | | ||
# | Copyright (c) 2024 Scientific Software Engineering Center at Georgia Tech | | ||
# | Distributed under the MIT License. | | ||
# | | | ||
# ------------------------------------------------------------------------------- | ||
"""Contains types used when creating Requirements.""" | ||
|
||
from abc import ABC, abstractmethod | ||
from dataclasses import dataclass | ||
from enum import auto, Enum | ||
from typing import Any, Optional | ||
|
||
|
||
from .Impl.ParallelSequentialProcessor import ExecutionStyle | ||
|
||
|
||
# ---------------------------------------------------------------------- | ||
class EvaluateResult(Enum): | ||
"""Result of evaluating a Requirement against a set of data.""" | ||
|
||
DoesNotApply = auto() | ||
Success = auto() | ||
Warning = auto() | ||
Error = auto() | ||
|
||
|
||
# ---------------------------------------------------------------------- | ||
@dataclass(frozen=True) | ||
class Requirement(ABC): | ||
"""A single requirement that can be evaluated against a set of data.""" | ||
|
||
# ---------------------------------------------------------------------- | ||
# | | ||
# | Public Types | ||
# | | ||
# ---------------------------------------------------------------------- | ||
@dataclass(frozen=True) | ||
class EvaluateInfo: | ||
"""Information associated with evaluating a Requirement against a set of data.""" | ||
|
||
result: EvaluateResult | ||
context: Optional[str] | ||
|
||
resolution: Optional[str] | ||
rationale: Optional[str] | ||
|
||
requirement: "Requirement" | ||
|
||
# ---------------------------------------------------------------------- | ||
# | | ||
# | Public Data | ||
# | | ||
# ---------------------------------------------------------------------- | ||
name: str | ||
description: str | ||
style: ExecutionStyle | ||
|
||
resolution_template: str | ||
rationale_template: str | ||
|
||
# ---------------------------------------------------------------------- | ||
# | | ||
# | Public Methods | ||
# | | ||
# ---------------------------------------------------------------------- | ||
def Evaluate( | ||
self, | ||
query_data: dict[str, Any], | ||
) -> "Requirement.EvaluateInfo": | ||
result, context = self._EvaluateImpl(query_data) | ||
|
||
if result in [EvaluateResult.DoesNotApply, EvaluateResult.Success]: | ||
return Requirement.EvaluateInfo(result, context, None, None, self) | ||
|
||
return Requirement.EvaluateInfo( | ||
result, | ||
context, | ||
self.resolution_template.format(**query_data), | ||
self.rationale_template.format(**query_data), | ||
self, | ||
) | ||
|
||
# ---------------------------------------------------------------------- | ||
# | | ||
# | Private Methods | ||
# | | ||
# ---------------------------------------------------------------------- | ||
@abstractmethod | ||
def _EvaluateImpl( | ||
self, | ||
query_data: dict[str, Any], | ||
) -> tuple[EvaluateResult, Optional[str]]: | ||
"""Perform the actual evaluation""" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.