From 60a5562439c394d3258761e4e89ea732e3d8ae67 Mon Sep 17 00:00:00 2001
From: Tom White
Date: Wed, 19 Feb 2025 16:36:20 +0000
Subject: [PATCH] Always set region for Modal executor

---
 cubed/runtime/executors/modal.py      | 183 ++++++++++++++------------
 cubed/tests/runtime/test_modal.py     |  33 +++--
 cubed/tests/test_core.py              |  17 +--
 cubed/tests/test_executor_features.py |   2 +-
 cubed/tests/utils.py                  |  16 ++-
 docs/configuration.md                 |   6 +-
 docs/user-guide/executors.md          |   2 +-
 examples/modal/aws/README.md          |   9 +-
 examples/modal/aws/cubed.yaml         |   1 +
 examples/modal/gcp/README.md          |   9 +-
 examples/modal/gcp/cubed.yaml         |   1 +
 11 files changed, 154 insertions(+), 125 deletions(-)

diff --git a/cubed/runtime/executors/modal.py b/cubed/runtime/executors/modal.py
index 41ecb242a..2a03ceb48 100644
--- a/cubed/runtime/executors/modal.py
+++ b/cubed/runtime/executors/modal.py
@@ -1,4 +1,5 @@
 import asyncio
+import contextlib
 import os
 from asyncio.exceptions import TimeoutError
 from typing import Any, Callable, Optional, Sequence
@@ -8,60 +9,87 @@
 from networkx import MultiDiGraph
 from tenacity import retry, retry_if_exception_type, stop_after_attempt

+from cubed import config
 from cubed.runtime.asyncio import async_map_dag
 from cubed.runtime.backup import use_backups_default
 from cubed.runtime.types import Callback, DagExecutor
 from cubed.runtime.utils import asyncio_run, execute_with_stats
-from cubed.spec import Spec
-
-RUNTIME_MEMORY_MIB = 2000
+from cubed.spec import Spec, spec_from_config
+from cubed.utils import convert_to_bytes

 app = modal.App("cubed-app", include_source=True)

-requirements_file = os.getenv("CUBED_MODAL_REQUIREMENTS_FILE")
+# Get Modal App settings from Cubed configuration. Note that we have to do this
+# globally, since Modal remote functions have to be defined globally.
+if modal.is_local():
+    spec = spec_from_config(config)
+    executor_options = spec.executor_options
+else:
+    executor_options = {}
+
+runtime_memory = convert_to_bytes(executor_options.get("memory", "2GB"))
+runtime_memory_mib = runtime_memory // 1_000_000
+retries = executor_options.get("retries", 2)
+timeout = executor_options.get("timeout", 180)
+cloud = executor_options.get("cloud", "aws")
+region = executor_options.get("region", None)
+if modal.is_local() and region is None:
+    raise ValueError(
+        "Must set region when running using Modal, via the Cubed 'spec.executor_options.region' configuration setting."
+    )
+
+requirements_file = executor_options.get("requirements_file", None)
 if requirements_file:
     image = modal.Image.debian_slim().pip_install_from_requirements(requirements_file)
-    aws_image = image
-    gcp_image = image
 else:
-    aws_image = modal.Image.debian_slim().pip_install(
-        [
-            "array-api-compat",
-            "donfig",
-            "fsspec",
-            "mypy_extensions",  # for rechunker
-            "ndindex",
-            "networkx",
-            "psutil",
-            "pytest-mock",  # TODO: only needed for tests
-            "s3fs",
-            "tenacity",
-            "toolz",
-            "zarr",
-        ]
-    )
-    gcp_image = modal.Image.debian_slim().pip_install(
-        [
-            "array-api-compat",
-            "donfig",
-            "fsspec",
-            "mypy_extensions",  # for rechunker
-            "ndindex",
-            "networkx",
-            "psutil",
-            "pytest-mock",  # TODO: only needed for tests
-            "gcsfs",
-            "tenacity",
-            "toolz",
-            "zarr",
-        ]
-    )
+    if cloud == "aws":
+        image = modal.Image.debian_slim().pip_install(
+            [
+                "array-api-compat",
+                "donfig",
+                "fsspec",
+                "mypy_extensions",  # for rechunker
+                "ndindex",
+                "networkx",
+                "psutil",
+                "pytest-mock",  # TODO: only needed for tests
+                "s3fs",
+                "tenacity",
+                "toolz",
+                "zarr",
+            ]
+        )
+    elif cloud == "gcp":
+        image = modal.Image.debian_slim().pip_install(
+            [
+                "array-api-compat",
+                "donfig",
+                "fsspec",
+                "mypy_extensions",  # for rechunker
+                "ndindex",
+                "networkx",
+                "psutil",
+                "pytest-mock",  # TODO: only needed for tests
+                "gcsfs",
+                "tenacity",
+                "toolz",
+                "zarr",
+            ]
+        )
+    else:
+        raise ValueError(f"Unrecognized cloud: {cloud}")
+
+if cloud == "aws":
+    secrets = [modal.Secret.from_name("my-aws-secret")]
+elif cloud == "gcp":
+    secrets = [modal.Secret.from_name("my-googlecloud-secret")]
+else:
+    raise ValueError(f"Unrecognized cloud: {cloud}")


 def check_runtime_memory(spec):
     allowed_mem = spec.allowed_mem if spec is not None else None
-    runtime_memory = RUNTIME_MEMORY_MIB * 1024 * 1024
     if allowed_mem is not None:
         if runtime_memory < allowed_mem:
             raise ValueError(
@@ -69,36 +97,26 @@ def check_runtime_memory(spec):
             )


-@app.function(
-    image=aws_image,
-    secrets=[modal.Secret.from_name("my-aws-secret")],
-    memory=RUNTIME_MEMORY_MIB,
-    retries=2,
-    cloud="aws",
-)
-def run_remotely(input, func=None, config=None, name=None, compute_id=None):
-    print(f"running remotely on {input} in {os.getenv('MODAL_REGION')}")
-    # note we can't use the execution_stat decorator since it doesn't work with modal decorators
-    result, stats = execute_with_stats(func, input, config=config)
-    return result, stats
-
-
-# For GCP we need to use a class so we can set up credentials by hooking into the container lifecycle
+# We use a class so for GCP we can set up credentials by hooking into the container lifecycle
 @app.cls(
-    image=gcp_image,
-    secrets=[modal.Secret.from_name("my-googlecloud-secret")],
-    memory=RUNTIME_MEMORY_MIB,
-    retries=2,
-    cloud="gcp",
+    image=image,
+    secrets=secrets,
+    cpu=1.0,
+    memory=runtime_memory_mib,  # modal memory is in MiB
+    retries=retries,
+    timeout=timeout,
+    cloud=cloud,
+    region=region,
 )
 class Container:
     @modal.enter()
     def set_up_credentials(self):
-        json = os.environ["SERVICE_ACCOUNT_JSON"]
-        path = os.path.abspath("application_credentials.json")
-        with open(path, "w") as f:
-            f.write(json)
-        os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = path
+        if os.getenv("MODAL_CLOUD_PROVIDER") == "CLOUD_PROVIDER_GCP":
+            json = os.environ["SERVICE_ACCOUNT_JSON"]
+            path = os.path.abspath("application_credentials.json")
+            with open(path, "w") as f:
+                f.write(json)
+            os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = path

     @modal.method()
     def run_remotely(self, input, func=None, config=None, name=None, compute_id=None):
@@ -137,6 +155,9 @@ def execute_dag(
         **kwargs,
     ) -> None:
         merged_kwargs = {**self.kwargs, **kwargs}
+        # remove executor options as they should already have been used in defining the remote functions
+        for executor_option in ("memory", "retries", "timeout", "cloud", "region"):
+            merged_kwargs.pop(executor_option, None)
         asyncio_run(
             self._async_execute_dag(
                 dag,
@@ -169,21 +190,19 @@ async def _async_execute_dag(
         if "use_backups" not in kwargs and use_backups_default(spec):
             kwargs["use_backups"] = True
-        cloud = cloud or "aws"
-        if cloud == "aws":
-            function = run_remotely
-        elif cloud == "gcp":
-            function = Container().run_remotely
-        else:
-            raise ValueError(f"Unrecognized cloud: {cloud}")
-
-        async with app.run():
-            create_futures_func = modal_create_futures_func(function)
-            await async_map_dag(
-                create_futures_func,
-                dag=dag,
-                callbacks=callbacks,
-                resume=resume,
-                compute_arrays_in_parallel=compute_arrays_in_parallel,
-                **kwargs,
-            )
+        function = Container().run_remotely
+
+        enable_output = kwargs.pop("enable_output", False)
+        cm = modal.enable_output() if enable_output else contextlib.nullcontext()
+
+        with cm:
+            async with app.run():
+                create_futures_func = modal_create_futures_func(function)
+                await async_map_dag(
+                    create_futures_func,
+                    dag=dag,
+                    callbacks=callbacks,
+                    resume=resume,
+                    compute_arrays_in_parallel=compute_arrays_in_parallel,
+                    **kwargs,
+                )
diff --git a/cubed/tests/runtime/test_modal.py b/cubed/tests/runtime/test_modal.py
index 399802e53..34bb79ab7 100644
--- a/cubed/tests/runtime/test_modal.py
+++ b/cubed/tests/runtime/test_modal.py
@@ -14,7 +14,7 @@
 from cubed.tests.runtime.utils import check_invocation_counts, deterministic_failure

 tmp_path = "s3://cubed-unittest/map_unordered"
-
+region = "us-east-1"  # S3 region for above bucket

 app = modal.App("cubed-test-app", include_source=True)

@@ -40,13 +40,19 @@
     secrets=[modal.Secret.from_name("my-aws-secret")],
     retries=2,
     timeout=10,
+    cloud="aws",
+    region=region,
 )
 def deterministic_failure_modal(i, path=None, timing_map=None, *, name=None):
     return deterministic_failure(path, timing_map, i, name=name)


 @app.function(
-    image=image, secrets=[modal.Secret.from_name("my-aws-secret")], timeout=10
+    image=image,
+    secrets=[modal.Secret.from_name("my-aws-secret")],
+    timeout=10,
+    cloud="aws",
+    region=region,
 )
 def deterministic_failure_modal_no_retries(i, path=None, timing_map=None, *, name=None):
     return deterministic_failure(path, timing_map, i, name=name)
@@ -57,6 +63,8 @@ def deterministic_failure_modal_no_retries(i, path=None, timing_map=None, *, nam
     secrets=[modal.Secret.from_name("my-aws-secret")],
     retries=2,
     timeout=300,
+    cloud="aws",
+    region=region,
 )
 def deterministic_failure_modal_long_timeout(
     i, path=None, timing_map=None, *, name=None
@@ -66,16 +74,17 @@ def deterministic_failure_modal_long_timeout(

 async def run_test(app_function, input, use_backups=False, batch_size=None, **kwargs):
     outputs = set()
-    async with app.run():
-        create_futures_func = modal_create_futures_func(app_function)
-        async for output in async_map_unordered(
-            create_futures_func,
-            input,
-            use_backups=use_backups,
-            batch_size=batch_size,
-            **kwargs,
-        ):
-            outputs.add(output)
+    with modal.enable_output():
+        async with app.run():
+            create_futures_func = modal_create_futures_func(app_function)
+            async for output in async_map_unordered(
+                create_futures_func,
+                input,
+                use_backups=use_backups,
+                batch_size=batch_size,
+                **kwargs,
+            ):
+                outputs.add(output)
     return outputs
diff --git a/cubed/tests/test_core.py b/cubed/tests/test_core.py
index 145690037..ed17df87e 100644
--- a/cubed/tests/test_core.py
+++ b/cubed/tests/test_core.py
@@ -16,13 +16,7 @@
 from cubed.core.ops import general_blockwise, merge_chunks, partial_reduce, tree_reduce
 from cubed.core.optimization import fuse_all_optimize_dag, multiple_inputs_optimize_dag
 from cubed.storage.backend import open_backend_array
-from cubed.tests.utils import (
-    ALL_EXECUTORS,
-    MAIN_EXECUTORS,
-    MODAL_EXECUTORS,
-    TaskCounter,
-    create_zarr,
-)
+from cubed.tests.utils import ALL_EXECUTORS, MAIN_EXECUTORS, TaskCounter, create_zarr


 @pytest.fixture()
@@ -48,15 +42,6 @@ def any_executor(request):
     return request.param


-@pytest.fixture(
-    scope="module",
-    params=MODAL_EXECUTORS,
-    ids=[executor.name for executor in MODAL_EXECUTORS],
-)
-def modal_executor(request):
-    return request.param
-
-
 def test_as_array_fails(spec):
     a = np.ones((1000, 1000))
     with pytest.raises(
diff --git a/cubed/tests/test_executor_features.py b/cubed/tests/test_executor_features.py
index e29d486db..25e695f3c 100644
--- a/cubed/tests/test_executor_features.py
+++ b/cubed/tests/test_executor_features.py
@@ -303,7 +303,7 @@ def test_check_runtime_memory_modal(spec, modal_executor):
     c = xp.add(a, b)
     with pytest.raises(
         ValueError,
-        match=r"Runtime memory \(2097152000\) is less than allowed_mem \(4000000000\)",
+        match=r"Runtime memory \(2000000000\) is less than allowed_mem \(4000000000\)",
     ):
         c.compute(executor=modal_executor)
diff --git a/cubed/tests/utils.py b/cubed/tests/utils.py
index a5a98f74a..e96dcf8d7 100644
--- a/cubed/tests/utils.py
+++ b/cubed/tests/utils.py
@@ -4,6 +4,7 @@
 import networkx as nx
 import numpy as np

+from cubed import config
 from cubed.runtime.create import create_executor
 from cubed.runtime.types import Callback
 from cubed.storage.backend import open_backend_array
@@ -57,10 +58,23 @@
 except ImportError:
     pass

+
 MODAL_EXECUTORS = []
 try:
-    MODAL_EXECUTORS.append(create_executor("modal"))
+    # only set global config below if modal can be imported
+    import modal  # noqa: F401
+
+    # need to set global config for testing modal since these options
+    # are read at the top level of modal.py to create remote functions
+    config.set(
+        {
+            "spec.executor_options.cloud": "aws",
+            "spec.executor_options.region": "us-east-1",
+        }
+    )
+    executor_options = dict(enable_output=True)
+    MODAL_EXECUTORS.append(create_executor("modal", executor_options))
 except ImportError:
     pass
diff --git a/docs/configuration.md b/docs/configuration.md
index 5bb00d1e4..cb9dc3cde 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -211,12 +211,14 @@ Note that `batch_size` is not currently supported for Lithops.
 | Property                     | Default | Description                                                                           |
 |------------------------------|---------|---------------------------------------------------------------------------------------|
 | `cloud`                      | `"aws"` | The cloud to run on. One of `"aws"` or `"gcp"`.                                       |
+| `region`                     | N/A     | The cloud region to run in. This must be set to match the region of your cloud store to avoid data transfer fees. See Modal's [Region selection](https://modal.com/docs/guide/region-selection) page for possible values. |
+| `retries`                    | 2       | The number of times to retry a task if it fails.                                      |
+| `timeout`                    | 180     | Tasks that take longer than the timeout will be automatically killed and retried.     |
+| `enable_output`              | False   | Print Modal output to stdout and stderr for debugging.                                |
 | `use_backups`                | `True`  | Whether to use backup tasks for mitigating stragglers.                                |
 | `batch_size`                 | `None`  | Number of input tasks to submit to be run in parallel. The default is not to batch.  |
 | `compute_arrays_in_parallel` | `False` | Whether arrays are computed one at a time or in parallel.                             |

-Currently the Modal executor in Cubed uses a hard-coded value of 2 for retries and 300 seconds for timeouts, neither of which can be changed through configuration.
-
 ## Debugging

 You can use Donfig's `pprint` method if you want to check which configuration settings are in effect when you code is run:
diff --git a/docs/user-guide/executors.md b/docs/user-guide/executors.md
index a7d8a7de7..ffdd170c7 100644
--- a/docs/user-guide/executors.md
+++ b/docs/user-guide/executors.md
@@ -20,7 +20,7 @@ When it comes to scaling out, there are a number of executors that work in the c

 [**Lithops**](https://lithops-cloud.github.io/) is the executor we recommend for most users, since it has had the most testing so far (~1000 workers). If your data is in Amazon S3 then use Lithops with AWS Lambda, and if it's in GCS use Lithops with Google Cloud Functions. You have to build a runtime environment as a part of the setting up process.

-[**Modal**](https://modal.com/) is very easy to get started with because it handles building a runtime environment for you automatically (note that it requires that you [sign up](https://modal.com/signup) for a free account). **At the time of writing, Modal does not guarantee that functions run in any particular cloud region, so it is not currently recommended that you run large computations since excessive data transfer fees are likely.**
+[**Modal**](https://modal.com/) is very easy to get started with because it automatically builds a runtime environment in a matter of seconds (note that it requires that you [sign up](https://modal.com/signup) for a free account). It has been tested with ~100 workers.

 [**Coiled**](https://www.coiled.io/) is also easy to get started with ([sign up](https://cloud.coiled.io/signup)). It uses [Coiled Functions](https://docs.coiled.io/user_guide/usage/functions/index.html) and has a 1-2 minute overhead to start a cluster.
diff --git a/examples/modal/aws/README.md b/examples/modal/aws/README.md
index 3dd54818e..916bccd23 100644
--- a/examples/modal/aws/README.md
+++ b/examples/modal/aws/README.md
@@ -1,7 +1,5 @@
 # Examples running Cubed on Modal

-**Warning: Modal does not guarantee that functions run in any particular cloud region, so it is not currently recommended that you run large computations since excessive data transfer fees are likely.**
-
 ## Pre-requisites

 1. A [Modal account](https://modal.com/)
@@ -10,14 +8,15 @@
 ## Set up

 1. Add a new [Modal secret](https://modal.com/secrets), by following the AWS wizard. This will prompt you to fill in values for `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`. Call the secret `my-aws-secret`.
-2. Create a new S3 bucket (called `cubed--temp`, for example) in the `us-east-1` region. This will be used for intermediate data.
-3. Install a Python environment by running the following from this directory:
+2. Create a new S3 bucket (called `cubed--temp`, for example) in a [region supported by Modal](https://modal.com/docs/guide/region-selection). This will be used for intermediate data.
+3. Edit the file `cubed.yaml` and change the `spec.executor_options.region` key to be the region that you created the S3 bucket in.
+4. Install a Python environment by running the following from this directory:

 ```shell
 conda create --name cubed-modal-aws-examples -y python=3.11
 conda activate cubed-modal-aws-examples
 pip install 'cubed[modal]'
-export CUBED_MODAL_REQUIREMENTS_FILE=$(pwd)/requirements.txt
+export CUBED_SPEC__EXECUTOR_OPTIONS__REQUIREMENTS_FILE=$(pwd)/requirements.txt
 ```

 ## Examples
diff --git a/examples/modal/aws/cubed.yaml b/examples/modal/aws/cubed.yaml
index b93ad7c6d..c92c2ec53 100644
--- a/examples/modal/aws/cubed.yaml
+++ b/examples/modal/aws/cubed.yaml
@@ -4,3 +4,4 @@ spec:
   executor_name: "modal"
   executor_options:
     cloud: "aws"
+    region: "eu-west-1" # change to match the work_dir bucket
diff --git a/examples/modal/gcp/README.md b/examples/modal/gcp/README.md
index 17bc42c6d..226d7c8d7 100644
--- a/examples/modal/gcp/README.md
+++ b/examples/modal/gcp/README.md
@@ -1,7 +1,5 @@
 # Examples running Cubed on Modal

-**Warning: Modal does not guarantee that functions run in any particular cloud region, so it is not currently recommended that you run large computations since excessive data transfer fees are likely.**
-
 ## Pre-requisites

 1. A [Modal account](https://modal.com/)
@@ -10,14 +8,15 @@
 ## Set up

 1. Add a new [Modal secret](https://modal.com/secrets), by following the Google Cloud wizard. This will prompt you to fill in values for `SERVICE_ACCOUNT_JSON` (it has instructions on how to create it, make sure you add the "Storage Admin" role). Call the secret `my-googlecloud-secret`.
-2. Create a new GCS bucket (called `cubed--temp`, for example) in the `us-east-1` region. This will be used for intermediate data.
-3. Install a Python environment by running the following from this directory:
+2. Create a new GCS bucket (called `cubed--temp`, for example) in a [region supported by Modal](https://modal.com/docs/guide/region-selection). This will be used for intermediate data.
+3. Edit the file `cubed.yaml` and change the `spec.executor_options.region` key to be the region that you created the GCS bucket in.
+4. Install a Python environment by running the following from this directory:

 ```shell
 conda create --name cubed-modal-gcp-examples -y python=3.11
 conda activate cubed-modal-gcp-examples
 pip install 'cubed[modal-gcp]'
-export CUBED_MODAL_REQUIREMENTS_FILE=$(pwd)/requirements.txt
+export CUBED_SPEC__EXECUTOR_OPTIONS__REQUIREMENTS_FILE=$(pwd)/requirements.txt
 ```

 ## Examples
diff --git a/examples/modal/gcp/cubed.yaml b/examples/modal/gcp/cubed.yaml
index 10d6260f1..65e38b795 100644
--- a/examples/modal/gcp/cubed.yaml
+++ b/examples/modal/gcp/cubed.yaml
@@ -4,3 +4,4 @@ spec:
   executor_name: "modal"
   executor_options:
     cloud: "gcp"
+    region: "europe-west2" # change to match the work_dir bucket