Skip to content

Commit

Permalink
Always set region for Modal executor
Browse files Browse the repository at this point in the history
  • Loading branch information
tomwhite committed Feb 20, 2025
1 parent 6724df2 commit 60a5562
Show file tree
Hide file tree
Showing 11 changed files with 154 additions and 125 deletions.
183 changes: 101 additions & 82 deletions cubed/runtime/executors/modal.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import contextlib
import os
from asyncio.exceptions import TimeoutError
from typing import Any, Callable, Optional, Sequence
Expand All @@ -8,97 +9,114 @@
from networkx import MultiDiGraph
from tenacity import retry, retry_if_exception_type, stop_after_attempt

from cubed import config
from cubed.runtime.asyncio import async_map_dag
from cubed.runtime.backup import use_backups_default
from cubed.runtime.types import Callback, DagExecutor
from cubed.runtime.utils import asyncio_run, execute_with_stats
from cubed.spec import Spec

RUNTIME_MEMORY_MIB = 2000
from cubed.spec import Spec, spec_from_config
from cubed.utils import convert_to_bytes

app = modal.App("cubed-app", include_source=True)

requirements_file = os.getenv("CUBED_MODAL_REQUIREMENTS_FILE")
# Get Modal App settings from Cubed configuration. Note that we have to do this
# globally, since Modal remote functions have to be defined globally.
if modal.is_local():
spec = spec_from_config(config)
executor_options = spec.executor_options
else:
executor_options = {}

runtime_memory = convert_to_bytes(executor_options.get("memory", "2GB"))
runtime_memory_mib = runtime_memory // 1_000_000
retries = executor_options.get("retries", 2)
timeout = executor_options.get("timeout", 180)
cloud = executor_options.get("cloud", "aws")
region = executor_options.get("region", None)
if modal.is_local() and region is None:
raise ValueError(
"Must set region when running using Modal, via the Cubed 'spec.executor_options.region' configuration setting."
)

requirements_file = executor_options.get("requirements_file", None)

if requirements_file:
image = modal.Image.debian_slim().pip_install_from_requirements(requirements_file)
aws_image = image
gcp_image = image
else:
aws_image = modal.Image.debian_slim().pip_install(
[
"array-api-compat",
"donfig",
"fsspec",
"mypy_extensions", # for rechunker
"ndindex",
"networkx",
"psutil",
"pytest-mock", # TODO: only needed for tests
"s3fs",
"tenacity",
"toolz",
"zarr",
]
)
gcp_image = modal.Image.debian_slim().pip_install(
[
"array-api-compat",
"donfig",
"fsspec",
"mypy_extensions", # for rechunker
"ndindex",
"networkx",
"psutil",
"pytest-mock", # TODO: only needed for tests
"gcsfs",
"tenacity",
"toolz",
"zarr",
]
)
if cloud == "aws":
image = modal.Image.debian_slim().pip_install(
[
"array-api-compat",
"donfig",
"fsspec",
"mypy_extensions", # for rechunker
"ndindex",
"networkx",
"psutil",
"pytest-mock", # TODO: only needed for tests
"s3fs",
"tenacity",
"toolz",
"zarr",
]
)
elif cloud == "gcp":
image = modal.Image.debian_slim().pip_install(
[
"array-api-compat",
"donfig",
"fsspec",
"mypy_extensions", # for rechunker
"ndindex",
"networkx",
"psutil",
"pytest-mock", # TODO: only needed for tests
"gcsfs",
"tenacity",
"toolz",
"zarr",
]
)
else:
raise ValueError(f"Unrecognized cloud: {cloud}")

if cloud == "aws":
secrets = [modal.Secret.from_name("my-aws-secret")]
elif cloud == "gcp":
secrets = [modal.Secret.from_name("my-googlecloud-secret")]
else:
raise ValueError(f"Unrecognized cloud: {cloud}")


def check_runtime_memory(spec):
allowed_mem = spec.allowed_mem if spec is not None else None
runtime_memory = RUNTIME_MEMORY_MIB * 1024 * 1024
if allowed_mem is not None:
if runtime_memory < allowed_mem:
raise ValueError(
f"Runtime memory ({runtime_memory}) is less than allowed_mem ({allowed_mem})"
)


@app.function(
image=aws_image,
secrets=[modal.Secret.from_name("my-aws-secret")],
memory=RUNTIME_MEMORY_MIB,
retries=2,
cloud="aws",
)
def run_remotely(input, func=None, config=None, name=None, compute_id=None):
print(f"running remotely on {input} in {os.getenv('MODAL_REGION')}")
# note we can't use the execution_stat decorator since it doesn't work with modal decorators
result, stats = execute_with_stats(func, input, config=config)
return result, stats


# For GCP we need to use a class so we can set up credentials by hooking into the container lifecycle
# We use a class so for GCP we can set up credentials by hooking into the container lifecycle
@app.cls(
image=gcp_image,
secrets=[modal.Secret.from_name("my-googlecloud-secret")],
memory=RUNTIME_MEMORY_MIB,
retries=2,
cloud="gcp",
image=image,
secrets=secrets,
cpu=1.0,
memory=runtime_memory_mib, # modal memory is in MiB
retries=retries,
timeout=timeout,
cloud=cloud,
region=region,
)
class Container:
@modal.enter()
def set_up_credentials(self):
json = os.environ["SERVICE_ACCOUNT_JSON"]
path = os.path.abspath("application_credentials.json")
with open(path, "w") as f:
f.write(json)
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = path
if os.getenv("MODAL_CLOUD_PROVIDER") == "CLOUD_PROVIDER_GCP":
json = os.environ["SERVICE_ACCOUNT_JSON"]
path = os.path.abspath("application_credentials.json")
with open(path, "w") as f:
f.write(json)
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = path

@modal.method()
def run_remotely(self, input, func=None, config=None, name=None, compute_id=None):
Expand Down Expand Up @@ -137,6 +155,9 @@ def execute_dag(
**kwargs,
) -> None:
merged_kwargs = {**self.kwargs, **kwargs}
# remove executor options as they should already have been used in defining the remote functions
for executor_option in ("memory", "retries", "timeout", "cloud", "region"):
merged_kwargs.pop(executor_option, None)
asyncio_run(
self._async_execute_dag(
dag,
Expand Down Expand Up @@ -169,21 +190,19 @@ async def _async_execute_dag(
if "use_backups" not in kwargs and use_backups_default(spec):
kwargs["use_backups"] = True

cloud = cloud or "aws"
if cloud == "aws":
function = run_remotely
elif cloud == "gcp":
function = Container().run_remotely
else:
raise ValueError(f"Unrecognized cloud: {cloud}")

async with app.run():
create_futures_func = modal_create_futures_func(function)
await async_map_dag(
create_futures_func,
dag=dag,
callbacks=callbacks,
resume=resume,
compute_arrays_in_parallel=compute_arrays_in_parallel,
**kwargs,
)
function = Container().run_remotely

enable_output = kwargs.pop("enable_output", False)
cm = modal.enable_output() if enable_output else contextlib.nullcontext()

with cm:
async with app.run():
create_futures_func = modal_create_futures_func(function)
await async_map_dag(
create_futures_func,
dag=dag,
callbacks=callbacks,
resume=resume,
compute_arrays_in_parallel=compute_arrays_in_parallel,
**kwargs,
)
33 changes: 21 additions & 12 deletions cubed/tests/runtime/test_modal.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from cubed.tests.runtime.utils import check_invocation_counts, deterministic_failure

tmp_path = "s3://cubed-unittest/map_unordered"

region = "us-east-1" # S3 region for above bucket

app = modal.App("cubed-test-app", include_source=True)

Expand All @@ -40,13 +40,19 @@
secrets=[modal.Secret.from_name("my-aws-secret")],
retries=2,
timeout=10,
cloud="aws",
region=region,
)
def deterministic_failure_modal(i, path=None, timing_map=None, *, name=None):
return deterministic_failure(path, timing_map, i, name=name)


@app.function(
image=image, secrets=[modal.Secret.from_name("my-aws-secret")], timeout=10
image=image,
secrets=[modal.Secret.from_name("my-aws-secret")],
timeout=10,
cloud="aws",
region=region,
)
def deterministic_failure_modal_no_retries(i, path=None, timing_map=None, *, name=None):
return deterministic_failure(path, timing_map, i, name=name)
Expand All @@ -57,6 +63,8 @@ def deterministic_failure_modal_no_retries(i, path=None, timing_map=None, *, nam
secrets=[modal.Secret.from_name("my-aws-secret")],
retries=2,
timeout=300,
cloud="aws",
region=region,
)
def deterministic_failure_modal_long_timeout(
i, path=None, timing_map=None, *, name=None
Expand All @@ -66,16 +74,17 @@ def deterministic_failure_modal_long_timeout(

async def run_test(app_function, input, use_backups=False, batch_size=None, **kwargs):
outputs = set()
async with app.run():
create_futures_func = modal_create_futures_func(app_function)
async for output in async_map_unordered(
create_futures_func,
input,
use_backups=use_backups,
batch_size=batch_size,
**kwargs,
):
outputs.add(output)
with modal.enable_output():
async with app.run():
create_futures_func = modal_create_futures_func(app_function)
async for output in async_map_unordered(
create_futures_func,
input,
use_backups=use_backups,
batch_size=batch_size,
**kwargs,
):
outputs.add(output)
return outputs


Expand Down
17 changes: 1 addition & 16 deletions cubed/tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,7 @@
from cubed.core.ops import general_blockwise, merge_chunks, partial_reduce, tree_reduce
from cubed.core.optimization import fuse_all_optimize_dag, multiple_inputs_optimize_dag
from cubed.storage.backend import open_backend_array
from cubed.tests.utils import (
ALL_EXECUTORS,
MAIN_EXECUTORS,
MODAL_EXECUTORS,
TaskCounter,
create_zarr,
)
from cubed.tests.utils import ALL_EXECUTORS, MAIN_EXECUTORS, TaskCounter, create_zarr


@pytest.fixture()
Expand All @@ -48,15 +42,6 @@ def any_executor(request):
return request.param


@pytest.fixture(
scope="module",
params=MODAL_EXECUTORS,
ids=[executor.name for executor in MODAL_EXECUTORS],
)
def modal_executor(request):
return request.param


def test_as_array_fails(spec):
a = np.ones((1000, 1000))
with pytest.raises(
Expand Down
2 changes: 1 addition & 1 deletion cubed/tests/test_executor_features.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ def test_check_runtime_memory_modal(spec, modal_executor):
c = xp.add(a, b)
with pytest.raises(
ValueError,
match=r"Runtime memory \(2097152000\) is less than allowed_mem \(4000000000\)",
match=r"Runtime memory \(2000000000\) is less than allowed_mem \(4000000000\)",
):
c.compute(executor=modal_executor)

Expand Down
16 changes: 15 additions & 1 deletion cubed/tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import networkx as nx
import numpy as np

from cubed import config
from cubed.runtime.create import create_executor
from cubed.runtime.types import Callback
from cubed.storage.backend import open_backend_array
Expand Down Expand Up @@ -57,10 +58,23 @@
except ImportError:
pass


MODAL_EXECUTORS = []

try:
MODAL_EXECUTORS.append(create_executor("modal"))
# only set global config below if modal can be imported
import modal # noqa: F401

# need to set global config for testing modal since these options
# are read at the top level of modal.py to create remote functions
config.set(
{
"spec.executor_options.cloud": "aws",
"spec.executor_options.region": "us-east-1",
}
)
executor_options = dict(enable_output=True)
MODAL_EXECUTORS.append(create_executor("modal", executor_options))
except ImportError:
pass

Expand Down
6 changes: 4 additions & 2 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -211,12 +211,14 @@ Note that `batch_size` is not currently supported for Lithops.
| Property | Default | Description |
|------------------------------|---------|-------------------------------------------------------------------------------------|
| `cloud` | `"aws"` | The cloud to run on. One of `"aws"` or `"gcp"`. |
| `region` | N/A | The cloud region to run in. This must be set to match the region of your cloud store to avoid data transfer fees. See Modal's [Region selection](https://modal.com/docs/guide/region-selection) page for possible values. |
| `retries` | 2 | The number of times to retry a task if it fails. |
| `timeout` | 180 | Tasks that take longer than the timeout will be automatically killed and retried. |
| `enable_output` | False | Print Modal output to stdout and stderr things for debuggging. |
| `use_backups` | `True` | Whether to use backup tasks for mitigating stragglers. |
| `batch_size` | `None` | Number of input tasks to submit to be run in parallel. The default is not to batch. |
| `compute_arrays_in_parallel` | `False` | Whether arrays are computed one at a time or in parallel. |

Currently the Modal executor in Cubed uses a hard-coded value of 2 for retries and 300 seconds for timeouts, neither of which can be changed through configuration.

## Debugging

You can use Donfig's `pprint` method if you want to check which configuration settings are in effect when you code is run:
Expand Down
2 changes: 1 addition & 1 deletion docs/user-guide/executors.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ When it comes to scaling out, there are a number of executors that work in the c
[**Lithops**](https://lithops-cloud.github.io/) is the executor we recommend for most users, since it has had the most testing so far (~1000 workers).
If your data is in Amazon S3 then use Lithops with AWS Lambda, and if it's in GCS use Lithops with Google Cloud Functions. You have to build a runtime environment as a part of the setting up process.

[**Modal**](https://modal.com/) is very easy to get started with because it handles building a runtime environment for you automatically (note that it requires that you [sign up](https://modal.com/signup) for a free account). **At the time of writing, Modal does not guarantee that functions run in any particular cloud region, so it is not currently recommended that you run large computations since excessive data transfer fees are likely.**
[**Modal**](https://modal.com/) is very easy to get started with because it automatically builds a runtime environment in a matter of seconds (note that it requires that you [sign up](https://modal.com/signup) for a free account). It has been tested with ~100 workers.

[**Coiled**](https://www.coiled.io/) is also easy to get started with ([sign up](https://cloud.coiled.io/signup)). It uses [Coiled Functions](https://docs.coiled.io/user_guide/usage/functions/index.html) and has a 1-2 minute overhead to start a cluster.

Expand Down
Loading

0 comments on commit 60a5562

Please sign in to comment.