Add shared_memory to task with extended resources #3096
Signed-off-by: Thomas J. Fan <thomasjpfan@gmail.com>
Code Review Agent Run #3d3e41
Actionable Suggestions - 4
Review Details
Changelist by Bito
This pull request implements the following key changes.
@pytest.mark.parametrize("shared_memory", [None, False])
def test_construct_extended_resources_shared_memory_none(shared_memory):
    resources = construct_extended_resources(shared_memory=shared_memory)
Consider consolidating the test cases for None and False into a single test case since they produce the same behavior. Both values result in resources being None.
Code suggestion
Check the AI-generated fix before applying
- @pytest.mark.parametrize("shared_memory", [None, False])
- def test_construct_extended_resources_shared_memory_none(shared_memory):
-     resources = construct_extended_resources(shared_memory=shared_memory)
+ def test_construct_extended_resources_shared_memory_none():
+     resources = construct_extended_resources(shared_memory=None)
Code Review Run #3d3e41
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
# Shared memory mount name and path
SHARED_MEMORY_MOUNT_NAME = "flyte-shared-memory"
SHARED_MEMORY_MOUNT_PATH = "/dev/shm"
Consider using a more secure temporary file location instead of hardcoding '/dev/shm'. The shared memory directory could potentially be accessed by other processes on the system. Consider using 'tempfile.gettempdir()' to get a secure temporary directory location.
Code suggestion
Check the AI-generated fix before applying
- SHARED_MEMORY_MOUNT_PATH = "/dev/shm"
+ import tempfile
+ SHARED_MEMORY_MOUNT_PATH = tempfile.gettempdir()
Code Review Run #3d3e41
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
@@ -51,6 +51,7 @@ def __init__(
         pod_template: Optional[PodTemplate] = None,
         pod_template_name: Optional[str] = None,
         accelerator: Optional[BaseAccelerator] = None,
+        shared_memory: Optional[Union[Literal[True], str]] = None,
Consider validating the shared_memory parameter when it's a string to ensure it follows memory size format (e.g., '1Gi', '512Mi'). Currently there's no validation for the string format.
Code suggestion
Check the AI-generated fix before applying
- shared_memory: Optional[Union[Literal[True], str]] = None,
+ shared_memory: Optional[Union[Literal[True], str]] = None,
+ if shared_memory and isinstance(shared_memory, str):
+     import re
+     if not re.match(r'^[0-9]+(Ki|Mi|Gi|Ti|Pi|Ei|[KMGTPE]i?)?$', shared_memory):
+         raise ValueError(
+             f"Invalid shared memory format: {shared_memory}. "
+             "Must be a valid memory size (e.g., '1Gi', '512Mi')"
+         )
Code Review Run #3d3e41
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
flytekit/core/task.py
Outdated
@@ -211,6 +213,7 @@ def task(
     pod_template_name: Optional[str] = None,
     accelerator: Optional[BaseAccelerator] = None,
     pickle_untyped: bool = False,
+    shared_memory: Optional[Union[bool, str]] = None,
Consider adding validation for the shared_memory parameter to ensure it is either a boolean or a valid memory size string (e.g. '1Gi', '512Mi'). Currently there is no validation, which could lead to runtime errors.
Code suggestion
Check the AI-generated fix before applying
- shared_memory: Optional[Union[bool, str]] = None,
+ shared_memory: Optional[Union[bool, str]] = None,
+ def validate_shared_memory(val: Optional[Union[bool, str]]) -> None:
+     if val is not None and not isinstance(val, bool):
+         if not isinstance(val, str) or not re.match(r'^[0-9]+(Mi|Gi)$', val):
+             raise ValueError('shared_memory must be a boolean or valid memory size string (e.g. "1Gi", "512Mi")')
+ if shared_memory is not None:
+     validate_shared_memory(shared_memory)
Code Review Run #3d3e41
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
Did we make a backend change for this? Can we also add TPU support?
I did the backend change here: flyteorg/flyte#6193
I expect TPU to be similar to other accelerators. From GKE's docs, TPUs are another taint: https://cloud.google.com/kubernetes-engine/docs/concepts/tpus#how_tpus_work
Code Review Agent Run #da14ac
Actionable Suggestions - 0
Review Details
…d_memory_extended_sources Signed-off-by: Thomas J. Fan <thomasjpfan@gmail.com>
@@ -4,7 +4,7 @@
 import re
 from abc import ABC
 from dataclasses import dataclass
-from typing import Callable, Dict, List, Optional, TypeVar, Union
+from typing import Callable, Dict, List, Literal, Optional, TypeVar, Union
Can we use L instead of Literal? I just know I'm going to get confused in the future.
Code Review Agent Run #31b042
Actionable Suggestions - 2
Additional Suggestions - 10
Review Details
    return tasks_pb2.ExtendedResources(**kwargs)


def pod_spec_from_resources(
    primary_container_name: Optional[str] = None,
The parameter name change from k8s_pod_name to primary_container_name with an optional type may require updates in calling code.
Code suggestion
Check the AI-generated fix before applying
- primary_container_name: Optional[str] = None,
+ k8s_pod_name: str = None,
+ primary_container_name: Optional[str] = None,  # New parameter, defaults to k8s_pod_name if set
Code Review Run #31b042
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
@@ -155,3 +157,18 @@ def test_pod_spec_from_resources_requests_set():
     )
     pod_spec = pod_spec_from_resources(primary_container_name=primary_container_name, requests=requests, limits=limits)
     assert expected_pod_spec == pod_spec
The assertion expected_pod_spec == pod_spec is comparing two different types: V1PodSpec vs dict. Consider using V1PodSpec(**pod_spec) to convert the dictionary to a V1PodSpec object before comparison.
Code suggestion
Check the AI-generated fix before applying
- assert expected_pod_spec == pod_spec
+ assert expected_pod_spec == V1PodSpec(**pod_spec)
Code Review Run #31b042
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
…d_memory_extended_sources Signed-off-by: Thomas J. Fan <thomasjpfan@gmail.com>
Code Review Agent Run #aa5ba1
Actionable Suggestions - 0
Additional Suggestions - 1
Review Details
Tracking issue
Towards flyteorg/flyte#6142
Requires flyteidl change & implementation: flyteorg/flyte#6193
Why are the changes needed?
This PR adds shared memory as an extended resource, made available through @task(shared_memory). In the simple case, @task(shared_memory=True) means: "memory backed volumes are sized to node allocatable memory". Otherwise, set shared_memory="2Gi" to specify the size explicitly.
What changes were proposed in this pull request?
This PR adds shared_memory to the user facing API and pushes the extended resources into the IDL.
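As a rough illustration of the values described above, here is a minimal, self-contained sketch of how shared_memory might be mapped to an extended resource. It does not import flytekit. SharedMemorySketch and shared_memory_to_resource are hypothetical names, and the field names (mount_name, mount_path, size_limit) are assumptions standing in for the real IDL message. The mount name and path are the constants quoted in the review thread, and the None/False behavior follows the unit test discussed there.

```python
from dataclasses import dataclass
from typing import Literal, Optional, Union

# Constants as quoted in the review thread of this PR.
SHARED_MEMORY_MOUNT_NAME = "flyte-shared-memory"
SHARED_MEMORY_MOUNT_PATH = "/dev/shm"


@dataclass(frozen=True)
class SharedMemorySketch:
    """Hypothetical stand-in for the IDL shared-memory message (not the real protobuf type)."""

    mount_name: str
    mount_path: str
    size_limit: Optional[str] = None  # None: size is left to the backend (assumed field name)


def shared_memory_to_resource(
    shared_memory: Optional[Union[Literal[True], str]] = None,
) -> Optional[SharedMemorySketch]:
    """Map the user-facing shared_memory value to the stand-in resource (illustrative only)."""
    # None means no shared memory; False is treated the same way, as in the PR's unit test.
    if shared_memory is None or shared_memory is False:
        return None
    # True: request the volume without an explicit size.
    if shared_memory is True:
        return SharedMemorySketch(SHARED_MEMORY_MOUNT_NAME, SHARED_MEMORY_MOUNT_PATH)
    # A string such as "2Gi": request the volume with that size.
    if isinstance(shared_memory, str):
        return SharedMemorySketch(
            SHARED_MEMORY_MOUNT_NAME, SHARED_MEMORY_MOUNT_PATH, size_limit=shared_memory
        )
    raise TypeError(f"Unsupported shared_memory value: {shared_memory!r}")
```

With this sketch, shared_memory_to_resource(True) yields a resource with no size limit, while shared_memory_to_resource("2Gi") carries the requested size. The actual flytekit implementation may differ in structure and naming.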
How was this patch tested?
Unit tests were added to this PR and tested with flytekit changes:
Summary by Bito
This PR implements comprehensive shared memory support for Flyte tasks with flexible memory specification via @task decorator, supporting both boolean flags and exact memory sizes. The implementation includes domain management APIs, pod template handling, and enhanced progress tracking. Additional features include improved StructuredDatasets handling, type hint updates across core modules, and environment variable validation in the security model.
Unit tests added: True
Estimated effort to review (1-5, lower is better): 5