Add shared_memory to task with extended resources #3096

Open
wants to merge 9 commits into
base: master
Conversation

thomasjpfan
Member

@thomasjpfan thomasjpfan commented Jan 25, 2025

Tracking issue

Towards flyteorg/flyte#6142
Requires flyteidl change & implementation: flyteorg/flyte#6193

Why are the changes needed?

This PR adds shared memory as an extended resource, made available through the shared_memory argument of @task. In the simple case, @task(shared_memory=True) means: "memory backed volumes are sized to node allocatable memory". Otherwise, set shared_memory="2Gi" to specify the size explicitly.
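
The value semantics described above can be sketched without flytekit. This is only an illustration under stated assumptions: `SharedMemorySetting` and `normalize_shared_memory` are hypothetical names, not part of flytekit or flyteidl, and the real PR encodes the result in the IDL's extended resources instead.

```python
from dataclasses import dataclass
from typing import Optional, Union


@dataclass(frozen=True)
class SharedMemorySetting:
    """Hypothetical container for a resolved shared memory request."""

    mount_path: str
    size_limit: Optional[str]  # None means no explicit size was requested


def normalize_shared_memory(
    shared_memory: Optional[Union[bool, str]],
) -> Optional[SharedMemorySetting]:
    """Map the user-facing value to a setting, or None when disabled."""
    # None and False both mean "no shared memory volume".
    if shared_memory is None or shared_memory is False:
        return None
    # True requests a volume without an explicit size.
    if shared_memory is True:
        return SharedMemorySetting(mount_path="/dev/shm", size_limit=None)
    # A string is passed through as the requested size, e.g. "2Gi".
    if isinstance(shared_memory, str):
        return SharedMemorySetting(mount_path="/dev/shm", size_limit=shared_memory)
    raise ValueError(f"Unsupported shared_memory value: {shared_memory!r}")
```

With this sketch, `normalize_shared_memory("2Gi").size_limit` is `"2Gi"`, while `None` and `False` both resolve to `None`.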

What changes were proposed in this pull request?

This PR adds shared_memory to the user facing API and pushes the extended resources into the IDL.

How was this patch tested?

Unit tests were added to this PR and tested with flytekit changes:

import os
from flytekit import task, ImageSpec

image = ImageSpec(
    name="flytekit",
    apt_packages=["git"],
    registry="localhost:30000",
    commands=[
        "uv pip install git+https://github.com/thomasjpfan/flyte.git@65dda339b0088d9e568877577fa78fc88b223582#subdirectory=flytekit",
        "uv pip install git+https://github.com/thomasjpfan/flyte.git@d2c76ff330077875f7826c278f660add7f2c50a9#subdirectory=flyteidl",
    ],
)


@task(container_image=image)
def check_shm2() -> bool:
    return os.path.exists("/dev/shm")
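
Beyond checking that the path exists, a task could also report how large the mounted volume is. A minimal sketch using only the standard library; `shm_capacity_bytes` is a hypothetical helper name, and it returns None when the path is missing so it can also run outside a container.

```python
import os
import shutil
from typing import Optional


def shm_capacity_bytes(path: str = "/dev/shm") -> Optional[int]:
    """Return the total size in bytes of the filesystem at `path`, or None if absent."""
    if not os.path.isdir(path):
        return None
    # disk_usage reports (total, used, free) for the filesystem containing path.
    return shutil.disk_usage(path).total
```

Comparing the returned value against the requested size (for example "2Gi") would be one way to confirm that a size limit was applied.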

Summary by Bito

This PR implements comprehensive shared memory support for Flyte tasks with flexible memory specification via @task decorator, supporting both boolean flags and exact memory sizes. The implementation includes domain management APIs, pod template handling, and enhanced progress tracking. Additional features include improved StructuredDatasets handling, type hint updates across core modules, and environment variable validation in the security model.

Unit tests added: True

Estimated effort to review (1-5, lower is better): 5

Signed-off-by: Thomas J. Fan <thomasjpfan@gmail.com>
Signed-off-by: Thomas J. Fan <thomasjpfan@gmail.com>
Signed-off-by: Thomas J. Fan <thomasjpfan@gmail.com>
Signed-off-by: Thomas J. Fan <thomasjpfan@gmail.com>
@flyte-bot
Contributor

flyte-bot commented Jan 25, 2025

Code Review Agent Run #3d3e41

Actionable Suggestions - 4
  • tests/flytekit/unit/core/test_resources.py - 1
    • Consider consolidating redundant test cases · Line 162-164
  • flytekit/core/constants.py - 1
    • Insecure hardcoded temporary file path · Line 44-44
  • flytekit/core/python_auto_container.py - 1
    • Consider adding shared memory format validation · Line 54-54
  • flytekit/core/task.py - 1
Review Details
  • Files reviewed - 10 · Commit Range: ac61755..aecce00
    • flytekit/core/constants.py
    • flytekit/core/node.py
    • flytekit/core/python_auto_container.py
    • flytekit/core/resources.py
    • flytekit/core/task.py
    • flytekit/tools/script_mode.py
    • tests/flytekit/unit/core/test_array_node_map_task.py
    • tests/flytekit/unit/core/test_node_creation.py
    • tests/flytekit/unit/core/test_resources.py
    • tests/flytekit/unit/models/test_tasks.py
  • Files skipped - 0
  • Tools
    • Whispers (Secret Scanner) - ✔︎ Successful
    • Detect-secrets (Secret Scanner) - ✔︎ Successful
    • MyPy (Static Code Analysis) - ✔︎ Successful
    • Astral Ruff (Static Code Analysis) - ✔︎ Successful


@thomasjpfan thomasjpfan changed the title from "Add shared_memory_volume to task" to "Add shared_memory to task with extended resources" Jan 25, 2025
@flyte-bot
Contributor

flyte-bot commented Jan 25, 2025

Changelist by Bito

This pull request implements the following key changes.

Key Change: New Feature - Shared Memory Support Implementation
Files Impacted:
  • constants.py - Added shared memory mount constants
  • resources.py - Implemented shared memory resource construction logic
  • node.py - Added shared memory support to node configuration
  • python_auto_container.py - Extended container task with shared memory capabilities
  • task.py - Added shared memory parameter to task decorator

Key Change: Testing - Shared Memory Test Coverage
Files Impacted:
  • test_array_node_map_task.py - Added tests for shared memory in array node tasks
  • test_node_creation.py - Added tests for shared memory overrides
  • test_resources.py - Added tests for shared memory resource construction
  • test_tasks.py - Updated task model tests to include shared memory

Comment on lines +162 to +164
@pytest.mark.parametrize("shared_memory", [None, False])
def test_construct_extended_resources_shared_memory_none(shared_memory):
    resources = construct_extended_resources(shared_memory=shared_memory)

Consider consolidating redundant test cases

Consider consolidating the test cases for None and False into a single test case since they produce the same behavior. Both values result in resources being None.

Code suggestion
Check the AI-generated fix before applying
Suggested change
-@pytest.mark.parametrize("shared_memory", [None, False])
-def test_construct_extended_resources_shared_memory_none(shared_memory):
-    resources = construct_extended_resources(shared_memory=shared_memory)
+def test_construct_extended_resources_shared_memory_none():
+    resources = construct_extended_resources(shared_memory=None)

Code Review Run #3d3e41




# Shared memory mount name and path
SHARED_MEMORY_MOUNT_NAME = "flyte-shared-memory"
SHARED_MEMORY_MOUNT_PATH = "/dev/shm"

Insecure hardcoded temporary file path

Consider using a more secure temporary file location instead of hardcoding '/dev/shm'. The shared memory directory could potentially be accessed by other processes on the system. Consider using 'tempfile.gettempdir()' to get a secure temporary directory location.

Code suggestion
Check the AI-generated fix before applying
Suggested change
-SHARED_MEMORY_MOUNT_PATH = "/dev/shm"
+import tempfile
+SHARED_MEMORY_MOUNT_PATH = tempfile.gettempdir()

Code Review Run #3d3e41
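
For context on the `/dev/shm` path discussed in this thread: on Linux, POSIX shared memory objects are conventionally backed by a tmpfs mounted at `/dev/shm`, and Python's `multiprocessing.shared_memory` module relies on that facility. A minimal sketch, assuming a host where POSIX shared memory is available; the `roundtrip` function name is illustrative only and not part of this PR.

```python
from multiprocessing import shared_memory


def roundtrip(payload: bytes) -> bytes:
    """Write a non-empty payload to a new shared memory block and read it back."""
    block = shared_memory.SharedMemory(create=True, size=len(payload))
    try:
        # The block may be larger than requested, so slice to the payload length.
        block.buf[: len(payload)] = payload
        return bytes(block.buf[: len(payload)])
    finally:
        # Release this process's mapping, then remove the underlying object.
        block.close()
        block.unlink()
```

If the container's shared memory volume is too small, allocations like this one are where a workload would typically fail, which is the situation a larger `shared_memory` setting is meant to avoid.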



@@ -51,6 +51,7 @@ def __init__(
         pod_template: Optional[PodTemplate] = None,
         pod_template_name: Optional[str] = None,
         accelerator: Optional[BaseAccelerator] = None,
+        shared_memory: Optional[Union[Literal[True], str]] = None,

Consider adding shared memory format validation

Consider validating the shared_memory parameter when it's a string to ensure it follows memory size format (e.g., '1Gi', '512Mi'). Currently there's no validation for the string format.

Code suggestion
Check the AI-generated fix before applying
Suggested change
-        shared_memory: Optional[Union[Literal[True], str]] = None,
+        shared_memory: Optional[Union[Literal[True], str]] = None,
+        if shared_memory and isinstance(shared_memory, str):
+            import re
+            if not re.match(r'^[0-9]+(Ki|Mi|Gi|Ti|Pi|Ei|[KMGTPE]i?)?$', shared_memory):
+                raise ValueError(
+                    f"Invalid shared memory format: {shared_memory}. "
+                    "Must be a valid memory size (e.g., '1Gi', '512Mi')"
+                )

Code Review Run #3d3e41
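
If format validation were wanted, it could live in a standalone helper. The sketch below is an assumption-laden illustration, not code from this PR: `is_memory_quantity` is a hypothetical name, and the pattern covers only a subset of the Kubernetes quantity grammar (plain or decimal numbers with binary suffixes Ki to Ei or decimal suffixes k to E; no exponent notation and no signs).

```python
import re

# Number, optional fractional part, optional binary or decimal suffix.
_QUANTITY_RE = re.compile(r"[0-9]+(\.[0-9]+)?(Ki|Mi|Gi|Ti|Pi|Ei|k|M|G|T|P|E)?")


def is_memory_quantity(value: str) -> bool:
    """Return True if `value` looks like a simple memory quantity such as '2Gi'."""
    # fullmatch requires the whole string to match, so "2GB" is rejected.
    return _QUANTITY_RE.fullmatch(value) is not None
```

Strings such as "2Gi" and "512Mi" pass this check, while "2GB" or an empty string do not.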



@@ -211,6 +213,7 @@ def task(
     pod_template_name: Optional[str] = None,
     accelerator: Optional[BaseAccelerator] = None,
     pickle_untyped: bool = False,
+    shared_memory: Optional[Union[bool, str]] = None,

Consider adding shared memory validation

Consider adding validation for the shared_memory parameter to ensure it is either a boolean or a valid memory size string (e.g. '1Gi', '512Mi'). Currently there is no validation which could lead to runtime errors.

Code suggestion
Check the AI-generated fix before applying
Suggested change
-    shared_memory: Optional[Union[bool, str]] = None,
+    shared_memory: Optional[Union[bool, str]] = None,
+    def validate_shared_memory(val: Optional[Union[bool, str]]) -> None:
+        if val is not None and not isinstance(val, bool):
+            if not isinstance(val, str) or not re.match(r'^[0-9]+(Mi|Gi)$', val):
+                raise ValueError('shared_memory must be a boolean or valid memory size string (e.g. "1Gi", "512Mi")')
+    if shared_memory is not None:
+        validate_shared_memory(shared_memory)

Code Review Run #3d3e41



@kumare3
Contributor

kumare3 commented Jan 26, 2025

Did we make a backend change for this? Can we also add tpu support

@thomasjpfan
Member Author

Did we make a backend change for this?

I did the backend change here: flyteorg/flyte#6193

Can we also add tpu support

I expect TPU to be similar to other accelerators. From GKE's docs, TPUs are another taint: https://cloud.google.com/kubernetes-engine/docs/concepts/tpus#how_tpus_work

Signed-off-by: Thomas J. Fan <thomasjpfan@gmail.com>
@flyte-bot
Contributor

flyte-bot commented Jan 26, 2025

Code Review Agent Run #da14ac

Actionable Suggestions - 0
Review Details
  • Files reviewed - 1 · Commit Range: aecce00..eadab88
    • flytekit/core/node.py
  • Files skipped - 0
  • Tools
    • Whispers (Secret Scanner) - ✔︎ Successful
    • Detect-secrets (Secret Scanner) - ✔︎ Successful
    • MyPy (Static Code Analysis) - ✔︎ Successful
    • Astral Ruff (Static Code Analysis) - ✔︎ Successful


…d_memory_extended_sources

Signed-off-by: Thomas J. Fan <thomasjpfan@gmail.com>
@@ -4,7 +4,7 @@
 import re
 from abc import ABC
 from dataclasses import dataclass
-from typing import Callable, Dict, List, Optional, TypeVar, Union
+from typing import Callable, Dict, List, Literal, Optional, TypeVar, Union

Can we use L instead of Literal? I just know I'm going to get confused in the future.

flytekit/tools/script_mode.py (outdated, resolved)
@flyte-bot
Contributor

flyte-bot commented Feb 6, 2025

Code Review Agent Run #31b042

Actionable Suggestions - 2
  • flytekit/core/resources.py - 1
  • tests/flytekit/unit/core/test_resources.py - 1
    • Consider type conversion before assertion comparison · Line 159-159
Additional Suggestions - 10
  • flytekit/remote/remote.py - 3
  • flytekit/models/task.py - 2
    • Consider adding primary container name validation · Line 1034-1034
    • Consider validating generates_deck before wrapping · Line 337-337
  • flytekit/tools/fast_registration.py - 2
  • plugins/flytekit-ray/flytekitplugins/ray/task.py - 3
    • Consider more descriptive error message · Line 50-57
    • Consider validating requests and limits together · Line 33-40
    • Consider extracting pod template creation logic · Line 107-134
Review Details
  • Files reviewed - 34 · Commit Range: eadab88..0dc82b2
    • dev-requirements.txt
    • flytekit/bin/entrypoint.py
    • flytekit/clients/friendly.py
    • flytekit/clients/raw.py
    • flytekit/core/base_task.py
    • flytekit/core/context_manager.py
    • flytekit/core/node.py
    • flytekit/core/resources.py
    • flytekit/core/type_engine.py
    • flytekit/deck/deck.py
    • flytekit/interaction/parse_stdin.py
    • flytekit/loggers.py
    • flytekit/models/core/workflow.py
    • flytekit/models/domain.py
    • flytekit/models/task.py
    • flytekit/remote/remote.py
    • flytekit/tools/fast_registration.py
    • flytekit/tools/translator.py
    • flytekit/types/directory/types.py
    • flytekit/types/structured/structured_dataset.py
    • plugins/flytekit-ray/flytekitplugins/ray/task.py
    • plugins/flytekit-ray/setup.py
    • plugins/flytekit-ray/tests/test_ray.py
    • pydoclint-errors-baseline.txt
    • pyproject.toml
    • tests/flytekit/integration/remote/test_remote.py
    • tests/flytekit/integration/remote/workflows/basic/sd_attr.py
    • tests/flytekit/unit/core/test_array_node_map_task.py
    • tests/flytekit/unit/core/test_map_task.py
    • tests/flytekit/unit/core/test_node_creation.py
    • tests/flytekit/unit/core/test_resources.py
    • tests/flytekit/unit/core/test_unions.py
    • tests/flytekit/unit/deck/test_deck.py
    • tests/flytekit/unit/test_translator.py
  • Files skipped - 0
  • Tools
    • Whispers (Secret Scanner) - ✔︎ Successful
    • Detect-secrets (Secret Scanner) - ✔︎ Successful
    • MyPy (Static Code Analysis) - ✔︎ Successful
    • Astral Ruff (Static Code Analysis) - ✔︎ Successful



return tasks_pb2.ExtendedResources(**kwargs)


def pod_spec_from_resources(
primary_container_name: Optional[str] = None,

Parameter name and type change impact

The parameter name change from k8s_pod_name to primary_container_name with an optional type may require updates in calling code.

Code suggestion
Check the AI-generated fix before applying
Suggested change
-    primary_container_name: Optional[str] = None,
+    k8s_pod_name: str = None,
+    primary_container_name: Optional[str] = None, # New parameter, defaults to k8s_pod_name if set

Code Review Run #31b042



@@ -155,3 +157,18 @@ def test_pod_spec_from_resources_requests_set():
     )
     pod_spec = pod_spec_from_resources(primary_container_name=primary_container_name, requests=requests, limits=limits)
     assert expected_pod_spec == pod_spec

Consider type conversion before assertion comparison

The assertion expected_pod_spec == pod_spec is comparing two different types - V1PodSpec vs dict. Consider using V1PodSpec(**pod_spec) to convert the dictionary to a V1PodSpec object before comparison.

Code suggestion
Check the AI-generated fix before applying
Suggested change
-    assert expected_pod_spec == pod_spec
+    assert expected_pod_spec == V1PodSpec(**pod_spec)

Code Review Run #31b042



…d_memory_extended_sources

Signed-off-by: Thomas J. Fan <thomasjpfan@gmail.com>
Signed-off-by: Thomas J. Fan <thomasjpfan@gmail.com>
Signed-off-by: Thomas J. Fan <thomasjpfan@gmail.com>
@flyte-bot
Contributor

flyte-bot commented Feb 7, 2025

Code Review Agent Run #aa5ba1

Actionable Suggestions - 0
Additional Suggestions - 1
  • flytekit/core/task.py - 1
Review Details
  • Files reviewed - 7 · Commit Range: 0dc82b2..44def02
    • flytekit/core/node.py
    • flytekit/core/python_auto_container.py
    • flytekit/core/resources.py
    • flytekit/core/task.py
    • flytekit/models/security.py
    • flytekit/tools/script_mode.py
    • tests/flytekit/unit/models/core/test_security.py
  • Files skipped - 0
  • Tools
    • Whispers (Secret Scanner) - ✔︎ Successful
    • Detect-secrets (Secret Scanner) - ✔︎ Successful
    • MyPy (Static Code Analysis) - ✔︎ Successful
    • Astral Ruff (Static Code Analysis) - ✔︎ Successful


4 participants