diff --git a/api-reference/datasets/accessing-collection.mdx b/api-reference/datasets/accessing-collection.mdx index 2ff35f6..a99ba6f 100644 --- a/api-reference/datasets/accessing-collection.mdx +++ b/api-reference/datasets/accessing-collection.mdx @@ -11,16 +11,10 @@ information for it. -```python Python (Sync) +```python Python collections = dataset.collection("My-collection") ``` -```python Python (Async) -collections = dataset.collection("My-collection") -# just creates a collection object, no network calls are made -# so no await required -``` - ## Parameters diff --git a/api-reference/datasets/accessing-dataset.mdx b/api-reference/datasets/accessing-dataset.mdx index a36ba37..0e6810b 100644 --- a/api-reference/datasets/accessing-dataset.mdx +++ b/api-reference/datasets/accessing-dataset.mdx @@ -8,12 +8,7 @@ Once you have listed all available datasets, you can access a specific dataset b -```python Python (Sync) -dataset = datasets.open_data.copernicus.sentinel1_sar -# or any other dataset available to you -``` - -```python Python (Async) +```python Python dataset = datasets.open_data.copernicus.sentinel1_sar # or any other dataset available to you ``` diff --git a/api-reference/datasets/collection-info.mdx b/api-reference/datasets/collection-info.mdx index d022668..242b20f 100644 --- a/api-reference/datasets/collection-info.mdx +++ b/api-reference/datasets/collection-info.mdx @@ -8,14 +8,10 @@ You can access information such as availability and number of available datapoin -```python Python (Sync) +```python Python info = collection.info() ``` -```python Python (Async) -info = await collection.info() -``` - ## Errors diff --git a/api-reference/datasets/listing-collection.mdx b/api-reference/datasets/listing-collection.mdx index f594c1f..af659d1 100644 --- a/api-reference/datasets/listing-collection.mdx +++ b/api-reference/datasets/listing-collection.mdx @@ -8,20 +8,13 @@ You can list all the collections available for a dataset using the `collections` -```python Python (Sync) +```python Python collections = dataset.collections( availability = True, count = False, ) ``` -```python Python (Async) -collections = await dataset.collections( - availability = True, - count = False, -) -``` - ## Parameters diff --git a/api-reference/datasets/listing-datasets.mdx b/api-reference/datasets/listing-datasets.mdx index efd736d..02054da 100644 --- a/api-reference/datasets/listing-datasets.mdx +++ b/api-reference/datasets/listing-datasets.mdx @@ -8,18 +8,11 @@ All available datasets can be listed using the datasets method on your Tilebox d -```python Python (Sync) +```python Python from tilebox.datasets import Client client = Client() datasets = client.datasets() ``` -```python Python (Async) -from tilebox.datasets.aio import Client - -client = Client() -datasets = await client.datasets() -``` - diff --git a/api-reference/datasets/loading-data.mdx b/api-reference/datasets/loading-data.mdx index 64abc21..3cfb60c 100644 --- a/api-reference/datasets/loading-data.mdx +++ b/api-reference/datasets/loading-data.mdx @@ -14,7 +14,7 @@ Tilebox as time. 
Currently this includes either strings in ISO 8601 format or py -```python Python (Sync) +```python Python from datetime import datetime from tilebox.clients.core.data import TimeInterval @@ -40,32 +40,6 @@ meta_data = collection.load(..., skip_data=True) first_50 = collection.load(meta_data.time[:50], skip_data=False) ``` -```python Python (Async) -from datetime import datetime -from tilebox.clients.core.data import TimeInterval - -# loading a specific time -time = "2023-05-01 12:45:33.423" -data = await collection.load(time) - -# loading a time interval -interval = ("2023-05-01", "2023-08-01") -data = await collection.load(interval, show_progress=True) - -# loading a time interval alternative equivalent to the above example -interval = TimeInterval( - start = datetime(2023, 5, 1), - end = datetime(2023, 8, 1), - start_exclusive = False, - end_inclusive = False, -) -data = await collection.load(interval, show_progress=True) - -# loading with an iterable -meta_data = await collection.load(..., skip_data=True) -first_50 = await collection.load(meta_data.time[:50], skip_data=False) -``` - ## Parameters diff --git a/api-reference/datasets/loading-datapoint.mdx b/api-reference/datasets/loading-datapoint.mdx index f64a2a3..1716b5f 100644 --- a/api-reference/datasets/loading-datapoint.mdx +++ b/api-reference/datasets/loading-datapoint.mdx @@ -8,7 +8,7 @@ To load a single data point from a collection using its id, use the find method -```python Python (Sync) +```python Python datapoint_id = "0186d6b6-66cc-fcfd-91df-bbbff72499c3" data = collection.find( datapoint_id, @@ -16,14 +16,6 @@ data = collection.find( ) ``` -```python Python (Async) -datapoint_id = "0186d6b6-66cc-fcfd-91df-bbbff72499c3" -data = await collection.find( - datapoint_id, - skip_data = False, -) -``` - ## Parameters diff --git a/api-reference/storage-providers/creating-storage-client.mdx b/api-reference/storage-providers/creating-storage-client.mdx index bf35f3d..dd5b6d7 100644 --- a/api-reference/storage-providers/creating-storage-client.mdx +++ b/api-reference/storage-providers/creating-storage-client.mdx @@ -10,7 +10,7 @@ For a complete example look at the [Accessing Open Data](/datasets/open-data#sam -```python Python (Sync) +```python Python from pathlib import Path from tilebox.storage import ASFStorageClient # or UmbraStorageClient @@ -22,18 +22,6 @@ storage_client = ASFStorageClient( ) ``` -```python Python (Async) -from pathlib import Path -from tilebox.storage.aio import ASFStorageClient -# or UmbraStorageClient -# or CopernicusStorageClient - -storage_client = ASFStorageClient( - "ASF_USERNAME", "ASF_PASSWORD", - cache_directory=Path("./data") -) -``` - ## Parameters diff --git a/api-reference/storage-providers/deleting-cache.mdx b/api-reference/storage-providers/deleting-cache.mdx index 31fadbd..e18c558 100644 --- a/api-reference/storage-providers/deleting-cache.mdx +++ b/api-reference/storage-providers/deleting-cache.mdx @@ -8,14 +8,9 @@ To delete the entire download cache you can use the `destroy_cache` method. 
-```python Python (Sync) +```python Python # careful, this will delete the entire cache directory storage_client.destroy_cache() ``` -```python Python (Async) -# careful, this will delete the entire cache directory -await storage_client.destroy_cache() -``` - diff --git a/api-reference/storage-providers/deleting-products.mdx b/api-reference/storage-providers/deleting-products.mdx index 9cd3d54..36670f3 100644 --- a/api-reference/storage-providers/deleting-products.mdx +++ b/api-reference/storage-providers/deleting-products.mdx @@ -8,16 +8,11 @@ To delete downloaded products or images again you can use the `delete` method. -```python Python (Sync) +```python Python storage_client.delete(path_to_data) storage_client.delete(path_to_image) ``` -```python Python (Async) -await storage_client.delete(path_to_data) -await storage_client.delete(path_to_image) -``` - ## Parameters diff --git a/api-reference/storage-providers/direct-storage-access.mdx b/api-reference/storage-providers/direct-storage-access.mdx index 300d7db..c907efe 100644 --- a/api-reference/storage-providers/direct-storage-access.mdx +++ b/api-reference/storage-providers/direct-storage-access.mdx @@ -9,7 +9,7 @@ It does not cache any files and expects an `output_dir` parameter for all downlo -```python Python (Sync) +```python Python from pathlib import Path from tilebox.storage import ASFStorageClient # or UmbraStorageClient @@ -28,23 +28,4 @@ path_to_data = direct_storage_client.download( ) ``` -```python Python (Async) -from pathlib import Path -from tilebox.storage.aio import ASFStorageClient -# or UmbraStorageClient -# or CopernicusStorageClient - -direct_storage_client = ASFStorageClient( - "ASF_USERNAME", "ASF_PASSWORD", - cache_directory=None -) -path_to_data = await direct_storage_client.download( - datapoint, - output_dir=Path("./data"), - verify=True, - extract=True, - show_progress=True, -) -``` - diff --git a/api-reference/storage-providers/downloading-products.mdx b/api-reference/storage-providers/downloading-products.mdx index 3102cdd..d7cdf6d 100644 --- a/api-reference/storage-providers/downloading-products.mdx +++ b/api-reference/storage-providers/downloading-products.mdx @@ -8,7 +8,7 @@ You can download the product file for a given data point using the download meth -```python Python (Sync) +```python Python path_to_data = storage_client.download( datapoint, verify=True, @@ -17,15 +17,6 @@ path_to_data = storage_client.download( ) ``` -```python Python (Async) -path_to_data = await storage_client.download( - datapoint, - verify=True, - extract=True, - show_progress=True, -) -``` - ## Parameters diff --git a/api-reference/storage-providers/downloading-quicklook-images.mdx b/api-reference/storage-providers/downloading-quicklook-images.mdx index 801e319..b3849ed 100644 --- a/api-reference/storage-providers/downloading-quicklook-images.mdx +++ b/api-reference/storage-providers/downloading-quicklook-images.mdx @@ -8,18 +8,12 @@ In case a storage provider offers quicklook images for products you can download -```python Python (Sync) +```python Python path_to_image = storage_client.download_quicklook( datapoint ) ``` -```python Python (Async) -path_to_image = await storage_client.download_quicklook( - datapoint -) -``` - ## Parameters diff --git a/api-reference/storage-providers/previewing-quicklook-images.mdx b/api-reference/storage-providers/previewing-quicklook-images.mdx index fa2db1a..d4bc0e3 100644 --- a/api-reference/storage-providers/previewing-quicklook-images.mdx +++ 
b/api-reference/storage-providers/previewing-quicklook-images.mdx @@ -8,20 +8,13 @@ In interactive environments you can also display quicklook images directly in th -```python Python (Sync) +```python Python image = storage_client.quicklook( datapoint ) image # display the image as the cell output ``` -```python Python (Async) -image = await storage_client.quicklook( - datapoint -) -image # display the image as the cell output -``` - ## Parameters diff --git a/api-reference/workflows/cache-access.mdx b/api-reference/workflows/cache-access.mdx index 8f4c5d3..c0e7aa7 100644 --- a/api-reference/workflows/cache-access.mdx +++ b/api-reference/workflows/cache-access.mdx @@ -10,8 +10,7 @@ Make sure to specify dependencies between tasks to ensure that certain cache key been written to. - -```python Python (Sync) +```python Python class WriterTask(Task): def execute(self, context: ExecutionContext): context.job_cache["some-key"] = b"my-value" @@ -20,15 +19,4 @@ class ReaderTask(Task): def execute(self, context: ExecutionContext): data = context.job_cache["some-key"] ``` - -```python Python (Async) -class WriterTask(Task): - def execute(self, context: ExecutionContext): - context.job_cache["some-key"] = b"my-value" - -class ReaderTask(Task): - def execute(self, context: ExecutionContext): - data = context.job_cache["some-key"] -``` - diff --git a/api-reference/workflows/cancelling-job.mdx b/api-reference/workflows/cancelling-job.mdx index 9f98f2d..9385fb9 100644 --- a/api-reference/workflows/cancelling-job.mdx +++ b/api-reference/workflows/cancelling-job.mdx @@ -10,14 +10,10 @@ If after cancelling a job you want to resume it, you can [retry](/api-reference/ -```python Python (Sync) +```python Python job_client.cancel(job) ``` -```python Python (Async) -await job_client.cancel(job) -``` - ## Parameters diff --git a/api-reference/workflows/cluster-management.mdx b/api-reference/workflows/cluster-management.mdx index 82cb3af..196c334 100644 --- a/api-reference/workflows/cluster-management.mdx +++ b/api-reference/workflows/cluster-management.mdx @@ -8,11 +8,11 @@ You can use an instance of the `ClusterClient` to find, list, create, and delete -```python Python (Sync) +```python Python from tilebox.workflows import Client client = Client() -cluster_client = client.clusters() +cluster_client = client.clusters() # Find, List, Create and Delete clusters cluster = cluster_client.find("my-cluster-EdsdUozYprBJDL") # cluster-slug @@ -22,18 +22,4 @@ cluster = cluster_client.create("My Cluster") cluster_client.delete("my-cluster-EdsdUozYprBJDL") ``` -```python Python (Async) -from tilebox.workflows import Client - -client = Client() -cluster_client = await client.clusters() - -# Find, List, Create and Delete clusters -cluster = await cluster_client.find("my-cluster-EdsdUozYprBJDL") # cluster-slug -all_clusters = await cluster_client.all() -# will generate a new cluster slug from the provided name -cluster = await cluster_client.create("My Cluster") -await cluster_client.delete("my-cluster-EdsdUozYprBJDL") -``` - diff --git a/api-reference/workflows/creating-task-runner.mdx b/api-reference/workflows/creating-task-runner.mdx index 2eb6fe6..12eca19 100644 --- a/api-reference/workflows/creating-task-runner.mdx +++ b/api-reference/workflows/creating-task-runner.mdx @@ -8,7 +8,7 @@ From a client instance you can create a [TaskRunner](/workflows/concepts/task-ru -```python Python (Sync) +```python Python from tilebox.workflows import Client from tilebox.workflows.cache import LocalFileSystemCache @@ -20,18 +20,6 @@ 
runner = client.runner( ) ``` -```python Python (Async) -from tilebox.workflows.aio import Client -from tilebox.workflows.cache import LocalFileSystemCache - -client = Client() -runner = await client.runner( - "my-cluster-EdsdUozYprBJDL", - [MyFirstTask, MySubtask], - cache=LocalFileSystemCache("cache_directory"), # optional -) -``` - ## Parameters diff --git a/api-reference/workflows/implementing-task.mdx b/api-reference/workflows/implementing-task.mdx index c22c59d..9296fe7 100644 --- a/api-reference/workflows/implementing-task.mdx +++ b/api-reference/workflows/implementing-task.mdx @@ -8,15 +8,7 @@ A [task](/workflows/concepts/tasks) can be implemented by subclassing the Task c -```python Python (Sync) -from tilebox.workflows import Task, ExecutionContext - -class MyFirstTask(Task): - def execute(self, context: ExecutionContext): - print(f"Hello World!") -``` - -```python Python (Async) +```python Python from tilebox.workflows import Task, ExecutionContext class MyFirstTask(Task): diff --git a/api-reference/workflows/retrying-job.mdx b/api-reference/workflows/retrying-job.mdx index dd37173..610dc83 100644 --- a/api-reference/workflows/retrying-job.mdx +++ b/api-reference/workflows/retrying-job.mdx @@ -8,20 +8,13 @@ If a job fails, you can retry it by calling the `retry` method of the `JobClient -```python Python (Sync) +```python Python job = job_client.submit(...) # see above # job is now executing on some task runners, but fails # so let's retry it job_client.retry(job) ``` -```python Python (Async) -job = await job_client.submit(...) # see above -# job is now executing on some task runners, but fails -# so let's retry it -await job_client.retry(job) -``` - ## Parameters diff --git a/api-reference/workflows/starting-task-runner.mdx b/api-reference/workflows/starting-task-runner.mdx index 69495ed..7603557 100644 --- a/api-reference/workflows/starting-task-runner.mdx +++ b/api-reference/workflows/starting-task-runner.mdx @@ -10,12 +10,8 @@ runner idle and wait until work is available. -```python Python (Sync) +```python Python runner.run_forever() ``` -```python Python (Async) -await runner.run_forever() -``` - diff --git a/api-reference/workflows/submitting-job.mdx b/api-reference/workflows/submitting-job.mdx index d834bf4..9780e11 100644 --- a/api-reference/workflows/submitting-job.mdx +++ b/api-reference/workflows/submitting-job.mdx @@ -9,7 +9,7 @@ This can be done by calling the `submit` method of a `JobClient` instance. -```python Python (Sync) +```python Python from tilebox.workflows import Client client = Client() @@ -22,19 +22,6 @@ job = job_client.submit( ) ``` -```python Python (Async) -from tilebox.workflows.aio import Client - -client = Client() -job_client = await client.jobs() -job = await job_client.submit( - "my-first-job", - MyFirstTask(message="Hello, World!", number=42, data={"key": "value"}), - "my-cluster-EdsdUozYprBJDL", - max_retries=0, -) -``` - ## Parameters diff --git a/api-reference/workflows/subtasks.mdx b/api-reference/workflows/subtasks.mdx index 0f5f882..3831aba 100644 --- a/api-reference/workflows/subtasks.mdx +++ b/api-reference/workflows/subtasks.mdx @@ -12,23 +12,7 @@ dependencies between tasks. 
-```python Python (Sync) -# within the execute method of a Task: -subtask = context.submit_subtask(MySubtask()) -dependent_subtask = context.submit_subtask( - MyOtherSubtask(), depends_on=[subtask] -) -gpu_task = context.submit_subtask( - MyGPUTask(), - cluster="gpu-cluster-slug" -) -flaky_task = context.submit_subtask( - MyFlakyTask(), - max_retries=5 -) -``` - -```python Python (Async) +```python Python # within the execute method of a Task: subtask = context.submit_subtask(MySubtask()) dependent_subtask = context.submit_subtask( @@ -63,5 +47,5 @@ flaky_task = context.submit_subtask( Specify the maximum number of [retries](/workflows/concepts/tasks#retry-handling) for the subtask in case of failure. Defaults - to 0. +to 0. diff --git a/api-reference/workflows/task-display-name.mdx b/api-reference/workflows/task-display-name.mdx index 92ab6dd..f4e5403 100644 --- a/api-reference/workflows/task-display-name.mdx +++ b/api-reference/workflows/task-display-name.mdx @@ -9,12 +9,7 @@ This allows you to change the display name of the task in [visualizations](/work -```python Python (Sync) -# within the execute method of a Task: -context.current_task.display = "My Custom Task Name" -``` - -```python Python (Async) +```python Python # within the execute method of a Task: context.current_task.display = "My Custom Task Name" ``` diff --git a/api-reference/workflows/task-inputs.mdx b/api-reference/workflows/task-inputs.mdx index 8f05ab9..67e79ba 100644 --- a/api-reference/workflows/task-inputs.mdx +++ b/api-reference/workflows/task-inputs.mdx @@ -9,19 +9,7 @@ These parameters are defined as class attributes, the same way as you would defi -```python Python (Sync) -from tilebox.workflows import Task, ExecutionContext - -class ParametrizableTask(Task): - message: str - number: int - data: dict[str, str] - - def execute(self, context: ExecutionContext): - ... -``` - -```python Python (Async) +```python Python from tilebox.workflows import Task, ExecutionContext class ParametrizableTask(Task): diff --git a/api-reference/workflows/visualizing-job.mdx b/api-reference/workflows/visualizing-job.mdx index b239233..93dbe40 100644 --- a/api-reference/workflows/visualizing-job.mdx +++ b/api-reference/workflows/visualizing-job.mdx @@ -13,20 +13,13 @@ Both methods have the same parameters. -```python Python (Sync) +```python Python svg: str = job_client.visualize(job) # or if you are in a Jupyter notebook: job_client.display(job) ``` -```python Python (Async) -svg: str = await job_client.visualize(job) - -# or if you are in a Jupyter notebook: -await job_client.display(job) -``` - ## Parameters @@ -37,7 +30,7 @@ await job_client.display(job) An explicit direction for the diagram to flow towards. See the relevant section in the [D2 - docs](https://d2lang.com/tour/layouts/#direction) for more detail. Valid values are `up`, `down`, `right`, `left`. +docs](https://d2lang.com/tour/layouts/#direction) for more detail. Valid values are `up`, `down`, `right`, `left`. Defaults to `down`. 
diff --git a/datasets/collections.mdx b/datasets/collections.mdx index 613b300..b718206 100644 --- a/datasets/collections.mdx +++ b/datasets/collections.mdx @@ -26,19 +26,13 @@ assume that you have already [created a client](/datasets/introduction#creating- - ```python Python (Sync) + ```python Python from tilebox.datasets import Client client = Client() datasets = client.datasets() ``` - ```python Python (Async) - from tilebox.datasets.aio import Client - - client = Client() - datasets = await client.datasets() - ``` - + ## Listing collections @@ -48,18 +42,12 @@ Each dataset has a list of collections associated with it. You can list the coll - ```python Python (Sync) + ```python Python dataset = datasets.open_data.copernicus.landsat8_oli_tirs collections = dataset.collections() print(collections) ``` - ```python Python (Async) - dataset = datasets.open_data.copernicus.landsat8_oli_tirs - collections = await dataset.collections() - print(collections) - ``` - ```plaintext Output @@ -80,18 +68,12 @@ method. - ```python Python (Sync) + ```python Python dataset = datasets.open_data.copernicus.landsat8_oli_tirs collections = dataset.collections() print(collections) ``` - ```python Python (Async) - dataset = datasets.open_data.copernicus.landsat8_oli_tirs - collections = await dataset.collections() - print(collections) - ``` - ```plaintext Output @@ -110,20 +92,13 @@ You can then use the `info()` method on the collection object to get information - ```python Python (Sync) + ```python Python collections = dataset.collections() terrain_correction = collections["L1GT"] collection_info = terrain_correction.info() print(collection_info) ``` - ```python Python (Async) - collections = await dataset.collections() - terrain_correction = collections["L1GT"] - collection_info = await terrain_correction.info() - print(collection_info) - ``` - ```plaintext Output @@ -135,18 +110,12 @@ This has the advantage that you can directly access the collection without havin - ```python Python (Sync) + ```python Python terrain_correction = dataset.collection("L1GT") collection_info = terrain_correction.info() print(collection_info) ``` - ```python Python (Async) - terrain_correction = dataset.collection("L1GT") - collection_info = await terrain_correction.info() - print(collection_info) - ``` - ```plaintext Output @@ -161,14 +130,10 @@ If you try to access a collection with a name that does not exist, a `NotFoundEr -```python Python (Sync) +```python Python dataset.collection("Sat-X").info() # raises NotFoundError: 'No such collection Sat-X' ``` -```python Python (Async) -await dataset.collection("Sat-X").info() # raises NotFoundError: 'No such collection Sat-X' -``` - ## Next steps diff --git a/datasets/introduction.mdx b/datasets/introduction.mdx index 54eb303..3361845 100644 --- a/datasets/introduction.mdx +++ b/datasets/introduction.mdx @@ -60,16 +60,11 @@ With the prerequisites out of the way, you can now create a client instance to s - ```python Python (Sync) + ```python Python from tilebox.datasets import Client client = Client(token="YOUR_TILEBOX_API_KEY") ``` - ```python Python (Async) - from tilebox.datasets.aio import Client - - client = Client(token="YOUR_TILEBOX_API_KEY") - ``` @@ -78,18 +73,12 @@ without passing the `token` argument. 
Python automatically pick up the environme - ```python Python (Sync) + ```python Python from tilebox.datasets import Client # requires a TILEBOX_API_KEY environment variable client = Client() ``` - ```python Python (Async) - from tilebox.datasets.aio import Client - - # requires a TILEBOX_API_KEY environment variable - client = Client() - ``` @@ -107,16 +96,11 @@ of your IDE or inside your Jupyter notebook. -```python Python (Sync) +```python Python datasets = client.datasets() datasets. # trigger autocomplete here to get an overview of the available datasets ``` -```python Python (Async) -datasets = await client.datasets() -datasets. # trigger autocomplete here to get an overview of the available datasets -``` - ### Errors you might encounter @@ -129,16 +113,11 @@ directly, but only when you try to make a request to the API. -```python Python (Sync) +```python Python client = Client(token="invalid-key") # runs without error datasets = client.datasets() # raises AuthenticationError ``` -```python Python (Async) -client = Client(token="invalid-key") # runs without error -datasets = await client.datasets() # raises AuthenticationError -``` - ## Next steps diff --git a/datasets/loading-data.mdx b/datasets/loading-data.mdx index f61b262..f705423 100644 --- a/datasets/loading-data.mdx +++ b/datasets/loading-data.mdx @@ -21,7 +21,7 @@ assume that you have already [created a client](/datasets/introduction#creating- - ```python Python (Sync) + ```python Python from tilebox.datasets import Client client = Client() @@ -29,15 +29,7 @@ assume that you have already [created a client](/datasets/introduction#creating- collections = datasets.open_data.copernicus.sentinel1_sar.collections() collection = collections["S1A_IW_RAW__0S"] ``` - ```python Python (Async) - from tilebox.datasets.aio import Client - - client = Client() - datasets = await client.datasets() - collections = await datasets.open_data.copernicus.sentinel1_sar.collections() - collection = collections["S1A_IW_RAW__0S"] - ``` - + ## Loading data @@ -57,16 +49,11 @@ Check out the example below to see how to load a data point at a specific time f - ```python Python (Sync) + ```python Python data = collection.load("2024-08-01 00:00:01.362") print(data) ``` - ```python Python (Async) - data = await collection.load("2024-08-01 00:00:01.362") - print(data) - ``` - ```plaintext Output @@ -110,16 +97,11 @@ when calling `load`. Check out the example below to see this in action. -```python Python (Sync) +```python Python data = collection.load("2024-08-01 00:00:01.362", skip_data=True) print(data) ``` -```python Python (Async) -data = await collection.load("2024-08-01 00:00:01.362", skip_data=True) -print(data) -``` - ```plaintext Output @@ -140,18 +122,12 @@ In that case the returned dataset is empty, but it does not raise an error. -```python Python (Sync) +```python Python time_with_no_data_points = "1997-02-06 10:21:00" data = collection.load(time_with_no_data_points) print(data) ``` -```python Python (Async) -time_with_no_data_points = "1997-02-06 10:21:00" -data = await collection.load(time_with_no_data_points) -print(data) -``` - ```plaintext Output @@ -169,7 +145,7 @@ internally convert the specified datetime to `UTC` before making a request. The timestamps, which would need to be manually converted again to different timezones. 
- ```python Python (Sync) + ```python Python from datetime import datetime import pytz @@ -179,16 +155,6 @@ timestamps, which would need to be manually converted again to different timezon data = collection.load(tokyo_time) print(data) # time is in UTC since the API always returns UTC timestamps ``` - ```python Python (Async) - from datetime import datetime - import pytz - - # Tokyo has a UTC+9 hours offset, so this is the same as 2017-01-01 02:45:25.679 UTC - tokyo_time = pytz.timezone('Asia/Tokyo').localize(datetime(2017, 1, 1, 11, 45, 25, 679000)) - print(tokyo_time) - data = await collection.load(tokyo_time) - print(data) # time is in UTC since the API always returns UTC timestamps - ``` @@ -217,16 +183,11 @@ or as a string in ISO 8601 format. -```python Python (Sync) +```python Python interval = ("2017-01-01", "2023-01-01") data = collection.load(interval, show_progress=True) ``` -```python Python (Async) -interval = ("2017-01-01", "2023-01-01") -data = await collection.load(interval, show_progress=True) -``` - ```plaintext Output @@ -266,7 +227,7 @@ especially useful when chaining time intervals together. For example, the follow as preceding. - ```python Python (Sync) + ```python Python import xarray as xr data = [] @@ -278,19 +239,7 @@ as preceding. # to the result of the single request in the code example above. data = xr.concat(data, dim="time") ``` - ```python Python (Async) - import xarray as xr - - data = [] - for year in [2017, 2018, 2019, 2020, 2021, 2022] - interval = (f"{year}-01-01", f"{year + 1}-01-01") - data.append(await collection.load(interval, show_progress=True)) - - # Concatenate the data into a single dataset, which is equivalent - # to the result of the single request in the code example above. - data = xr.concat(data, dim="time") - ``` - + This code example shows a way to manually split up a large time interval into smaller chunks and make load data in @@ -307,7 +256,7 @@ of creating an equivalent `TimeInterval` object. - ```python Python (Sync) + ```python Python from datetime import datetime from tilebox.datasets.data import TimeInterval @@ -328,28 +277,7 @@ of creating an equivalent `TimeInterval` object. # same operation as above data = collection.load(interval1, show_progress=True) ``` - ```python Python (Async) - from datetime import datetime - from tilebox.data import TimeInterval - - interval1 = TimeInterval( - datetime(2017, 1, 1), datetime(2023, 1, 1), - end_inclusive=False - ) - interval2 = TimeInterval( - datetime(2017, 1, 1), datetime(2022, 12, 31, 23, 59, 59, 999999), - end_inclusive=True - ) - - print("Notice the different end characters ) and ] in the interval notations below:") - print(interval1) - print(interval2) - print(f"They are equivalent: {interval1 == interval2}") - - # same operation as above - data = await collection.load(interval1, show_progress=True) - ``` - + ```plaintext Output @@ -366,21 +294,14 @@ Another way of specifying a time interval when loading data is to use an iterabl `load` as the input for another call. Check out the example below to see how this can be done. 
- ```python Python (Sync) + ```python Python interval = ("2017-01-01", "2023-01-01") meta_data = collection.load(interval, skip_data=True) first_50_data_points = collection.load(meta_data.time[:50], skip_data=False) print(first_50_data_points) ``` - ```python Python (Async) - interval = ("2017-01-01", "2023-01-01") - meta_data = await collection.load(interval, skip_data=True) - - first_50_data_points = await collection.load(meta_data.time[:50], skip_data=False) - print(first_50_data_points) - ``` - + ```plaintext Output @@ -420,18 +341,12 @@ collection. Check out the example below to see how this can be done. -```python Python (Sync) +```python Python datapoint_id = "01916d89-ba23-64c9-e383-3152644bcbde" datapoint = collection.find(datapoint_id) print(datapoint) ``` -```python Python (Async) -datapoint_id = "01916d89-ba23-64c9-e383-3152644bcbde" -datapoint = await collection.find(datapoint_id) -print(datapoint) -``` - ```plaintext Output diff --git a/datasets/open-data.mdx b/datasets/open-data.mdx index 73e6c51..ddf16bf 100644 --- a/datasets/open-data.mdx +++ b/datasets/open-data.mdx @@ -9,7 +9,7 @@ These datasets are available to all users of Tilebox and are a great way to get even before data from your own satellites is available. - If there is an open data dataset you would like to see in Tilebox{" "} + If there is an open data dataset you would like to see in Tilebox please get in touch. @@ -46,7 +46,7 @@ Tilebox currently supports the following ASF datasets: You can query ASF metadata without any account, because Tilebox already indexed and ingested the relevant metadata. To access and download the actual satellite products, you need an ASF account. -You can create an account for ASF by choosing Sign In in the [ASF Vertex Search Tool](https://search.asf.alaska.edu/). +You can create an account for ASF in the [ASF Vertex Search Tool](https://search.asf.alaska.edu/). 
#### Further reading @@ -104,7 +104,7 @@ Here is a sample code snippets that shows how to access open data using the Tile -```python Code +```python Python from pathlib import Path from tilebox.datasets import Client @@ -156,7 +156,7 @@ Contents: -```python Code +```python Python from pathlib import Path from tilebox.datasets import Client @@ -210,7 +210,7 @@ Contents: -```python Code +```python Python from pathlib import Path from tilebox.datasets import Client diff --git a/datasets/timeseries.mdx b/datasets/timeseries.mdx index 880c55e..9aecb37 100644 --- a/datasets/timeseries.mdx +++ b/datasets/timeseries.mdx @@ -27,20 +27,13 @@ For example, to access a dataset called dataset in a dataset group called some, - ```python Python (Sync) + ```python Python from tilebox.datasets import Client client = Client() datasets = client.datasets() dataset = datasets.open_data.copernicus.sentinel1_sar ``` - ```python Python (Async) - from tilebox.datasets.aio import Client - - client = Client() - datasets = await client.datasets() - dataset = datasets.open_data.copernicus.sentinel1_sar - ``` diff --git a/sdks/python/xarray.mdx b/sdks/python/xarray.mdx index 524c483..ec159fd 100644 --- a/sdks/python/xarray.mdx +++ b/sdks/python/xarray.mdx @@ -54,7 +54,7 @@ To get an understanding of how Xarray works, a sample dataset is used, as it cou -```python Python (Sync) +```python Python from tilebox.datasets import Client client = Client() @@ -64,16 +64,6 @@ satellite_data = collection.load(("2022-05-01", "2022-06-01"), show_progress=Tru print(satellite_data) ``` -```python Python (Async) -from tilebox.datasets.aio import Client - -client = Client() -datasets = await client.datasets() -collection = datasets.open_data.copernicus.landsat8_oli_tirs.collection("L1GT") -satellite_data = await collection.load(("2022-05-01", "2022-06-01"), show_progress=True) -print(satellite_data) -``` - ```plaintext Output diff --git a/workflows/caches.mdx b/workflows/caches.mdx index 5442d96..48d0758 100644 --- a/workflows/caches.mdx +++ b/workflows/caches.mdx @@ -26,7 +26,7 @@ This is a cache implementation useful for local development and quick prototypin for more options. - ```python Python (Sync) + ```python Python from tilebox.workflows import Client from tilebox.workflows.cache import InMemoryCache @@ -37,18 +37,6 @@ for more options. cache=InMemoryCache(), ) ``` - ```python Python (Async) - from tilebox.workflows.aio import Client - from tilebox.workflows.cache import InMemoryCache - - client = Client() - runner = await client.runner( - "dev-cluster", - tasks=[...], - cache=InMemoryCache(), - ) - -```` By configuring such a cache, the `context` object that is passed to the execution of each task gains access to a `job_cache` @@ -77,7 +65,7 @@ The Tilebox Workflow orchestrator uses the official Python Client for Google clo to learn how to set up the required authentication. - ```python Python (Sync) + ```python Python from tilebox.workflows import Client from tilebox.workflows.cache import GoogleStorageCache from google.cloud.storage import Client as StorageClient @@ -92,21 +80,6 @@ to learn how to set up the required authentication. 
cache=GoogleStorageCache(bucket, prefix="jobs"), ) ``` - ```python Python (Async) - from tilebox.workflows.aio import Client - from tilebox.workflows.cache import GoogleStorageCache - from google.cloud.storage import Client as StorageClient - - storage_client = StorageClient(project="gcp-project") - bucket = storage_client.bucket("cache-bucket") - - client = Client() - runner = await client.runner( - "dev-cluster", - tasks=[...], - cache=GoogleStorageCache(bucket, prefix="jobs"), - ) - ``` @@ -125,7 +98,7 @@ The Tilebox Workflow orchestrator uses the official `boto3` library to interact to learn how to set up the required authentication. - ```python Python (Sync) + ```python Python from tilebox.workflows import Client from tilebox.workflows.cache import AmazonS3Cache @@ -136,17 +109,6 @@ to learn how to set up the required authentication. cache=AmazonS3Cache("my-bucket-name", prefix="jobs") ) ``` - ```python Python (Async) - from tilebox.workflows.aio import Client - from tilebox.workflows.cache import AmazonS3Cache - - client = Client() - runner = await client.runner( - "dev-cluster", - tasks=[...], - cache=AmazonS3Cache("my-bucket-name", prefix="jobs") - ) - ``` @@ -161,7 +123,7 @@ all task runners are running on the same machine or have access to the same file workflows executing on different task runners, but not recommended for production use. - ```python Python (Sync) + ```python Python from tilebox.workflows import Client from tilebox.workflows.cache import LocalFileSystemCache @@ -172,17 +134,6 @@ workflows executing on different task runners, but not recommended for productio cache=LocalFileSystemCache("/path/to/cache/directory"), ) ``` - ```python Python (Async) - from tilebox.workflows.aio import Client - from tilebox.workflows.cache import LocalFileSystemCache - - client = Client() - runner = await client.runner( - "dev-cluster", - tasks=[...], - cache=LocalFileSystemCache("/path/to/cache/directory"), - ) - ``` ### In-Memory Cache @@ -191,7 +142,7 @@ not shared between task runners, and not persisted between task runner restarts. prototyping of workflows executed on a single task runner. - ```python Python (Sync) + ```python Python from tilebox.workflows import Client from tilebox.workflows.cache import InMemoryCache @@ -202,17 +153,6 @@ prototyping of workflows executed on a single task runner. cache=InMemoryCache(), ) ``` - ```python Python (Async) - from tilebox.workflows.aio import Client - from tilebox.workflows.cache import InMemoryCache - - client = Client() - runner = await client.runner( - "dev-cluster", - tasks=[...], - cache=InMemoryCache(), - ) - ``` ## Data Isolation @@ -239,7 +179,7 @@ any kind of data, such as pickled Python objects, serialized JSON, UTF-8 or ASCI The following code snippet shows how to store and retrieve data from the cache. - ```python Python (Sync) + ```python Python from tilebox.workflows import Task, ExecutionContext class ProducerTask(Task): @@ -253,20 +193,6 @@ The following code snippet shows how to store and retrieve data from the cache. 
data = context.job_cache["data"] print(f"Read {data} from cache") ``` - ```python Python (Async) - from tilebox.workflows.aio import Task, ExecutionContext - - class ProducerTask(Task): - async def execute(self, context: ExecutionContext) -> None: - # store data in the cache - context.job_cache["data"] = b"my_binary_data_to_store" - context.submit_subtask(ConsumerTask()) - - class ConsumerTask(Task): - async def execute(self, context: ExecutionContext) -> None: - data = context.job_cache["data"] - print(f"Read {data} from cache") - ``` The data passed to the `save` method is stored in the cache under the key `"data"`. It can be arbitrarily large binary @@ -277,7 +203,7 @@ To test the preceding workflow, it's possible to start a task runner locally, th to execute the `ProducerTask` and the output of the `ConsumerTask` can be observed. - ```python Python (Sync) + ```python Python # submit a job to test our workflow job_client = client.jobs() job_client.submit("testing-cache-access", ProducerTask(), cluster="dev-cluster") @@ -290,19 +216,6 @@ to execute the `ProducerTask` and the output of the `ConsumerTask` can be observ ) runner.run_forever() ``` - ```python Python (Async) - # submit a job to test our workflow - job_client = await client.jobs() - await job_client.submit("testing-cache-access", ProducerTask(), cluster="dev-cluster") - - # start a runner to execute it - runner = await client.runner( - "dev-cluster", - tasks=[ProducerTask, ConsumerTask], - cache=LocalFileSystemCache("/path/to/cache/directory"), - ) - await runner.run_forever() - ``` ```plaintext Output @@ -322,7 +235,7 @@ the keys `"1"`, `"2"` and `"3"`. Then all keys in the group are listed and the c then displays the sum of all data points. - ```python Python (Sync) + ```python Python from tilebox.workflows import Task, ExecutionContext import random @@ -367,58 +280,12 @@ then displays the sum of all data points. 
print(f"Sum of all numbers: {sum(numbers)}") ``` - ```python Python (Async) - from tilebox.workflows.aio import Task, ExecutionContext - import random - - class CacheGroupDemoWorkflow(Task): - n: int - - async def execute(self, context: ExecutionContext): - # we define a cache group key, which we pass on to the subtasks - group_key = "random_numbers" - # produce numbers will write numerous entries to the cache group - produce_numbers = context.submit_subtask(ProduceRandomNumbers(self.n, group_key)) - # print sum will then be able to list and read all entries from the cache group - sum_task = context.submit_subtask(PrintSum(group_key), depends_on=[produce_numbers]) - - class ProduceRandomNumbers(Task): - n: int - group_key: str - - async def execute(self, context: ExecutionContext): - for i in range(self.n): - context.submit_subtask(ProduceRandomNumber(i, self.group_key)) - - class ProduceRandomNumber(Task): - index: int - group_key: str - - async def execute(self, context: ExecutionContext) -> None: - number = random.randint(0, 100) - group = context.job_cache.group(self.group_key) - group[f"key_{self.index}"] = number.to_bytes() - - class PrintSum(Task): - group_key: str - - async def execute(self, context: ExecutionContext) -> None: - group = context.job_cache.group(self.group_key) - - numbers = [] - for key in group: # iterate over all produced numbers - number = group[key] # read data from cache - numbers.append(int.from_bytes(number)) # convert bytes back to int, and add to sum - - print(f"Sum of all numbers: {sum(numbers)}") - ``` - Submitting a job of the `CacheGroupDemoWorkflow`, and then running it with a task runner, could look like this: - ```python Python (Sync) + ```python Python # submit a job to test our workflow job_client = client.jobs() job_client.submit("cache-groups", CacheGroupDemoWorkflow(5), cluster="dev-cluster") @@ -431,20 +298,6 @@ Submitting a job of the `CacheGroupDemoWorkflow`, and then running it with a tas ) runner.run_forever() ``` - ```python Python (Async) - # submit a job to test our workflow - job_client = await client.jobs() - await job_client.submit("cache-groups", CacheGroupDemoWorkflow(5), cluster="dev-cluster") - - # start a runner to execute it - runner = await client.runner( - "dev-cluster", - tasks=[CacheGroupDemoWorkflow, ProduceRandomNumbers, ProduceRandomNumber, PrintSum], - cache=LocalFileSystemCache("/path/to/cache/directory"), - ) - await runner.run_forever() - ``` - ```plaintext Output diff --git a/workflows/concepts/clusters.mdx b/workflows/concepts/clusters.mdx index 10293a8..cbdf405 100644 --- a/workflows/concepts/clusters.mdx +++ b/workflows/concepts/clusters.mdx @@ -37,19 +37,12 @@ To manage clusters, you first need to instantiate a cluster client. You can do t workflows' client. - ```python Python (Sync) + ```python Python from tilebox.workflows import Client client = Client() clusters = client.clusters() ``` - ```python Python (Async) - from tilebox.workflows.aio import Client - - client = Client() - clusters = await client.clusters() - ``` - ### Creating a Cluster @@ -57,15 +50,10 @@ workflows' client. To create a cluster, call the `create` method on the cluster client. You'll need to provide a name for the cluster. - ```python Python (Sync) + ```python Python cluster = clusters.create("testing") print(cluster) ``` - ```python Python (Async) - cluster = await clusters.create("testing") - print(cluster) - ``` - ```plaintext Output @@ -83,14 +71,10 @@ such as submitting a job or subtasks. 
To list all the clusters that are available to you, you can use `all`: - ```python Python (Sync) + ```python Python all_clusters = clusters.all() print(all_clusters) ``` - ```python Python (Async) - all_clusters = await clusters.all() - print(all_clusters) - ``` ```plaintext Output @@ -103,14 +87,10 @@ Cluster(slug='production-EifhUozDpwAJDL', display_name='Production')] To fetch a specific cluster, you can use the `fetch` method and pass in a cluster's slug: - ```python Python (Sync) + ```python Python cluster = clusters.find("testing-CvufcSxcC9SKfe") print(cluster) ``` - ```python Python (Async) - cluster = await clusters.find("testing-CvufcSxcC9SKfe") - print(cluster) - ``` ```plaintext Output @@ -122,12 +102,9 @@ Cluster(slug='testing-CvufcSxcC9SKfe', display_name='testing') To delete a cluster, you can use the `delete` method and pass in a cluster's slug: - ```python Python (Sync) + ```python Python clusters.delete("testing-CvufcSxcC9SKfe") ``` - ```python Python (Async) - await clusters.delete("testing-CvufcSxcC9SKfe") - ``` ## Jobs spanning different clusters @@ -137,15 +114,15 @@ This allows you to target a specific group of task runners for a particular job. submitted to the same cluster by default. This can also be manually overridden to submit sub-tasks to other clusters if needed. Check out the example below to see how this can be done. - - ```python Python (Sync) + + ```python Python from tilebox.workflows import Task, ExecutionContext, Client class MultiClusterWorkflow(Task): def execute(self, context: ExecutionContext) -> None: # this will submit a task to the same cluster as the one currently executing this task same_cluster = context.submit_subtask(DummyTask()) - # + other_cluster = context.submit_subtask( DummyTask(), # this task should only run on a task runner in the "other-cluster" cluster @@ -166,35 +143,6 @@ clusters if needed. Check out the example below to see how this can be done. cluster="testing-CvufcSxcC9SKfe", ) ``` - ```python Python (Async) - from tilebox.workflows.aio import Task, ExecutionContext, Client - - class MultiClusterWorkflow(Task): - async def execute(self, context: ExecutionContext) -> None: - # this will submit a task to the same cluster as the one currently executing this task - same_cluster = context.submit_subtask(DummyTask()) - - other_cluster = context.submit_subtask( - DummyTask(), - # this task should only run on a task runner in the "other-cluster" cluster - cluster="other-cluster-As3dcSb3D9SAdK", - # dependencies can also be specified across clusters - depends_on=[same_cluster], - ) - - class DummyTask(Task): - async def execute(self, context: ExecutionContext) -> None: - pass - - # submit a job to the "testing" cluster - client = Client() - job_client = await client.jobs() - job = await job_client.submit( - MultiClusterWorkflow(), - cluster="testing-CvufcSxcC9SKfe", - ) - ``` - This workflow requires at least two task runners to complete. One task runner must be in the "testing" cluster diff --git a/workflows/concepts/jobs.mdx b/workflows/concepts/jobs.mdx index d77dc3b..f3c88b2 100644 --- a/workflows/concepts/jobs.mdx +++ b/workflows/concepts/jobs.mdx @@ -23,18 +23,12 @@ For submitting a job, a job client needs to be instantiated first. This can be d workflows' client. 
-```python Python (Sync) +```python Python from tilebox.workflows import Client client = Client() job_client = client.jobs() ``` -```python Python (Async) -from tilebox.workflows.aio import Client - -client = Client() -job_client = await client.jobs() -``` Once you have a job client, you can submit a job by calling the `submit` method on it. This requires a name for the job, @@ -42,20 +36,13 @@ an instance of the root [task](/workflows/concepts/tasks), and a [cluster](/work to submit the job to. - ```python Python (Sync) + ```python Python # import your own workflow from my_workflow import MyTask cluster = "dev-cluster" job = job_client.submit('my-job', MyTask("some", "parameters"), cluster) ``` - ```python Python (Async) - # import your own workflow - from my_workflow import MyTask - - cluster = "dev-cluster" - job = await job_client.submit('my-job', MyTask("some", "parameters"), cluster) - ``` Now that a job has been submitted, it's immediately scheduled for execution. As soon as a [eligible task runner](/workflows/concepts/task-runners#task-selection) @@ -67,18 +54,12 @@ is available, the root task of the job is picked up and executed. the root task of a job, where the number of retries can be specified by using the `max_retries` argument of the `submit` method. - ```python Python (Sync) + ```python Python from my_workflow import MyFlakyTask cluster = "dev-cluster" job = job_client.submit('my-job', MyFlakyTask(), cluster, max_retries=5) ``` - ```python Python (Async) - from my_workflow import MyFlakyTask - - cluster = "dev-cluster" - job = await job_client.submit('my-job', MyFlakyTask(), cluster, max_retries=5) - ``` In this example, if `MyFlakyTask` fails, it gets retried up to 5 times before being eventually marked as failed. @@ -90,20 +71,13 @@ When a job is submitted, it gets assigned a unique identifier. This identifier c To retrieve a job by its identifier, the `find` method on the job client can be used. - ```python Python (Sync) + ```python Python job = job_client.submit('my-job', MyTask("some", "parameters"), cluster) print(job.id) # 018dd029-58ca-74e5-8b58-b4f99d610f9a # later on, or in another process or on a completely different machine, job info can be retrieved job = job_client.find("018dd029-58ca-74e5-8b58-b4f99d610f9a") ``` - ```python Python (Async) - job = await job_client.submit('my-job', MyTask("some", "parameters"), cluster) - print(job.id) # 018dd029-58ca-74e5-8b58-b4f99d610f9a - - # later on, or in another process or on a completely different machine, job info can be retrieved - job = await job_client.find("018dd029-58ca-74e5-8b58-b4f99d610f9a") - ``` ## Visualization @@ -121,16 +95,11 @@ Assuming you have submitted a job, you can use the `display` method on the job c - ```python Python (Sync) + ```python Python job = job_client.find("some-job-id") # or a job that was just submitted # and then visualize it job_client.display(job) ``` - ```python Python (Async) - job = await job_client.find("some-job-id") # or a job that was just submitted - # and then visualize it - await job_client.display(job) - ``` The following diagram displays the execution of a job as a graph. Each task is represented as a node in the graph, @@ -181,7 +150,7 @@ the display name is 1024 characters. Any string larger than that is truncated. T line breaks. - ```python Python (Sync) + ```python Python from tilebox.workflows import Task, ExecutionContext class RootTask(Task): @@ -201,26 +170,6 @@ line breaks. 
job = job_client.submit('custom-display-names', RootTask(3), "dev-cluster") job_client.display(job) ``` - ```python Python (Async) - from tilebox.workflows.aio import Task, ExecutionContext - - class RootTask(Task): - num_subtasks: int - - async def execute(self, context: ExecutionContext): - context.current_task.display = f"Root({self.num_subtasks})" - for i in range(self.num_subtasks): - context.submit_subtask(SubTask(i)) - - class SubTask(Task): - index: int - - async def execute(self, context: ExecutionContext): - context.current_task.display = f"Leaf Nr. {self.index}" - - job = await job_client.submit('custom-display-names', RootTask(3), "dev-cluster") - await job_client.display(job) - ``` @@ -238,16 +187,11 @@ to execute until it finishes. To cancel a job you can use the `cancel` method on the job client. - ```python Python (Sync) + ```python Python job = job_client.submit('my-job', MyTask(), "dev-cluster") # after a short while, the job gets canceled job_client.cancel(job) ``` - ```python Python (Async) - job = job_client.submit('my-job', MyTask(), "dev-cluster") - # after a short while, the job gets canceled - await job_client.cancel(job) - ``` @@ -279,7 +223,7 @@ The following workflow accepts a list of movie titles, and then queries the [OMD release date of each movie. - ```python Python (Sync) + ```python Python from urllib.parse import urlencode import httpx from tilebox.workflows import Task, ExecutionContext @@ -302,36 +246,12 @@ release date of each movie. context.current_task.display = response["Title"] print(f"{response['Title']} was released on {response['Released']}") ``` - ```python Python (Async) - from urllib.parse import urlencode - import httpx - from tilebox.workflows.aio import Task, ExecutionContext - - class MoviesStats(Task): - titles: list[str] - - async def execute(self, context: ExecutionContext) -> None: - for title in self.titles: - context.submit_subtask(PrintMovieStats(title)) - - class PrintMovieStats(Task): - title: str - - async def execute(self, context: ExecutionContext) -> None: - params = {"t": self.title, "apikey": ""} - url = "http://www.omdbapi.com/?" + urlencode(params) - async with httpx.AsyncClient() as client: - response = (await client.get(url)).json() - # set the display name of the task to the title of the movie: - context.current_task.display = response["Title"] - print(f"{response['Title']} was released on {response['Released']}") - ``` Submitting below job reveals a bug in the `PrintMovieStats` task. - ```python Python (Sync) + ```python Python job = job_client.submit('movies-stats', MoviesStats([ "The Matrix", "Shrek 2", @@ -341,16 +261,6 @@ Submitting below job reveals a bug in the `PrintMovieStats` task. 
job_client.display(job) ``` - ```python Python (Async) - job = await job_client.submit('movies-stats', MoviesStats([ - "The Matrix", - "Shrek 2", - "Tilebox - The Movie", - "The Avengers", - ]), "dev-cluster") - - await job_client.display(job) - ``` @@ -372,7 +282,7 @@ ERROR: Task PrintMovieStats failed with exception: KeyError('Title') A fixed version of the `PrintMovieStats` looks like this: - ```python Python (Sync) + ```python Python class PrintMovieStats(Task): title: str @@ -387,36 +297,16 @@ A fixed version of the `PrintMovieStats` looks like this: context.current_task.display = f"NotFound: {self.title}" print(f"Could not find the release date for {self.title}") ``` - ```python Python (Async) - class PrintMovieStats(Task): - title: str - - async def execute(self, context: ExecutionContext) -> None: - params = {"t": self.title, "apikey": ""} - url = "http://www.omdbapi.com/?" + urlencode(params) - async with httpx.AsyncClient() as client: - response = (await client.get(url)).json() - if "Title" in response and "Released" in response: - context.current_task.display = response["Title"] - print(f"{response['Title']} was released on {response['Released']}") - else: - context.current_task.display = f"NotFound: {self.title}" - print(f"Could not find the release date for {self.title}") - ``` Now with this fix in place, and the [task runners](/workflows/concepts/task-runners) redeployed with the updated implementation of the `PrintMovieStats` task, it's time to retry the job: - ```python Python (Sync) + ```python Python job_client.retry(job) job_client.display(job) ``` - ```python Python (Async) - await job_client.retry(job) - await job_client.display(job) - ``` diff --git a/workflows/concepts/task-runners.mdx b/workflows/concepts/task-runners.mdx index 5ea93f9..a3e8caf 100644 --- a/workflows/concepts/task-runners.mdx +++ b/workflows/concepts/task-runners.mdx @@ -41,7 +41,7 @@ Implementing a task runner consists of the following steps: A simple example of those steps put together might look like this: - ```python Python (Sync) + ```python Python from tilebox.workflows import Client # your own workflow: from my_workflow import MyTask, OtherTask @@ -58,25 +58,6 @@ A simple example of those steps put together might look like this: if __name__ == "__main__": main() ``` - ```python Python (Async) - import asyncio - from tilebox.workflows.aio import Client - # your own workflow: - from my_workflow import MyTask, OtherTask - - async def main(): - client = Client() # 1. connect to the Tilebox Workflows API - cluster = "dev-cluster" # 2. select a cluster to join - runner = await client.runner( - cluster, - tasks=[MyTask, OtherTask] # 3. register tasks - ) - await runner.run_forever() # 4. listen for new tasks to execute - - if __name__ == "__main__": - asyncio.run(main()) - ``` - To now start the task runner locally, simply run it as a Python script: @@ -169,7 +150,7 @@ with a large amount of memory. 
Below is an example of a distributed workflow: - ```python Python (Sync) + ```python Python from tilebox.workflows import Task, ExecutionContext class DistributedWorkflow(Task): @@ -194,32 +175,6 @@ Below is an example of a distributed workflow: def execute(self, context: ExecutionContext) -> None: pass ``` - ```python Python (Async) - from tilebox.workflows.aio import Task, ExecutionContext - - class DistributedWorkflow(Task): - async def execute(self, context: ExecutionContext) -> None: - download_task = context.submit_subtask(DownloadData()) - process_task = context.submit_subtask(ProcessData(), depends_on=[download_task]) - - class DownloadData(Task): - """ - Download a dataset and put it into a shared internal bucket - Requires a good network connection for a high download bandwidth - """ - async def execute(self, context: ExecutionContext) -> None: - pass - - class ProcessData(Task): - """ - Perform a compute intensive processing of a dataset - The dataset needs to be available in an internal bucket - Requires access to a GPU for optimal performance - """ - async def execute(self, context: ExecutionContext) -> None: - pass - ``` - To achieve a distributed execution of this workflow, no single task runner capable of executing all three of the tasks is set up. @@ -250,7 +205,6 @@ executed by either one of them. ) gpu_runner.run_forever() ``` - Now, both `download_task_runner.py` and `gpu_task_runner.py`are started, in parallel, on different machines with the required hardware for each. diff --git a/workflows/concepts/tasks.mdx b/workflows/concepts/tasks.mdx index eb6cfbb..681f19a 100644 --- a/workflows/concepts/tasks.mdx +++ b/workflows/concepts/tasks.mdx @@ -19,24 +19,16 @@ The `execute` method is the entry point for the task and is where the task's log is executed and performs the task's operation. - ```python Python (Sync) + ```python Python from tilebox.workflows import Task, ExecutionContext class MyFirstTask(Task): def execute(self, context: ExecutionContext): print(f"Hello World!") ``` - ```python Python (Async) - from tilebox.workflows.aio import Task, ExecutionContext - - class MyFirstTask(Task): - async def execute(self, context: ExecutionContext): - print(f"Hello World!") - ``` - -This example showcases a simple task that prints `"Hello World!"` to the console. The key components of this task are: +This example showcases a simple task that prints "Hello World!" to the console. The key components of this task are: @@ -80,7 +72,7 @@ Supported types for input parameters are: - Nested data classes that are again JSON-serializable - ```python Python (Sync) + ```python Python class ParametrizableTask(Task): message: str number: int @@ -91,18 +83,6 @@ Supported types for input parameters are: task = ParametrizableTask("Hello", 3, {"key": "value"}) ``` - ```python Python (Async) - class ParametrizableTask(Task): - message: str - number: int - data: dict[str, str] - - async def execute(self, context: ExecutionContext): - print(self.message * self.number) - - task = ParametrizableTask("Hello", 3, {"key": "value"}) - ``` - ## Task Composition and subtasks @@ -113,7 +93,7 @@ where complex operations are broken down into simpler, manageable pieces. And as parallelized whenever possible. 
-```python Python (Sync) +```python Python class ParentTask(Task): num_subtasks: int @@ -122,31 +102,13 @@ class ParentTask(Task): context.submit_subtask(ChildTask(i)) class ChildTask(Task): -index: int + index: int def execute(self, context: ExecutionContext) -> None: print(f"Executing ChildTask {self.index}") task = ParentTask(5) # submitting this task will result in 5 ChildTasks being executed as well - -```` -```python Python (Async) -class ParentTask(Task): - num_subtasks: int - - async def execute(self, context: ExecutionContext) -> None: - for i in range(self.num_subtasks): - context.submit_subtask(ChildTask(i)) - -class ChildTask(Task): - index: int - - async def execute(self, context: ExecutionContext) -> None: - print(f"Executing ChildTask {self.index}") - -task = ParentTask(5) # submitting this task will result in 5 ChildTasks being executed as well -```` - +``` In the preceding example, a `ParentTask` that submits `ChildTask` tasks as subtasks is defined. The number of subtasks to be submitted @@ -167,7 +129,7 @@ The workflow is capable of downloading a certain number of random dog images fro used to get the URLs of random dog images and then download them. Implementing a workflow such as this using Task Composition could look like this: - ```python Python (Sync) + ```python Python import httpx # pip install httpx from pathlib import Path @@ -189,31 +151,6 @@ used to get the URLs of random dog images and then download them. Implementing a with file.open("wb") as file: file.write(response.content) ``` - ```python Python (Async) - import httpx # pip install httpx - from pathlib import Path - - class DownloadRandomDogImages(Task): - num_images: int - - async def execute(self, context: ExecutionContext) -> None: - url = f"https://api.thedogapi.com/v1/images/search?limit={self.num_images}" - async with httpx.AsyncClient() as client: - response = await client.get(url) - for dog_image in response.json()[:self.num_images]: - context.submit_subtask(DownloadImage(dog_image["url"])) - - class DownloadImage(Task): - url: str - - async def execute(self, context: ExecutionContext) -> None: - file = Path("dogs") / self.url.split("/")[-1] - async with httpx.AsyncClient() as client: - response = await client.get(self.url) - with file.open("wb") as file: - file.write(response.content) - ``` - This larger example consists of the following tasks: @@ -233,7 +170,7 @@ Together, these tasks form a workflow that downloads a certain number of random Visualizing the execution of a workflow that uses these tasks could look like a tree of tasks, where the `DownloadRandomDogImages` task is the root and the `DownloadImage` tasks are the leaves. For example, when downloading five random dog images, the following tasks are executed. - ```python Python (Sync) + ```python Python from tilebox.workflows import Client client = Client() @@ -244,20 +181,7 @@ Visualizing the execution of a workflow that uses these tasks could look like a "dev-cluster", ) jobs.display(job) - ```` - ```python Python (Async) - from tilebox.workflows.aio import Client - - client = Client() - jobs = await client.jobs() - job = await jobs.submit( - "download-dog-images", - DownloadRandomDogImages(5), - "dev-cluster", - ) - await jobs.display(job) - ```` - + ``` @@ -295,7 +219,7 @@ Fortunately, Tilebox Workflows offers a way to resolve this issue by using **rec As an example, the `RecursiveTask` below is a perfectly valid task that recursively submits smaller instances of itself as subtasks. 
- ```python Python (Sync) + ```python Python class RecursiveTask(Task): num: int @@ -304,16 +228,6 @@ As an example, the `RecursiveTask` below is a perfectly valid task that recursiv if self.num >= 2: context.submit_subtask(RecursiveTask(self.num // 2)) ``` - ```python Python (Async) - class RecursiveTask(Task): - num: int - - async def execute(self, context: ExecutionContext) -> None: - print(f"Executing RecursiveTask with num={self.num}") - if self.num >= 2: - context.submit_subtask(RecursiveTask(self.num // 2)) - ``` - ### Recursive subtask Example @@ -335,7 +249,7 @@ downloading as soon as the first image URLs are retrieved. An implementation of this recursive subtask submission could look like this: - ```python Python (Sync) + ```python Python class DownloadRandomDogImages(Task): num_images: int @@ -351,24 +265,6 @@ An implementation of this recursive subtask submission could look like this: for dog_image in response.json()[:self.num_images]: context.submit_subtask(DownloadImage(dog_image["url"])) ``` - ```python Python (Async) - class DownloadRandomDogImages(Task): - num_images: int - - async def execute(self, context: ExecutionContext) -> None: - if self.num_images > 4: - half = self.num_images // 2 - remaining = self.num_images - half # account for uneven numbers - context.submit_subtask(DownloadRandomDogImages(half)) - context.submit_subtask(DownloadRandomDogImages(remaining)) - else: - url = f"https://api.thedogapi.com/v1/images/search?limit={self.num_images}" - async with httpx.AsyncClient() as client: - response = await client.get(url) - for dog_image in response.json()[:self.num_images]: - context.submit_subtask(DownloadImage(dog_image["url"])) - ``` - With this implementation, downloading a large number of images (for example 9 in the workflow below) results in the following tasks being executed: @@ -401,7 +297,7 @@ If the `FlakyTask` then fails, it gets retried for up to 5 times before eventual - ```python Python (Sync) + ```python Python import random class RootTask(Task): @@ -417,23 +313,6 @@ If the `FlakyTask` then fails, it gets retried for up to 5 times before eventual task = RootTask() ``` - ```python Python (Async) - import random - - class RootTask(Task): - async def execute(self, context: ExecutionContext) -> None: - context.submit_subtask(FlakyTask(), max_retries=5) - - class FlakyTask(Task): - async def execute(self, context: ExecutionContext) -> None: - print(f"Executing FlakyTask") - - if random.random() < 0.1: - raise Exception("FlakyTask failed randomly") - - task = RootTask() - ``` - ## Dependencies @@ -449,7 +328,7 @@ Up until now all tasks were independent of one another. 
But often, tasks have de A workflow that has tasks with dependencies on other tasks is implemented as follows: - ```python Python (Sync) + ```python Python class RootTask(Task): def execute(self, context: ExecutionContext) -> None: first_task = context.submit_subtask(PrintTask("Executing first")) @@ -462,20 +341,6 @@ A workflow that has tasks with dependencies on other tasks is implemented as fol def execute(self, context: ExecutionContext) -> None: print(self.message) ``` - ```python Python (Async) - class RootTask(Task): - async def execute(self, context: ExecutionContext) -> None: - first_task = context.submit_subtask(PrintTask("Executing first")) - second_task = context.submit_subtask(PrintTask("Executing second"), depends_on=[first_task]) - third_task = context.submit_subtask(PrintTask("Executing last"), depends_on=[second_task]) - - class PrintTask(Task): - message: str - - async def execute(self, context: ExecutionContext) -> None: - print(self.message) - ``` - The `RootTask` submits three `PrintTask` tasks as subtasks. Those tasks are dependent on one another, meaning that the @@ -492,7 +357,7 @@ This results in the tasks being executed sequentially. As a practical example, below is a workflow that fetches news articles from an API and then processes them using [News API](https://newsapi.org/). - ```python Python (Sync) + ```python Python from pathlib import Path import json from collections import Counter @@ -538,7 +403,6 @@ As a practical example, below is a workflow that fetches news articles from an A ) job_client.visualize(job, direction="right") ``` - ```plaintext Output @@ -603,7 +467,7 @@ why it must return a tuple of two strings, where the first string is the identif See the section on [semantic versioning](#semantic-versioning) down below for more information. - ```python Python (Sync) + ```python Python class MyTask(Task): def execute(self, context: ExecutionContext) -> None: pass @@ -620,24 +484,6 @@ See the section on [semantic versioning](#semantic-versioning) down below for mo # MyTask2 has the identifier "tilebox.com/example_workflow/MyTask" and the version "v1.0" ``` - ```python Python (Async) - class MyTask(Task): - async def execute(self, context: ExecutionContext) -> None: - pass - - # MyTask has the identifier "MyTask" and the default version of "v0.0" - - class MyTask2(Task): - @staticmethod - def identifier() -> tuple[str, str]: - return "tilebox.com/example_workflow/MyTask", "v0.0" - - async def execute(self, context: ExecutionContext) -> None: - pass - - # MyTask2 has the identifier "tilebox.com/example_workflow/MyTask" and the version "v1.0" - ``` - @@ -666,7 +512,7 @@ The version number must match the pattern `vX.Y`, where `X` and `Y` are non-nega As an example, below is a task that has the identifier `"tilebox.com/example_workflow/MyTask"` and the version `"v1.3"`: - ```python Python (Sync) + ```python Python class MyTask(Task): @staticmethod def identifier() -> tuple[str, str]: @@ -675,16 +521,6 @@ As an example, below is a task that has the identifier `"tilebox.com/example_wor def execute(self, context: ExecutionContext) -> None: pass ``` - ```python Python (Async) - class MyTask(Task): - @staticmethod - def identifier() -> tuple[str, str]: - return "tilebox.com/example_workflow/MyTask", "v1.3" - - async def execute(self, context: ExecutionContext) -> None: - pass - ``` - When task runners execute a task, they need to have a task with a matching identifier string and a compatible version number registered. 
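To make the `vX.Y` pattern and the notion of a compatible version concrete, here is a minimal, standalone sketch of how such version strings can be parsed and compared. The compatibility rule shown (same major version, with the runner's minor version at least as high as the submitted task's) is an illustrative assumption, and `parse_version` and `is_compatible` are hypothetical helpers; the authoritative matching is performed by Tilebox Workflows itself.

```python Python
import re

# version strings must match the pattern vX.Y, where X and Y are non-negative integers
VERSION_PATTERN = re.compile(r"^v(\d+)\.(\d+)$")

def parse_version(version: str) -> tuple[int, int]:
    """Parse a version string of the form vX.Y into a (major, minor) tuple."""
    match = VERSION_PATTERN.match(version)
    if match is None:
        raise ValueError(f"invalid version {version!r}, expected the pattern vX.Y")
    return int(match.group(1)), int(match.group(2))

def is_compatible(runner_version: str, task_version: str) -> bool:
    """Assumed compatibility rule: identical major version, and the runner's
    minor version is at least the minor version the task was submitted with."""
    runner_major, runner_minor = parse_version(runner_version)
    task_major, task_minor = parse_version(task_version)
    return runner_major == task_major and runner_minor >= task_minor

print(parse_version("v1.3"))          # (1, 3)
print(is_compatible("v1.5", "v1.3"))  # True
print(is_compatible("v2.0", "v1.3"))  # False
```

Under this assumed rule, a runner registering `v1.5` of a task could still pick up work submitted against `v1.3`, while a runner registering `v2.0` could not.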
diff --git a/workflows/near-real-time/recurrent-tasks.mdx b/workflows/near-real-time/recurrent-tasks.mdx
index 57b2cdd..0df1bd6 100644
--- a/workflows/near-real-time/recurrent-tasks.mdx
+++ b/workflows/near-real-time/recurrent-tasks.mdx
@@ -101,4 +101,3 @@ Additionally, you'll be able to register new recurrent tasks or edit/delete exis
    Tilebox Workflows recurrent tasks in the Tilebox Console
    Tilebox Workflows recurrent tasks in the Tilebox Console
-
diff --git a/workflows/observability/tracing.mdx b/workflows/observability/tracing.mdx
index ab58305..2a96fbb 100644
--- a/workflows/observability/tracing.mdx
+++ b/workflows/observability/tracing.mdx
@@ -36,7 +36,6 @@ call the corresponding configuration functions during the start up of your [task

To configure tracing with Axiom, you first need to create an [Axiom Dataset](https://axiom.co/docs/reference/datasets) to export Tilebox workflow traces to.
Additionally, an API key with ingest permissions to that dataset is required.
-

```python Python
from tilebox.workflows import Client
from tilebox.workflows.observability.tracing import configure_otel_tracing_axiom