Skip to content

Commit

Permalink
Move retry logic to lithops (#673)
Browse files Browse the repository at this point in the history
Lithops doesn't have group names, so use a dict to track them
  • Loading branch information
tomwhite authored Jan 17, 2025
1 parent c487014 commit e7cf014
Show file tree
Hide file tree
Showing 6 changed files with 16 additions and 205 deletions.
22 changes: 10 additions & 12 deletions cubed/runtime/executors/lithops.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,11 @@
)

from lithops.executors import FunctionExecutor
from lithops.retries import RetryingFunctionExecutor, RetryingFuture
from lithops.wait import ALWAYS, ANY_COMPLETED
from networkx import MultiDiGraph

from cubed.runtime.backup import should_launch_backup, use_backups_default
from cubed.runtime.executors.lithops_retries import (
RetryingFunctionExecutor,
RetryingFuture,
)
from cubed.runtime.pipeline import visit_node_generations, visit_nodes
from cubed.runtime.types import Callback, DagExecutor
from cubed.runtime.utils import (
Expand Down Expand Up @@ -79,6 +76,7 @@ def map_unordered(
return_when = ALWAYS if use_backups else ANY_COMPLETED
wait_dur_sec = wait_dur_sec or 1

future_to_group_name: Dict[str, str] = {}
group_name_to_function: Dict[str, Callable[..., Any]] = {}
# backups are launched based on task start and end times for the group
start_times: Dict[str, Dict[RetryingFuture, float]] = {}
Expand All @@ -97,13 +95,13 @@ def map_unordered(

futures = lithops_function_executor.map(
partial_map_function,
map_iterdata,
list(map_iterdata), # lithops requires a list
timeout=timeout,
include_modules=include_modules,
retries=retries,
group_name=group_name,
)
start_times[group_name] = {k: time.monotonic() for k in futures}
future_to_group_name.update({k: group_name for k in futures})
pending.extend(futures)

while pending:
Expand All @@ -122,10 +120,10 @@ def map_unordered(
if not backup.done or not backup.error:
continue
future.status(throw_except=True)
group_name = future.group_name # type: ignore[assignment]
group_name = future_to_group_name[future] # type: ignore[assignment]
end_times[group_name][future] = time.monotonic()
if return_stats:
yield future.result(), standardise_lithops_stats(future)
yield future.result(), standardise_lithops_stats(group_name, future)
else:
yield future.result()

Expand All @@ -142,7 +140,7 @@ def map_unordered(
if use_backups:
now = time.monotonic()
for future in copy.copy(pending):
group_name = future.group_name # type: ignore[assignment]
group_name = future_to_group_name[future] # type: ignore[assignment]
if future not in backups and should_launch_backup(
future, now, start_times[group_name], end_times[group_name]
):
Expand All @@ -154,11 +152,11 @@ def map_unordered(
timeout=timeout,
include_modules=include_modules,
retries=0, # don't retry backup tasks
group_name=group_name,
)
start_times[group_name].update(
{k: time.monotonic() for k in futures}
)
future_to_group_name.update({k: group_name for k in futures})
pending.extend(futures)
backup = futures[0]
backups[future] = backup
Expand Down Expand Up @@ -237,10 +235,10 @@ def execute_dag(
handle_callbacks(callbacks, result, stats)


def standardise_lithops_stats(future: RetryingFuture) -> Dict[str, Any]:
def standardise_lithops_stats(name: str, future: RetryingFuture) -> Dict[str, Any]:
stats = future.stats
return dict(
name=future.group_name,
name=name,
task_create_tstamp=stats["host_job_create_tstamp"],
function_start_tstamp=stats["worker_func_start_tstamp"],
function_end_tstamp=stats["worker_func_end_tstamp"],
Expand Down
187 changes: 0 additions & 187 deletions cubed/runtime/executors/lithops_retries.py

This file was deleted.

2 changes: 1 addition & 1 deletion cubed/tests/runtime/test_lithops.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
pytest.importorskip("lithops")

from lithops.executors import LocalhostExecutor
from lithops.retries import RetryingFunctionExecutor

from cubed.runtime.executors.lithops import map_unordered
from cubed.runtime.executors.lithops_retries import RetryingFunctionExecutor


def run_test(function, input, retries, timeout=10, use_backups=False):
Expand Down
2 changes: 1 addition & 1 deletion cubed/tests/runtime/test_lithops_retries.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
pytest.importorskip("lithops")

from lithops.executors import LocalhostExecutor
from lithops.retries import RetryingFunctionExecutor

from cubed.runtime.executors.lithops_retries import RetryingFunctionExecutor
from cubed.tests.runtime.utils import check_invocation_counts, deterministic_failure


Expand Down
2 changes: 1 addition & 1 deletion docs/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# cubed
apache-beam
fsspec
lithops[aws] >= 2.7.0
lithops[aws] >= 3.3.0
modal
mypy_extensions # for rechunker
networkx != 2.8.3, != 2.8.4, != 2.8.5, != 2.8.6, != 2.8.7, != 2.8.8, != 3.0.*, != 3.1.*, != 3.2.*
Expand Down
6 changes: 3 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,15 @@ beam = ["apache-beam", "gcsfs"]
dask = ["dask < 2024.12.0"]
dask-distributed = ["distributed < 2024.12.0"]
icechunk = ["icechunk"]
lithops = ["lithops[aws] >= 2.7.0"]
lithops = ["lithops[aws] >= 3.3.0"]
lithops-aws = [
"cubed[diagnostics]",
"lithops[aws]",
"lithops[aws] >= 3.3.0",
"s3fs",
]
lithops-gcp = [
"cubed[diagnostics]",
"lithops[gcp]",
"lithops[gcp] >= 3.3.0",
"gcsfs",
]
modal = [
Expand Down

0 comments on commit e7cf014

Please sign in to comment.