Skip to content

Commit

Permalink
[COR-2671] Clear inputs dynamic closure (#645)
Browse files Browse the repository at this point in the history
* Clear inputs for the dynamic workflow to avoid bloating the event message

Signed-off-by: pmahindrakar-oss <prafulla.mahindrakar@gmail.com>

* Clear inputs for the dynamic workflow to avoid bloating the event message

Signed-off-by: pmahindrakar-oss <prafulla.mahindrakar@gmail.com>

---------

Signed-off-by: pmahindrakar-oss <prafulla.mahindrakar@gmail.com>
  • Loading branch information
pmahindrakar-oss committed Feb 9, 2025
1 parent 34205dd commit e3c6d85
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 0 deletions.
15 changes: 15 additions & 0 deletions flytepropeller/pkg/controller/nodes/dynamic/dynamic_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,8 @@ func (d dynamicNodeTaskNodeHandler) buildContextualDynamicWorkflow(ctx context.C
return dynamicWorkflowContext{}, err
}

// Clear the inputs of the nodes in the dynamic workflow. This is done to avoid bloating the node event.
clearNodeInputs(ctx, closure.GetPrimary().GetTemplate().GetNodes())
if err := f.Cache(ctx, dynamicWf, closure); err != nil {
logger.Errorf(ctx, "Failed to cache Dynamic workflow [%s]", err.Error())
}
Expand All @@ -221,6 +223,19 @@ func (d dynamicNodeTaskNodeHandler) buildContextualDynamicWorkflow(ctx context.C
}, nil
}

// This function is used to clear the inputs of the nodes in the dynamic workflow. This is done to avoid bloating the node event.
func clearNodeInputs(ctx context.Context, nodes []*core.Node) {
for _, node := range nodes {
node.Inputs = nil
switch node.Target.(type) {
case *core.Node_ArrayNode:
node.Target.(*core.Node_ArrayNode).ArrayNode.Node.Inputs = nil
default:
logger.Debugf(ctx, "node type %T not supported for clearing inputs", node.Target)
}
}
}

func (d dynamicNodeTaskNodeHandler) buildDynamicWorkflow(ctx context.Context, nCtx interfaces.NodeExecutionContext,
djSpec *core.DynamicJobSpec, dynamicNodeStatus v1alpha1.ExecutableNodeStatus) (*core.CompiledWorkflowClosure, *v1alpha1.FlyteWorkflow, error) {
wf, err := d.buildDynamicWorkflowTemplate(ctx, djSpec, nCtx, dynamicNodeStatus)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,41 @@ func Test_dynamicNodeHandler_buildContextualDynamicWorkflow_withLaunchPlans(t *t
})
}

func TestClearNodeInputs_WithCoreBindings(t *testing.T) {
ctx := context.Background()

t.Run("clear inputs for regular nodes with core bindings", func(t *testing.T) {
nodes := []*core.Node{
{Inputs: []*core.Binding{{Var: "input1"}}},
{Inputs: []*core.Binding{{Var: "input2"}}},
}
clearNodeInputs(ctx, nodes)
for _, node := range nodes {
assert.Nil(t, node.Inputs)
}
})

t.Run("clear inputs for array nodes with core bindings", func(t *testing.T) {
nodes := []*core.Node{
{
Inputs: []*core.Binding{{Var: "input1"}},
Target: &core.Node_ArrayNode{
ArrayNode: &core.ArrayNode{
Node: &core.Node{Inputs: []*core.Binding{{Var: "input2"}}},
},
},
},
}
clearNodeInputs(ctx, nodes)
for _, node := range nodes {
assert.Nil(t, node.Inputs)
if arrayNode, ok := node.Target.(*core.Node_ArrayNode); ok {
assert.Nil(t, arrayNode.ArrayNode.Node.Inputs)
}
}
})
}

type existsMetadata struct{}

func (e existsMetadata) ContentMD5() string {
Expand Down

0 comments on commit e3c6d85

Please sign in to comment.