Skip to content

Commit

Permalink
[FLINK-36000][table-planner] Fix DynamicTableSink#Context's getTarget…
Browse files Browse the repository at this point in the history
…Columns should return an Optional#empty instead of int[0] for insert without column list
  • Loading branch information
lincoln-lil committed Aug 7, 2024
1 parent 8ab0112 commit a15f9c1
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1389,8 +1389,11 @@ private Operation convertUpdate(SqlUpdate sqlUpdate) {
private int[][] getTargetColumnIndices(
@Nonnull ContextResolvedTable contextResolvedTable,
@Nullable SqlNodeList targetColumns) {
if (targetColumns == null) {
return null;
}
List<String> allColumns = contextResolvedTable.getResolvedSchema().getColumnNames();
return Optional.ofNullable(targetColumns).orElse(SqlNodeList.EMPTY).stream()
return targetColumns.stream()
.mapToInt(c -> allColumns.indexOf(((SqlIdentifier) c).getSimple()))
.mapToObj(idx -> new int[] {idx})
.toArray(int[][]::new);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ abstract class Sink(
.getOrElse(Array.empty[Array[Int]])
.map(_.mkString("[", ",", "]"))
.mkString(","),
targetColumns != null && targetColumns.length > 0
targetColumns != null
)
.item("fields", getRowType.getFieldNames.mkString(", "))
.itemIf("hints", RelExplainUtil.hintsToString(hints), !hints.isEmpty)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2105,6 +2105,12 @@ public Optional<Integer> getParallelism() {
} else {
// we don't support OutputFormat for updating query in the TestValues connector
assertThat(runtimeSink.equals("SinkFunction")).isTrue();
// check the contract of the context.getTargetColumns method returns the expected
// empty Option or non-empty Option with a non-empty array
assertThat(
!context.getTargetColumns().isPresent()
|| context.getTargetColumns().get().length > 0)
.isTrue();
SinkFunction<RowData> sinkFunction;
if (primaryKeyIndices.length > 0) {
// TODO FLINK-31301 currently partial-insert composite columns are not supported
Expand Down

0 comments on commit a15f9c1

Please sign in to comment.