Skip to content

Commit

Permalink
refactor: improve error handling and messages for partitioning patter…
Browse files Browse the repository at this point in the history
…ns (#46)

* refactor: improve error handling and messages for partitioning patterns

* fix: typo in error message

* fix: remove unnecessary validation in local runtime

* test: add test coverage and errors for 2 other common partitioning mistakes
  • Loading branch information
larribas authored Sep 28, 2021
1 parent 4246f1b commit 581eddd
Show file tree
Hide file tree
Showing 10 changed files with 162 additions and 31 deletions.
52 changes: 38 additions & 14 deletions dagger/dag/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,8 @@ def __init__(

_validate_node_input_dependencies(nodes, inputs)
_validate_outputs(nodes, outputs)

if partition_by_input and partition_by_input not in inputs:
raise ValueError(
f"This node is partitioned by '{partition_by_input}'. However, '{partition_by_input}' is not an input of the node. The available inputs are {sorted(list(inputs))}."
)
_validate_dag_partitioning(partition_by_input, inputs)
_validate_node_partitioning(nodes)

self._nodes = nodes
self._inputs = inputs
Expand Down Expand Up @@ -254,7 +251,7 @@ def _validate_outputs(

if referenced_node.partition_by_input:
raise ValueError(
f"Output '{output_name}' comes from node '{output_type.node}', which is partitioned. This is not a valid map-reduce pattern in dagger. Please check the 'Map Reduce' section in the documentation for an explanation of why this is not possible and suggestions of other valid map-reduce patterns."
f"Output '{output_name}' comes from an output of node '{output_type.node}'. Node '{output_type.node}' is partitioned. In Dagger, DAG outputs may not come from partitioned nodes. Check the documentation to better understand the motivation behind this limitation and how to overcome it: https://larribas.me/dagger/user-guide/map-reduce/#you-cannot-return-the-output-of-a-partitioned-node-from-a-dag."
)

if len(set(dag_outputs.values())) != len(dag_outputs):
Expand Down Expand Up @@ -349,14 +346,6 @@ def _validate_input_from_node_output(
f"This input is serialized {input_type.serializer}. However, the output it references is serialized {referenced_node_outputs[input_type.output].serializer}."
)

if (
dag_nodes[node_name].partition_by_input
and dag_nodes[input_type.node].partition_by_input
):
raise ValueError(
"This node is partitioned by an input that comes from the output of another partitioned node. This is not a valid map-reduce pattern in dagger. Please check the 'Map Reduce' section in the documentation for an explanation of why this is not possible and suggestions of other valid map-reduce patterns."
)


def _validate_input_is_supported(input_name: str, input):
if not _is_type_supported(input, SupportedInputs):
Expand All @@ -369,3 +358,38 @@ def _is_type_supported(obj, union):
return any(
[isinstance(obj, supported_type) for supported_type in get_type_args(union)]
)


def _validate_dag_partitioning(
partition_by_input: Optional[str],
inputs: Mapping[str, SupportedInputs],
):
if partition_by_input and partition_by_input not in inputs:
raise ValueError(
f"This node is partitioned by '{partition_by_input}'. However, '{partition_by_input}' is not an input of the node. The available inputs are {sorted(list(inputs))}."
)


def _validate_node_partitioning(
nodes: Mapping[str, Node],
):
for node_name, node in nodes.items():
if not node.partition_by_input:
continue

p = node.inputs[node.partition_by_input]
if not isinstance(p, FromNodeOutput):
raise ValueError(
f"Node '{node_name}' is partitioned by its input '{node.partition_by_input}'. However, '{node.partition_by_input}' does not come from the output of another node. In Dagger, nodes can only be partitioned by the output of another sibling node. Check the documentation to better understand how partitioning works: https://larribas.me/dagger/user-guide/partitioning/"
)

if nodes[p.node].partition_by_input:
raise ValueError(
f"Node '{node_name}' is partitioned by an input that comes from the output of another node, '{p.node}'. Node '{p.node}' is also partitioned. In Dagger, a node cannot be partitioned by the output of another partitioned node. Check the documentation to better understand how partitioning works: https://larribas.me/dagger/user-guide/partitioning/"
)

o = nodes[p.node].outputs[p.output]
if isinstance(o, FromNodeOutput) or not o.is_partitioned:
raise ValueError(
f"Node '{node_name}' is partitioned by its input '{node.partition_by_input}', which comes from the output '{p.output}' of node '{p.node}'. However, this output is not partitioned. Dagger only allows you to partition by inputs that come from a partitioned output. Check the documentation to better understand how partitioning works: https://larribas.me/dagger/user-guide/partitioning/"
)
5 changes: 0 additions & 5 deletions dagger/runtime/local/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,6 @@ def _node_param_partitions(
params=params,
outputs=outputs,
)
if not isinstance(input_value, Iterable):
raise TypeError(
f"This node is supposed to be partitioned by input '{node.partition_by_input}'. When a node is partitioned, the value of the input that determines the partition should be an iterable. Instead, we found a value of type '{type(input_value).__name__}'."
)

return [{node.partition_by_input: p, **fixed_params} for p in input_value]
else:
return [fixed_params]
Expand Down
4 changes: 2 additions & 2 deletions dagger/task/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,15 +158,15 @@ def _validate_partitioned_input(

if isinstance(inputs[partition_by_input], FromParam):
raise ValueError(
"Nodes may not be partitioned by an input that comes from a parameter. This is not a valid map-reduce pattern in dagger. Please check the 'Map Reduce' section in the documentation for an explanation of why this is not possible and suggestions of other valid map-reduce patterns."
"This node is partitioned by input '{partition_by_input}', which comes from a parameter. In Dagger, nodes may not be partitioned by an input that comes from a parameter. Check the documentation to better understand how partitioning works: https://larribas.me/dagger/user-guide/partitioning/"
)


def _validate_there_are_no_partitioned_outputs(outputs: Mapping[str, SupportedOutputs]):
for output_name, output_type in outputs.items():
if output_type.is_partitioned:
raise ValueError(
"Partitioned nodes may not generate partitioned outputs. This is not a valid map-reduce pattern in dagger. Please check the 'Map Reduce' section in the documentation for an explanation of why this is not possible and suggestions of other valid map-reduce patterns."
"This node is partitioned. In Dagger, partitioned nodes may not generate partitioned outputs. Check the documentation to better understand how partitioning works: https://larribas.me/dagger/user-guide/partitioning/"
)


Expand Down
2 changes: 1 addition & 1 deletion docs/code_snippets/map_reduce/declarative.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def choose_best_model(alternative_models, test_dataset):
"generate-training-combinations": Task(
generate_training_combinations,
outputs={
"combinations": FromReturnValue(),
"combinations": FromReturnValue(is_partitioned=True),
},
),
"train-model": Task(
Expand Down
32 changes: 32 additions & 0 deletions examples/dump_argo_manifest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""CLI utility to dump the manifest for a specific example."""

from typing import List

import yaml

from dagger import DAG
from dagger.runtime.argo import Metadata, Workflow, workflow_manifest


def dump_argo_manifest(dag: DAG, example_name: str):
"""Generate a workflow manifest from the supplied DAG and dump it as a YAML file into tests/examples/argo/."""
manifest = _generate_manifest(
dag,
name=example_name.replace("_", "-"),
entrypoint=["python", f"examples/{example_name}.py"],
)

with open(f"tests/examples/argo/{example_name}.yaml", "w") as f:
yaml.safe_dump(manifest, f)


def _generate_manifest(dag: DAG, name: str, entrypoint: List[str]):
metadata = Metadata(
name=name,
generate_name_from_prefix=True,
)
workflow = Workflow(
container_image="local.registry/dagger",
container_entrypoint_to_dag_cli=entrypoint,
)
return workflow_manifest(dag, metadata=metadata, workflow=workflow)
84 changes: 82 additions & 2 deletions tests/dag/test_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ def test__init__with_dag_output_from_a_partitioned_node():

assert (
str(e.value)
== "Output 'r' comes from node 'map', which is partitioned. This is not a valid map-reduce pattern in dagger. Please check the 'Map Reduce' section in the documentation for an explanation of why this is not possible and suggestions of other valid map-reduce patterns."
== "Output 'r' comes from an output of node 'map'. Node 'map' is partitioned. In Dagger, DAG outputs may not come from partitioned nodes. Check the documentation to better understand the motivation behind this limitation and how to overcome it: https://larribas.me/dagger/user-guide/map-reduce/#you-cannot-return-the-output-of-a-partitioned-node-from-a-dag."
)


Expand Down Expand Up @@ -358,7 +358,87 @@ def test__init__partitioned_by_output_of_partitioned_node():

assert (
str(e.value)
== "Error validating input 'n' of node 'map-2': This node is partitioned by an input that comes from the output of another partitioned node. This is not a valid map-reduce pattern in dagger. Please check the 'Map Reduce' section in the documentation for an explanation of why this is not possible and suggestions of other valid map-reduce patterns."
== "Node 'map-2' is partitioned by an input that comes from the output of another node, 'map-1'. Node 'map-1' is also partitioned. In Dagger, a node cannot be partitioned by the output of another partitioned node. Check the documentation to better understand how partitioning works: https://larribas.me/dagger/user-guide/partitioning/"
)


def test__init__with_node_partitioned_by_non_partitioned_output():
with pytest.raises(ValueError) as e:
DAG(
{
"non-partitioned-fan-out": Task(
lambda: [1, 2],
outputs={"numbers": FromReturnValue()},
),
"map": Task(
lambda n: n,
inputs={"n": FromNodeOutput("non-partitioned-fan-out", "numbers")},
partition_by_input="n",
),
}
)

assert (
str(e.value)
== "Node 'map' is partitioned by its input 'n', which comes from the output 'numbers' of node 'non-partitioned-fan-out'. However, this output is not partitioned. Dagger only allows you to partition by inputs that come from a partitioned output. Check the documentation to better understand how partitioning works: https://larribas.me/dagger/user-guide/partitioning/"
)


def test__init__with_node_partitioned_by_parameter():
with pytest.raises(ValueError) as e:
DAG(
inputs={
"x": FromParam(),
},
nodes={
"map": DAG(
inputs={
"n": FromParam("x"),
},
nodes={
"n": Task(lambda: 1),
},
partition_by_input="n",
),
},
)

assert (
str(e.value)
== "Node 'map' is partitioned by its input 'n'. However, 'n' does not come from the output of another node. In Dagger, nodes can only be partitioned by the output of another sibling node. Check the documentation to better understand how partitioning works: https://larribas.me/dagger/user-guide/partitioning/"
)


def test__init__with_dag_partitioned_by_dag_output():
with pytest.raises(ValueError) as e:
DAG(
nodes={
"dag-returning-output": DAG(
outputs={
"x": FromNodeOutput("n", "o"),
},
nodes={
"n": Task(
lambda: 1,
outputs={
"o": FromReturnValue(),
},
),
},
),
"map": Task(
lambda n: n,
inputs={
"n": FromNodeOutput("dag-returning-output", "x"),
},
partition_by_input="n",
),
}
)

assert (
str(e.value)
== "Node 'map' is partitioned by its input 'n', which comes from the output 'x' of node 'dag-returning-output'. However, this output is not partitioned. Dagger only allows you to partition by inputs that come from a partitioned output. Check the documentation to better understand how partitioning works: https://larribas.me/dagger/user-guide/partitioning/"
)


Expand Down
2 changes: 1 addition & 1 deletion tests/docs/test_map_reduce.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def test_single_mapping_node():

assert (
str(e.value)
== "Error validating input 'partition' of node 'do-something-else-with': This node is partitioned by an input that comes from the output of another partitioned node. This is not a valid map-reduce pattern in dagger. Please check the 'Map Reduce' section in the documentation for an explanation of why this is not possible and suggestions of other valid map-reduce patterns."
== "Node 'do-something-else-with' is partitioned by an input that comes from the output of another node, 'do-something-with'. Node 'do-something-with' is also partitioned. In Dagger, a node cannot be partitioned by the output of another partitioned node. Check the documentation to better understand how partitioning works: https://larribas.me/dagger/user-guide/partitioning/"
)

dag = dsl.build(valid.dag) # no error
Expand Down
4 changes: 2 additions & 2 deletions tests/dsl/test_black_box.py
Original file line number Diff line number Diff line change
Expand Up @@ -842,7 +842,7 @@ def dag():

assert (
str(e.value)
== "Error validating input 'n' of node 'double-2': This node is partitioned by an input that comes from the output of another partitioned node. This is not a valid map-reduce pattern in dagger. Please check the 'Map Reduce' section in the documentation for an explanation of why this is not possible and suggestions of other valid map-reduce patterns."
== "Node 'double-2' is partitioned by an input that comes from the output of another node, 'double-1'. Node 'double-1' is also partitioned. In Dagger, a node cannot be partitioned by the output of another partitioned node. Check the documentation to better understand how partitioning works: https://larribas.me/dagger/user-guide/partitioning/"
)


Expand All @@ -865,5 +865,5 @@ def dag(partitions):

assert (
str(e.value)
== "Partitioned nodes may not generate partitioned outputs. This is not a valid map-reduce pattern in dagger. Please check the 'Map Reduce' section in the documentation for an explanation of why this is not possible and suggestions of other valid map-reduce patterns."
== "This node is partitioned. In Dagger, partitioned nodes may not generate partitioned outputs. Check the documentation to better understand how partitioning works: https://larribas.me/dagger/user-guide/partitioning/"
)
4 changes: 2 additions & 2 deletions tests/runtime/local/test_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ def test__invoke_dag__with_partitions_but_invalid_outputs():
nodes={
"generate-single-number": Task(
lambda: 1,
outputs={"n": FromReturnValue()},
outputs={"n": FromReturnValue(is_partitioned=True)},
),
"poorly-partitioned-task": Task(
lambda x: x,
Expand All @@ -226,5 +226,5 @@ def test__invoke_dag__with_partitions_but_invalid_outputs():

assert (
str(e.value)
== "Error when invoking node 'poorly-partitioned-task'. This node is supposed to be partitioned by input 'x'. When a node is partitioned, the value of the input that determines the partition should be an iterable. Instead, we found a value of type 'int'."
== "Error when invoking node 'generate-single-number'. We encountered the following error while attempting to serialize the results of this task: Output 'n' was declared as a partitioned output, but the return value was not an iterable (instead, it was of type 'int'). Partitioned outputs should be iterables of values (e.g. lists or sets). Each value in the iterable must be serializable with the serializer defined in the output."
)
4 changes: 2 additions & 2 deletions tests/task/test_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ def test__init__with_node_partitioned_by_param():

assert (
str(e.value)
== "Nodes may not be partitioned by an input that comes from a parameter. This is not a valid map-reduce pattern in dagger. Please check the 'Map Reduce' section in the documentation for an explanation of why this is not possible and suggestions of other valid map-reduce patterns."
== "This node is partitioned by input '{partition_by_input}', which comes from a parameter. In Dagger, nodes may not be partitioned by an input that comes from a parameter. Check the documentation to better understand how partitioning works: https://larribas.me/dagger/user-guide/partitioning/"
)


Expand All @@ -137,7 +137,7 @@ def test__init__with_partitioned_node_with_partitioned_output():

assert (
str(e.value)
== "Partitioned nodes may not generate partitioned outputs. This is not a valid map-reduce pattern in dagger. Please check the 'Map Reduce' section in the documentation for an explanation of why this is not possible and suggestions of other valid map-reduce patterns."
== "This node is partitioned. In Dagger, partitioned nodes may not generate partitioned outputs. Check the documentation to better understand how partitioning works: https://larribas.me/dagger/user-guide/partitioning/"
)


Expand Down

0 comments on commit 581eddd

Please sign in to comment.