From 3dd83fb4818ac170e08474c4cc304938c21d7079 Mon Sep 17 00:00:00 2001 From: Mika Naylor Date: Fri, 13 Mar 2026 18:29:43 +0100 Subject: [PATCH] [FLINK-39253][table] Preserve field names in ROW function from AS aliases --- .../rules/ResolveCallByArgumentsRule.java | 17 +++++++++++++++++ .../table/types/inference/CallContext.java | 5 +++++ .../inference/strategies/RowTypeStrategy.java | 7 ++++++- .../types/inference/utils/CastCallContext.java | 6 ++++++ .../planner/functions/RowFunctionITCase.java | 18 +++++++++++++++++- 5 files changed, 51 insertions(+), 2 deletions(-) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java index e298953f69e3e..2af2a97399d08 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java @@ -681,6 +681,23 @@ public List getArgumentDataTypes() { .collect(Collectors.toList()); } + @Override + public Optional getArgumentName(int pos) { + final ResolvedExpression arg = getArgument(pos); + + if (arg instanceof CallExpression) { + final CallExpression call = (CallExpression) arg; + if (call.getFunctionDefinition() == BuiltInFunctionDefinitions.AS) { + final List children = call.getResolvedChildren(); + if (children.size() >= 2 && children.get(1) instanceof ValueLiteralExpression) { + return ((ValueLiteralExpression) children.get(1)).getValueAs(String.class); + } + } + } + + return Optional.empty(); + } + @Override public Optional getOutputDataType() { return Optional.empty(); diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/CallContext.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/CallContext.java index aff38de7fa72a..7c987430289d9 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/CallContext.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/CallContext.java @@ -113,6 +113,11 @@ default Optional getOutputChangelogMode() { */ List getArgumentDataTypes(); + /** Returns the name/alias of the argument at the given position if one is available. */ + default Optional getArgumentName(int pos) { + return Optional.empty(); + } + /** * Returns the inferred output data type of the function call. * diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/RowTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/RowTypeStrategy.java index c9bedb827cd66..7e631a91463f6 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/RowTypeStrategy.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/RowTypeStrategy.java @@ -37,7 +37,12 @@ public Optional inferType(CallContext callContext) { List argumentDataTypes = callContext.getArgumentDataTypes(); DataTypes.Field[] fields = IntStream.range(0, argumentDataTypes.size()) - .mapToObj(idx -> DataTypes.FIELD("f" + idx, argumentDataTypes.get(idx))) + .mapToObj( + idx -> { + String fieldName = + callContext.getArgumentName(idx).orElse("f" + idx); + return DataTypes.FIELD(fieldName, argumentDataTypes.get(idx)); + }) .toArray(DataTypes.Field[]::new); return Optional.of(DataTypes.ROW(fields).notNull()); diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/utils/CastCallContext.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/utils/CastCallContext.java index 9b7102548475e..f04055c5d5d2f 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/utils/CastCallContext.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/utils/CastCallContext.java @@ -115,6 +115,12 @@ public List getArgumentDataTypes() { return expectedArguments; } + @Override + public Optional getArgumentName(int pos) { + // argument names remain regardless of casting + return originalContext.getArgumentName(pos); + } + @Override public Optional getOutputDataType() { return Optional.ofNullable(outputDataType); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/RowFunctionITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/RowFunctionITCase.java index 4e41f24a31a47..be83d3d45621e 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/RowFunctionITCase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/RowFunctionITCase.java @@ -115,7 +115,23 @@ Stream getTestSetSpecs() { DataTypes.FIELD("b", DataTypes.TINYINT()), DataTypes.FIELD("c", DataTypes.BIGINT()), DataTypes.FIELD("d", DataTypes.BOOLEAN())) - .notNull())); + .notNull()), + TestSetSpec.forFunction( + BuiltInFunctionDefinitions.ROW, "with aliased fields using .as()") + .onFieldsWithData(100, "abc", 75.50) + .andDataTypes(DataTypes.INT(), DataTypes.STRING(), DataTypes.DOUBLE()) + .testTableApiResult( + row($("f0").as("a"), $("f1").as("b"), $("f2").as("c")), + Row.of(100, "abc", 75.50), + DataTypes.ROW( + DataTypes.FIELD("a", DataTypes.INT()), + DataTypes.FIELD("b", DataTypes.STRING()), + DataTypes.FIELD("c", DataTypes.DOUBLE())) + .notNull()) + .testTableApiResult( + row($("f0").as("a"), $("f1").as("b"), $("f2").as("c")).get("a"), + 100, + DataTypes.INT())); } // --------------------------------------------------------------------------------------------