From 8ab011243a8028aeee5bbc73ac56a7426a5c1916 Mon Sep 17 00:00:00 2001
From: James Hughes
Date: Wed, 7 Aug 2024 07:49:59 -0400
Subject: [PATCH] [FLINK-35934][table-planner] Add CompiledPlan annotations to
 BatchExecValues

---
 .../nodes/exec/batch/BatchExecValues.java     | 22 +++++
 .../plan/utils/ExecNodeMetadataUtil.java      |  2 +
 .../exec/batch/ValuesBatchRestoreTest.java    | 39 +++++++++
 .../ValuesTestPrograms.java                   |  4 +-
 .../nodes/exec/stream/ValuesRestoreTest.java  |  1 +
 .../values-test/plan/values-test.json         | 80 +++++++++++++++++++
 6 files changed, 146 insertions(+), 2 deletions(-)
 create mode 100644 flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/ValuesBatchRestoreTest.java
 rename flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/{stream => common}/ValuesTestPrograms.java (93%)
 create mode 100644 flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-values_1/values-test/plan/values-test.json

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecValues.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecValues.java
index b1be0f8395488..8417a75cf6ba5 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecValues.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecValues.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.batch;
 
+import org.apache.flink.FlinkVersion;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.data.RowData;
@@ -25,15 +26,25 @@
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
 import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecValues;
 import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
 import org.apache.flink.table.types.logical.RowType;
 
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
 import org.apache.calcite.rex.RexLiteral;
 
 import java.util.List;
 
 /** Batch {@link ExecNode} that read records from given values. */
+@ExecNodeMetadata(
+        name = "batch-exec-values",
+        version = 1,
+        producedTransformations = CommonExecValues.VALUES_TRANSFORMATION,
+        minPlanVersion = FlinkVersion.v2_0,
+        minStateVersion = FlinkVersion.v2_0)
 public class BatchExecValues extends CommonExecValues implements BatchExecNode<RowData> {
 
     public BatchExecValues(
@@ -50,6 +61,17 @@ public BatchExecValues(
                 description);
     }
 
+    @JsonCreator
+    public BatchExecValues(
+            @JsonProperty(FIELD_NAME_ID) int id,
+            @JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
+            @JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig persistedConfig,
+            @JsonProperty(FIELD_NAME_TUPLES) List<List<RexLiteral>> tuples,
+            @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
+            @JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
+        super(id, context, persistedConfig, tuples, outputType, description);
+    }
+
     @Override
     protected Transformation<RowData> translateToPlanInternal(
             PlannerBase planner, ExecNodeConfig config) {
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
index ffc4da4a6de67..bd6a7d4e1da43 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
@@ -33,6 +33,7 @@
 import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSink;
 import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSort;
 import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecTableSourceScan;
+import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecValues;
 import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecAsyncCalc;
 import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc;
 import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecChangelogNormalize;
@@ -159,6 +160,7 @@ private ExecNodeMetadataUtil() {
                     add(BatchExecCalc.class);
                     add(BatchExecExchange.class);
                     add(BatchExecSort.class);
+                    add(BatchExecValues.class);
                 }
             };
 
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/ValuesBatchRestoreTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/ValuesBatchRestoreTest.java
new file mode 100644
index 0000000000000..03e8898951aa6
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/ValuesBatchRestoreTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.batch;
+
+import org.apache.flink.table.planner.plan.nodes.exec.common.ValuesTestPrograms;
+import org.apache.flink.table.planner.plan.nodes.exec.testutils.BatchRestoreTestBase;
+import org.apache.flink.table.test.program.TableTestProgram;
+
+import java.util.Collections;
+import java.util.List;
+
+/** Batch Compiled Plan tests for {@link BatchExecValues}. */
+public class ValuesBatchRestoreTest extends BatchRestoreTestBase {
+
+    public ValuesBatchRestoreTest() {
+        super(BatchExecValues.class);
+    }
+
+    @Override
+    public List<TableTestProgram> programs() {
+        return Collections.singletonList(ValuesTestPrograms.VALUES_TEST);
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ValuesTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/ValuesTestPrograms.java
similarity index 93%
rename from flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ValuesTestPrograms.java
rename to flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/ValuesTestPrograms.java
index 792299685f240..300464feb6a61 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ValuesTestPrograms.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/ValuesTestPrograms.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.plan.nodes.exec.stream;
+package org.apache.flink.table.planner.plan.nodes.exec.common;
 
 import org.apache.flink.table.test.program.SinkTestStep;
 import org.apache.flink.table.test.program.TableTestProgram;
@@ -24,7 +24,7 @@
 
 /** {@link TableTestProgram} definitions for testing {@link StreamExecValues}. */
 public class ValuesTestPrograms {
-    static final TableTestProgram VALUES_TEST =
+    public static final TableTestProgram VALUES_TEST =
             TableTestProgram.of("values-test", "validates values node")
                     .setupTableSink(
                             SinkTestStep.newBuilder("sink_t")
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ValuesRestoreTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ValuesRestoreTest.java
index e6ce507ef03dd..47f00ab2b3a83 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ValuesRestoreTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ValuesRestoreTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.stream;
 
+import org.apache.flink.table.planner.plan.nodes.exec.common.ValuesTestPrograms;
 import org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
 import org.apache.flink.table.test.program.TableTestProgram;
 
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-values_1/values-test/plan/values-test.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-values_1/values-test/plan/values-test.json
new file mode 100644
index 0000000000000..7e45b691171f8
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-values_1/values-test/plan/values-test.json
@@ -0,0 +1,80 @@
+{
+  "flinkVersion" : "2.0",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "batch-exec-values_1",
+    "tuples" : [ [ {
+      "kind" : "LITERAL",
+      "value" : 1,
+      "type" : "INT NOT NULL"
+    }, {
+      "kind" : "LITERAL",
+      "value" : 2,
+      "type" : "INT NOT NULL"
+    }, {
+      "kind" : "LITERAL",
+      "value" : "Hi",
+      "type" : "CHAR(2) NOT NULL"
+    } ], [ {
+      "kind" : "LITERAL",
+      "value" : 3,
+      "type" : "INT NOT NULL"
+    }, {
+      "kind" : "LITERAL",
+      "value" : 4,
+      "type" : "INT NOT NULL"
+    }, {
+      "kind" : "LITERAL",
+      "value" : "Hello",
+      "type" : "CHAR(5) NOT NULL"
+    } ] ],
+    "outputType" : "ROW<`EXPR$0` INT NOT NULL, `EXPR$1` INT NOT NULL, `EXPR$2` VARCHAR(5) NOT NULL>",
+    "description" : "Values(tuples=[[{ 1, 2, _UTF-16LE'Hi' }, { 3, 4, _UTF-16LE'Hello' }]], values=[EXPR$0, EXPR$1, EXPR$2])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 2,
+    "type" : "batch-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "b",
+              "dataType" : "INT"
+            }, {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`EXPR$0` INT NOT NULL, `EXPR$1` INT NOT NULL, `EXPR$2` VARCHAR(5) NOT NULL>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[EXPR$0, EXPR$1, EXPR$2])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
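
Note on usage (not part of the patch): once BatchExecValues carries the @ExecNodeMetadata annotation and is registered in ExecNodeMetadataUtil, a batch INSERT backed only by VALUES can be round-tripped through the public CompiledPlan API. The snippet below is a minimal sketch: the sink table, the blackhole connector, and the plan path are illustrative, and compiling plans in batch mode assumes the rest of the FLINK-35934 batch CompiledPlan work is in place.

    import org.apache.flink.table.api.CompiledPlan;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.PlanReference;
    import org.apache.flink.table.api.TableEnvironment;

    public class BatchValuesCompiledPlanSketch {
        public static void main(String[] args) {
            // Batch TableEnvironment; plan compilation here relies on the batch
            // ExecNode annotations such as the one added to BatchExecValues.
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

            // Illustrative sink with the same schema as sink_t in the plan above.
            tEnv.executeSql(
                    "CREATE TABLE sink_t (b INT, a INT, c STRING) "
                            + "WITH ('connector' = 'blackhole')");

            // Compile the statement to a JSON plan; the VALUES source is serialized
            // as a batch-exec-values_1 node like the one in values-test.json.
            CompiledPlan plan =
                    tEnv.compilePlanSql("INSERT INTO sink_t VALUES (1, 2, 'Hi'), (3, 4, 'Hello')");
            plan.writeToFile("/tmp/values-test.json");

            // Restore the persisted plan and execute it.
            tEnv.loadPlan(PlanReference.fromFile("/tmp/values-test.json")).execute();
        }
    }

The new ValuesBatchRestoreTest exercises the same round trip through BatchRestoreTestBase against the checked-in plan file under restore-tests/batch-exec-values_1.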