diff --git a/cubed/runtime/executors/modal.py b/cubed/runtime/executors/modal.py index 41ecb242..2a03ceb4 100644 --- a/cubed/runtime/executors/modal.py +++ b/cubed/runtime/executors/modal.py @@ -1,4 +1,5 @@ import asyncio +import contextlib import os from asyncio.exceptions import TimeoutError from typing import Any, Callable, Optional, Sequence @@ -8,60 +9,87 @@ from networkx import MultiDiGraph from tenacity import retry, retry_if_exception_type, stop_after_attempt +from cubed import config from cubed.runtime.asyncio import async_map_dag from cubed.runtime.backup import use_backups_default from cubed.runtime.types import Callback, DagExecutor from cubed.runtime.utils import asyncio_run, execute_with_stats -from cubed.spec import Spec - -RUNTIME_MEMORY_MIB = 2000 +from cubed.spec import Spec, spec_from_config +from cubed.utils import convert_to_bytes app = modal.App("cubed-app", include_source=True) -requirements_file = os.getenv("CUBED_MODAL_REQUIREMENTS_FILE") +# Get Modal App settings from Cubed configuration. Note that we have to do this +# globally, since Modal remote functions have to be defined globally. +if modal.is_local(): + spec = spec_from_config(config) + executor_options = spec.executor_options +else: + executor_options = {} + +runtime_memory = convert_to_bytes(executor_options.get("memory", "2GB")) +runtime_memory_mib = runtime_memory // 1_000_000 +retries = executor_options.get("retries", 2) +timeout = executor_options.get("timeout", 180) +cloud = executor_options.get("cloud", "aws") +region = executor_options.get("region", None) +if modal.is_local() and region is None: + raise ValueError( + "Must set region when running using Modal, via the Cubed 'spec.executor_options.region' configuration setting." + ) + +requirements_file = executor_options.get("requirements_file", None) if requirements_file: image = modal.Image.debian_slim().pip_install_from_requirements(requirements_file) - aws_image = image - gcp_image = image else: - aws_image = modal.Image.debian_slim().pip_install( - [ - "array-api-compat", - "donfig", - "fsspec", - "mypy_extensions", # for rechunker - "ndindex", - "networkx", - "psutil", - "pytest-mock", # TODO: only needed for tests - "s3fs", - "tenacity", - "toolz", - "zarr", - ] - ) - gcp_image = modal.Image.debian_slim().pip_install( - [ - "array-api-compat", - "donfig", - "fsspec", - "mypy_extensions", # for rechunker - "ndindex", - "networkx", - "psutil", - "pytest-mock", # TODO: only needed for tests - "gcsfs", - "tenacity", - "toolz", - "zarr", - ] - ) + if cloud == "aws": + image = modal.Image.debian_slim().pip_install( + [ + "array-api-compat", + "donfig", + "fsspec", + "mypy_extensions", # for rechunker + "ndindex", + "networkx", + "psutil", + "pytest-mock", # TODO: only needed for tests + "s3fs", + "tenacity", + "toolz", + "zarr", + ] + ) + elif cloud == "gcp": + image = modal.Image.debian_slim().pip_install( + [ + "array-api-compat", + "donfig", + "fsspec", + "mypy_extensions", # for rechunker + "ndindex", + "networkx", + "psutil", + "pytest-mock", # TODO: only needed for tests + "gcsfs", + "tenacity", + "toolz", + "zarr", + ] + ) + else: + raise ValueError(f"Unrecognized cloud: {cloud}") + +if cloud == "aws": + secrets = [modal.Secret.from_name("my-aws-secret")] +elif cloud == "gcp": + secrets = [modal.Secret.from_name("my-googlecloud-secret")] +else: + raise ValueError(f"Unrecognized cloud: {cloud}") def check_runtime_memory(spec): allowed_mem = spec.allowed_mem if spec is not None else None - runtime_memory = RUNTIME_MEMORY_MIB * 1024 * 1024 if allowed_mem is not None: if runtime_memory < allowed_mem: raise ValueError( @@ -69,36 +97,26 @@ def check_runtime_memory(spec): ) -@app.function( - image=aws_image, - secrets=[modal.Secret.from_name("my-aws-secret")], - memory=RUNTIME_MEMORY_MIB, - retries=2, - cloud="aws", -) -def run_remotely(input, func=None, config=None, name=None, compute_id=None): - print(f"running remotely on {input} in {os.getenv('MODAL_REGION')}") - # note we can't use the execution_stat decorator since it doesn't work with modal decorators - result, stats = execute_with_stats(func, input, config=config) - return result, stats - - -# For GCP we need to use a class so we can set up credentials by hooking into the container lifecycle +# We use a class so for GCP we can set up credentials by hooking into the container lifecycle @app.cls( - image=gcp_image, - secrets=[modal.Secret.from_name("my-googlecloud-secret")], - memory=RUNTIME_MEMORY_MIB, - retries=2, - cloud="gcp", + image=image, + secrets=secrets, + cpu=1.0, + memory=runtime_memory_mib, # modal memory is in MiB + retries=retries, + timeout=timeout, + cloud=cloud, + region=region, ) class Container: @modal.enter() def set_up_credentials(self): - json = os.environ["SERVICE_ACCOUNT_JSON"] - path = os.path.abspath("application_credentials.json") - with open(path, "w") as f: - f.write(json) - os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = path + if os.getenv("MODAL_CLOUD_PROVIDER") == "CLOUD_PROVIDER_GCP": + json = os.environ["SERVICE_ACCOUNT_JSON"] + path = os.path.abspath("application_credentials.json") + with open(path, "w") as f: + f.write(json) + os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = path @modal.method() def run_remotely(self, input, func=None, config=None, name=None, compute_id=None): @@ -137,6 +155,9 @@ def execute_dag( **kwargs, ) -> None: merged_kwargs = {**self.kwargs, **kwargs} + # remove executor options as they should already have been used in defining the remote functions + for executor_option in ("memory", "retries", "timeout", "cloud", "region"): + merged_kwargs.pop(executor_option, None) asyncio_run( self._async_execute_dag( dag, @@ -169,21 +190,19 @@ async def _async_execute_dag( if "use_backups" not in kwargs and use_backups_default(spec): kwargs["use_backups"] = True - cloud = cloud or "aws" - if cloud == "aws": - function = run_remotely - elif cloud == "gcp": - function = Container().run_remotely - else: - raise ValueError(f"Unrecognized cloud: {cloud}") - - async with app.run(): - create_futures_func = modal_create_futures_func(function) - await async_map_dag( - create_futures_func, - dag=dag, - callbacks=callbacks, - resume=resume, - compute_arrays_in_parallel=compute_arrays_in_parallel, - **kwargs, - ) + function = Container().run_remotely + + enable_output = kwargs.pop("enable_output", False) + cm = modal.enable_output() if enable_output else contextlib.nullcontext() + + with cm: + async with app.run(): + create_futures_func = modal_create_futures_func(function) + await async_map_dag( + create_futures_func, + dag=dag, + callbacks=callbacks, + resume=resume, + compute_arrays_in_parallel=compute_arrays_in_parallel, + **kwargs, + ) diff --git a/cubed/tests/runtime/test_modal.py b/cubed/tests/runtime/test_modal.py index 399802e5..34bb79ab 100644 --- a/cubed/tests/runtime/test_modal.py +++ b/cubed/tests/runtime/test_modal.py @@ -14,7 +14,7 @@ from cubed.tests.runtime.utils import check_invocation_counts, deterministic_failure tmp_path = "s3://cubed-unittest/map_unordered" - +region = "us-east-1" # S3 region for above bucket app = modal.App("cubed-test-app", include_source=True) @@ -40,13 +40,19 @@ secrets=[modal.Secret.from_name("my-aws-secret")], retries=2, timeout=10, + cloud="aws", + region=region, ) def deterministic_failure_modal(i, path=None, timing_map=None, *, name=None): return deterministic_failure(path, timing_map, i, name=name) @app.function( - image=image, secrets=[modal.Secret.from_name("my-aws-secret")], timeout=10 + image=image, + secrets=[modal.Secret.from_name("my-aws-secret")], + timeout=10, + cloud="aws", + region=region, ) def deterministic_failure_modal_no_retries(i, path=None, timing_map=None, *, name=None): return deterministic_failure(path, timing_map, i, name=name) @@ -57,6 +63,8 @@ def deterministic_failure_modal_no_retries(i, path=None, timing_map=None, *, nam secrets=[modal.Secret.from_name("my-aws-secret")], retries=2, timeout=300, + cloud="aws", + region=region, ) def deterministic_failure_modal_long_timeout( i, path=None, timing_map=None, *, name=None @@ -66,16 +74,17 @@ def deterministic_failure_modal_long_timeout( async def run_test(app_function, input, use_backups=False, batch_size=None, **kwargs): outputs = set() - async with app.run(): - create_futures_func = modal_create_futures_func(app_function) - async for output in async_map_unordered( - create_futures_func, - input, - use_backups=use_backups, - batch_size=batch_size, - **kwargs, - ): - outputs.add(output) + with modal.enable_output(): + async with app.run(): + create_futures_func = modal_create_futures_func(app_function) + async for output in async_map_unordered( + create_futures_func, + input, + use_backups=use_backups, + batch_size=batch_size, + **kwargs, + ): + outputs.add(output) return outputs diff --git a/cubed/tests/test_core.py b/cubed/tests/test_core.py index 14569003..ed17df87 100644 --- a/cubed/tests/test_core.py +++ b/cubed/tests/test_core.py @@ -16,13 +16,7 @@ from cubed.core.ops import general_blockwise, merge_chunks, partial_reduce, tree_reduce from cubed.core.optimization import fuse_all_optimize_dag, multiple_inputs_optimize_dag from cubed.storage.backend import open_backend_array -from cubed.tests.utils import ( - ALL_EXECUTORS, - MAIN_EXECUTORS, - MODAL_EXECUTORS, - TaskCounter, - create_zarr, -) +from cubed.tests.utils import ALL_EXECUTORS, MAIN_EXECUTORS, TaskCounter, create_zarr @pytest.fixture() @@ -48,15 +42,6 @@ def any_executor(request): return request.param -@pytest.fixture( - scope="module", - params=MODAL_EXECUTORS, - ids=[executor.name for executor in MODAL_EXECUTORS], -) -def modal_executor(request): - return request.param - - def test_as_array_fails(spec): a = np.ones((1000, 1000)) with pytest.raises( diff --git a/cubed/tests/test_executor_features.py b/cubed/tests/test_executor_features.py index e29d486d..25e695f3 100644 --- a/cubed/tests/test_executor_features.py +++ b/cubed/tests/test_executor_features.py @@ -303,7 +303,7 @@ def test_check_runtime_memory_modal(spec, modal_executor): c = xp.add(a, b) with pytest.raises( ValueError, - match=r"Runtime memory \(2097152000\) is less than allowed_mem \(4000000000\)", + match=r"Runtime memory \(2000000000\) is less than allowed_mem \(4000000000\)", ): c.compute(executor=modal_executor) diff --git a/cubed/tests/utils.py b/cubed/tests/utils.py index a5a98f74..e96dcf8d 100644 --- a/cubed/tests/utils.py +++ b/cubed/tests/utils.py @@ -4,6 +4,7 @@ import networkx as nx import numpy as np +from cubed import config from cubed.runtime.create import create_executor from cubed.runtime.types import Callback from cubed.storage.backend import open_backend_array @@ -57,10 +58,23 @@ except ImportError: pass + MODAL_EXECUTORS = [] try: - MODAL_EXECUTORS.append(create_executor("modal")) + # only set global config below if modal can be imported + import modal # noqa: F401 + + # need to set global config for testing modal since these options + # are read at the top level of modal.py to create remote functions + config.set( + { + "spec.executor_options.cloud": "aws", + "spec.executor_options.region": "us-east-1", + } + ) + executor_options = dict(enable_output=True) + MODAL_EXECUTORS.append(create_executor("modal", executor_options)) except ImportError: pass diff --git a/docs/configuration.md b/docs/configuration.md index 5bb00d1e..cb9dc3cd 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -211,12 +211,14 @@ Note that `batch_size` is not currently supported for Lithops. | Property | Default | Description | |------------------------------|---------|-------------------------------------------------------------------------------------| | `cloud` | `"aws"` | The cloud to run on. One of `"aws"` or `"gcp"`. | +| `region` | N/A | The cloud region to run in. This must be set to match the region of your cloud store to avoid data transfer fees. See Modal's [Region selection](https://modal.com/docs/guide/region-selection) page for possible values. | +| `retries` | 2 | The number of times to retry a task if it fails. | +| `timeout` | 180 | Tasks that take longer than the timeout will be automatically killed and retried. | +| `enable_output` | False | Print Modal output to stdout and stderr things for debuggging. | | `use_backups` | `True` | Whether to use backup tasks for mitigating stragglers. | | `batch_size` | `None` | Number of input tasks to submit to be run in parallel. The default is not to batch. | | `compute_arrays_in_parallel` | `False` | Whether arrays are computed one at a time or in parallel. | -Currently the Modal executor in Cubed uses a hard-coded value of 2 for retries and 300 seconds for timeouts, neither of which can be changed through configuration. - ## Debugging You can use Donfig's `pprint` method if you want to check which configuration settings are in effect when you code is run: diff --git a/docs/user-guide/executors.md b/docs/user-guide/executors.md index a7d8a7de..ffdd170c 100644 --- a/docs/user-guide/executors.md +++ b/docs/user-guide/executors.md @@ -20,7 +20,7 @@ When it comes to scaling out, there are a number of executors that work in the c [**Lithops**](https://lithops-cloud.github.io/) is the executor we recommend for most users, since it has had the most testing so far (~1000 workers). If your data is in Amazon S3 then use Lithops with AWS Lambda, and if it's in GCS use Lithops with Google Cloud Functions. You have to build a runtime environment as a part of the setting up process. -[**Modal**](https://modal.com/) is very easy to get started with because it handles building a runtime environment for you automatically (note that it requires that you [sign up](https://modal.com/signup) for a free account). **At the time of writing, Modal does not guarantee that functions run in any particular cloud region, so it is not currently recommended that you run large computations since excessive data transfer fees are likely.** +[**Modal**](https://modal.com/) is very easy to get started with because it automatically builds a runtime environment in a matter of seconds (note that it requires that you [sign up](https://modal.com/signup) for a free account). It has been tested with ~100 workers. [**Coiled**](https://www.coiled.io/) is also easy to get started with ([sign up](https://cloud.coiled.io/signup)). It uses [Coiled Functions](https://docs.coiled.io/user_guide/usage/functions/index.html) and has a 1-2 minute overhead to start a cluster. diff --git a/examples/modal/aws/README.md b/examples/modal/aws/README.md index 3dd54818..916bccd2 100644 --- a/examples/modal/aws/README.md +++ b/examples/modal/aws/README.md @@ -1,7 +1,5 @@ # Examples running Cubed on Modal -**Warning: Modal does not guarantee that functions run in any particular cloud region, so it is not currently recommended that you run large computations since excessive data transfer fees are likely.** - ## Pre-requisites 1. A [Modal account](https://modal.com/) @@ -10,14 +8,15 @@ ## Set up 1. Add a new [Modal secret](https://modal.com/secrets), by following the AWS wizard. This will prompt you to fill in values for `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`. Call the secret `my-aws-secret`. -2. Create a new S3 bucket (called `cubed--temp`, for example) in the `us-east-1` region. This will be used for intermediate data. -3. Install a Python environment by running the following from this directory: +2. Create a new S3 bucket (called `cubed--temp`, for example) in a [region supported by Modal](https://modal.com/docs/guide/region-selection). This will be used for intermediate data. +3. Edit the file `cubed.yaml` and change the `spec.executor_options.region` key to be the region that you created the S3 bucket in. +4. Install a Python environment by running the following from this directory: ```shell conda create --name cubed-modal-aws-examples -y python=3.11 conda activate cubed-modal-aws-examples pip install 'cubed[modal]' -export CUBED_MODAL_REQUIREMENTS_FILE=$(pwd)/requirements.txt +export CUBED_SPEC__EXECUTOR_OPTIONS__REQUIREMENTS_FILE=$(pwd)/requirements.txt ``` ## Examples diff --git a/examples/modal/aws/cubed.yaml b/examples/modal/aws/cubed.yaml index b93ad7c6..c92c2ec5 100644 --- a/examples/modal/aws/cubed.yaml +++ b/examples/modal/aws/cubed.yaml @@ -4,3 +4,4 @@ spec: executor_name: "modal" executor_options: cloud: "aws" + region: "eu-west-1" # change to match the work_dir bucket diff --git a/examples/modal/gcp/README.md b/examples/modal/gcp/README.md index 17bc42c6..226d7c8d 100644 --- a/examples/modal/gcp/README.md +++ b/examples/modal/gcp/README.md @@ -1,7 +1,5 @@ # Examples running Cubed on Modal -**Warning: Modal does not guarantee that functions run in any particular cloud region, so it is not currently recommended that you run large computations since excessive data transfer fees are likely.** - ## Pre-requisites 1. A [Modal account](https://modal.com/) @@ -10,14 +8,15 @@ ## Set up 1. Add a new [Modal secret](https://modal.com/secrets), by following the Google Cloud wizard. This will prompt you to fill in values for `SERVICE_ACCOUNT_JSON` (it has instructions on how to create it, make sure you add the "Storage Admin" role). Call the secret `my-googlecloud-secret`. -2. Create a new GCS bucket (called `cubed--temp`, for example) in the `us-east-1` region. This will be used for intermediate data. -3. Install a Python environment by running the following from this directory: +2. Create a new GCS bucket (called `cubed--temp`, for example) in a [region supported by Modal](https://modal.com/docs/guide/region-selection). This will be used for intermediate data. +3. Edit the file `cubed.yaml` and change the `spec.executor_options.region` key to be the region that you created the GCS bucket in. +4. Install a Python environment by running the following from this directory: ```shell conda create --name cubed-modal-gcp-examples -y python=3.11 conda activate cubed-modal-gcp-examples pip install 'cubed[modal-gcp]' -export CUBED_MODAL_REQUIREMENTS_FILE=$(pwd)/requirements.txt +export CUBED_SPEC__EXECUTOR_OPTIONS__REQUIREMENTS_FILE=$(pwd)/requirements.txt ``` ## Examples diff --git a/examples/modal/gcp/cubed.yaml b/examples/modal/gcp/cubed.yaml index 10d6260f..65e38b79 100644 --- a/examples/modal/gcp/cubed.yaml +++ b/examples/modal/gcp/cubed.yaml @@ -4,3 +4,4 @@ spec: executor_name: "modal" executor_options: cloud: "gcp" + region: "europe-west2" # change to match the work_dir bucket