diff --git a/flytepropeller/pkg/controller/nodes/dynamic/dynamic_workflow.go b/flytepropeller/pkg/controller/nodes/dynamic/dynamic_workflow.go index de0108d4dc..4b10742e4d 100644 --- a/flytepropeller/pkg/controller/nodes/dynamic/dynamic_workflow.go +++ b/flytepropeller/pkg/controller/nodes/dynamic/dynamic_workflow.go @@ -201,6 +201,8 @@ func (d dynamicNodeTaskNodeHandler) buildContextualDynamicWorkflow(ctx context.C return dynamicWorkflowContext{}, err } + // Clear the inputs of the nodes in the dynamic workflow. This is done to avoid bloating the node event. + clearNodeInputs(ctx, closure.GetPrimary().GetTemplate().GetNodes()) if err := f.Cache(ctx, dynamicWf, closure); err != nil { logger.Errorf(ctx, "Failed to cache Dynamic workflow [%s]", err.Error()) } @@ -221,6 +223,19 @@ func (d dynamicNodeTaskNodeHandler) buildContextualDynamicWorkflow(ctx context.C }, nil } +// This function is used to clear the inputs of the nodes in the dynamic workflow. This is done to avoid bloating the node event. +func clearNodeInputs(ctx context.Context, nodes []*core.Node) { + for _, node := range nodes { + node.Inputs = nil + switch node.Target.(type) { + case *core.Node_ArrayNode: + node.Target.(*core.Node_ArrayNode).ArrayNode.Node.Inputs = nil + default: + logger.Debugf(ctx, "node type %T not supported for clearing inputs", node.Target) + } + } +} + func (d dynamicNodeTaskNodeHandler) buildDynamicWorkflow(ctx context.Context, nCtx interfaces.NodeExecutionContext, djSpec *core.DynamicJobSpec, dynamicNodeStatus v1alpha1.ExecutableNodeStatus) (*core.CompiledWorkflowClosure, *v1alpha1.FlyteWorkflow, error) { wf, err := d.buildDynamicWorkflowTemplate(ctx, djSpec, nCtx, dynamicNodeStatus) diff --git a/flytepropeller/pkg/controller/nodes/dynamic/dynamic_workflow_test.go b/flytepropeller/pkg/controller/nodes/dynamic/dynamic_workflow_test.go index ec20c14cd0..efccddb2d3 100644 --- a/flytepropeller/pkg/controller/nodes/dynamic/dynamic_workflow_test.go +++ b/flytepropeller/pkg/controller/nodes/dynamic/dynamic_workflow_test.go @@ -586,6 +586,41 @@ func Test_dynamicNodeHandler_buildContextualDynamicWorkflow_withLaunchPlans(t *t }) } +func TestClearNodeInputs_WithCoreBindings(t *testing.T) { + ctx := context.Background() + + t.Run("clear inputs for regular nodes with core bindings", func(t *testing.T) { + nodes := []*core.Node{ + {Inputs: []*core.Binding{{Var: "input1"}}}, + {Inputs: []*core.Binding{{Var: "input2"}}}, + } + clearNodeInputs(ctx, nodes) + for _, node := range nodes { + assert.Nil(t, node.Inputs) + } + }) + + t.Run("clear inputs for array nodes with core bindings", func(t *testing.T) { + nodes := []*core.Node{ + { + Inputs: []*core.Binding{{Var: "input1"}}, + Target: &core.Node_ArrayNode{ + ArrayNode: &core.ArrayNode{ + Node: &core.Node{Inputs: []*core.Binding{{Var: "input2"}}}, + }, + }, + }, + } + clearNodeInputs(ctx, nodes) + for _, node := range nodes { + assert.Nil(t, node.Inputs) + if arrayNode, ok := node.Target.(*core.Node_ArrayNode); ok { + assert.Nil(t, arrayNode.ArrayNode.Node.Inputs) + } + } + }) +} + type existsMetadata struct{} func (e existsMetadata) ContentMD5() string {