diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableChangeOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableChangeOperation.java index fbd0b194d0b9d..d185ab4a385d4 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableChangeOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableChangeOperation.java @@ -97,7 +97,9 @@ public static String toString(TableChange tableChange) { return String.format( " MODIFY %s COMMENT '%s'", EncodingUtils.escapeIdentifier(modifyColumnComment.getNewColumn().getName()), - modifyColumnComment.getNewComment()); + modifyColumnComment.getNewComment() == null + ? "" + : modifyColumnComment.getNewComment()); } else if (tableChange instanceof TableChange.ModifyPhysicalColumnType) { TableChange.ModifyPhysicalColumnType modifyPhysicalColumnType = (TableChange.ModifyPhysicalColumnType) tableChange; diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java index 0930b69f1ae1b..c6d4201256515 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java @@ -119,7 +119,7 @@ private static String toString(TableChange tableChange) { } else if (tableChange instanceof ModifyDefinitionQuery) { ModifyDefinitionQuery definitionQuery = (ModifyDefinitionQuery) tableChange; return String.format( - " MODIFY DEFINITION QUERY TO '%s'", definitionQuery.getDefinitionQuery()); + " MODIFY DEFINITION QUERY TO '%s'", definitionQuery.getDefinitionQuery()); } else { return AlterTableChangeOperation.toString(tableChange); } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java index 554e89f2ffcd8..ce0054341227b 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java @@ -24,7 +24,6 @@ import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.CatalogBaseTable.TableKind; import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode; -import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.ResolvedCatalogBaseTable; import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; @@ -32,6 +31,8 @@ import org.apache.flink.table.catalog.SchemaResolver; import org.apache.flink.table.catalog.TableChange; import org.apache.flink.table.catalog.TableDistribution; +import org.apache.flink.table.catalog.UniqueConstraint; +import org.apache.flink.table.catalog.WatermarkSpec; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.materializedtable.CreateMaterializedTableOperation; import org.apache.flink.table.operations.materializedtable.FullAlterMaterializedTableOperation; @@ -123,55 +124,116 @@ private Operation handleCreate( private Function> buildTableChanges( final MergeContext mergeContext, final SchemaResolver schemaResolver) { return oldTable -> { - final List changes = new ArrayList<>(); + final List changes = + getSchemaTableChanges(mergeContext, schemaResolver, oldTable); - final ResolvedSchema oldSchema = oldTable.getResolvedSchema(); - final List newColumns = - MaterializedTableUtils.validateAndExtractNewColumns( - oldSchema, - schemaResolver.resolve(mergeContext.getMergedSchema()), - mergeContext.hasSchemaDefinition()); + changes.addAll(getQueryTableChanges(mergeContext, oldTable)); + changes.addAll(getOptionsTableChanges(mergeContext, oldTable)); + changes.addAll(getDistributionTableChanges(mergeContext, oldTable)); - newColumns.forEach(column -> changes.add(TableChange.add(column))); - changes.add( - TableChange.modifyDefinitionQuery( - mergeContext.getMergedOriginalQuery(), - mergeContext.getMergedExpandedQuery())); + final RefreshMode oldRefreshMode = oldTable.getRefreshMode(); + final RefreshMode newRefreshMode = mergeContext.getMergedRefreshMode(); + if (oldRefreshMode != newRefreshMode && newRefreshMode != null) { + throw new ValidationException("Changing of REFRESH MODE is unsupported"); + } + + return changes; + }; + } - final Map oldOptions = oldTable.getOptions(); - final Map newOptions = mergeContext.getMergedTableOptions(); + private List getDistributionTableChanges( + final MergeContext mergeContext, final ResolvedCatalogMaterializedTable oldTable) { + final TableDistribution oldDistribution = oldTable.getDistribution().orElse(null); + final TableDistribution newDistribution = + mergeContext.getMergedTableDistribution().orElse(null); + if (!Objects.equals(oldDistribution, newDistribution)) { + if (oldDistribution == null) { + return List.of(TableChange.add(newDistribution)); + } else if (newDistribution == null) { + return List.of(TableChange.dropDistribution()); + } else { + return List.of(TableChange.modify(newDistribution)); + } + } + return List.of(); + } + + private List getOptionsTableChanges( + final MergeContext mergeContext, final ResolvedCatalogMaterializedTable oldTable) { + final List changes = new ArrayList<>(); + final Map oldOptions = oldTable.getOptions(); + final Map newOptions = mergeContext.getMergedTableOptions(); - for (Map.Entry newOptionEntry : newOptions.entrySet()) { + for (Map.Entry newOptionEntry : newOptions.entrySet()) { + if (!newOptionEntry.getValue().equals(oldOptions.get(newOptionEntry.getKey()))) { changes.add(TableChange.set(newOptionEntry.getKey(), newOptionEntry.getValue())); } + } - for (Map.Entry oldOptionEntry : oldOptions.entrySet()) { - if (newOptions.get(oldOptionEntry.getKey()) == null) { - changes.add(TableChange.reset(oldOptionEntry.getKey())); - } + for (Map.Entry oldOptionEntry : oldOptions.entrySet()) { + if (newOptions.get(oldOptionEntry.getKey()) == null) { + changes.add(TableChange.reset(oldOptionEntry.getKey())); } + } + return changes; + } - final RefreshMode oldRefreshMode = oldTable.getRefreshMode(); - final RefreshMode newRefreshMode = mergeContext.getMergedRefreshMode(); - if (oldRefreshMode != newRefreshMode && newRefreshMode != null) { - throw new ValidationException("Changing of REFRESH MODE is unsupported"); + private List getQueryTableChanges( + final MergeContext mergeContext, final ResolvedCatalogMaterializedTable oldTable) { + final String originalQuery = oldTable.getOriginalQuery(); + final String expandedQuery = oldTable.getExpandedQuery(); + if (!originalQuery.equals(mergeContext.getMergedOriginalQuery()) + || !expandedQuery.equals(mergeContext.getMergedExpandedQuery())) { + return List.of( + TableChange.modifyDefinitionQuery( + mergeContext.getMergedOriginalQuery(), + mergeContext.getMergedExpandedQuery())); + } + return List.of(); + } + + private List getSchemaTableChanges( + final MergeContext mergeContext, + final SchemaResolver schemaResolver, + final ResolvedCatalogMaterializedTable oldTable) { + final ResolvedSchema oldSchema = oldTable.getResolvedSchema(); + final ResolvedSchema newSchema = schemaResolver.resolve(mergeContext.getMergedSchema()); + final List changes = + new ArrayList<>( + MaterializedTableUtils.validateAndExtractColumnChanges( + oldSchema, newSchema, mergeContext.hasSchemaDefinition())); + + final UniqueConstraint oldConstraint = oldSchema.getPrimaryKey().orElse(null); + final UniqueConstraint newConstraint = newSchema.getPrimaryKey().orElse(null); + if (!Objects.equals(oldConstraint, newConstraint)) { + if (newConstraint == null) { + changes.add(TableChange.dropConstraint(oldConstraint.getName())); + } else if (oldConstraint == null) { + changes.add(TableChange.add(newConstraint)); + } else { + changes.add(TableChange.modify(newConstraint)); } + } - final TableDistribution oldDistribution = oldTable.getDistribution().orElse(null); - final TableDistribution newDistribution = - mergeContext.getMergedTableDistribution().orElse(null); - if (!Objects.equals(oldDistribution, newDistribution)) { - if (oldDistribution == null) { - changes.add(TableChange.add(newDistribution)); - } else if (newDistribution == null) { - changes.add(TableChange.dropDistribution()); - } else { - changes.add(TableChange.modify(newDistribution)); - } + final WatermarkSpec oldWatermarkSpec = + oldSchema.getWatermarkSpecs().isEmpty() + ? null + : oldSchema.getWatermarkSpecs().get(0); + final WatermarkSpec newWatermarkSpec = + newSchema.getWatermarkSpecs().isEmpty() + ? null + : newSchema.getWatermarkSpecs().get(0); + if (!Objects.equals(oldWatermarkSpec, newWatermarkSpec)) { + if (newWatermarkSpec == null) { + changes.add(TableChange.dropWatermark()); + } else if (oldWatermarkSpec == null) { + changes.add(TableChange.add(newWatermarkSpec)); + } else { + changes.add(TableChange.modify(newWatermarkSpec)); } + } - return changes; - }; + return changes; } @Override diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java index 7d3b28981244e..8cc5ff524e806 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java @@ -38,12 +38,14 @@ import org.apache.flink.table.catalog.TableChange.ColumnPosition; import org.apache.flink.table.planner.operations.PlannerQueryOperation; import org.apache.flink.table.planner.operations.converters.SqlNodeConverter.ConvertContext; +import org.apache.flink.table.types.DataType; import org.apache.calcite.sql.SqlIntervalLiteral; import org.apache.calcite.sql.SqlIntervalLiteral.IntervalValue; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlNodeList; import org.apache.calcite.sql.type.SqlTypeFamily; +import org.apache.commons.lang3.StringUtils; import java.util.ArrayList; import java.util.HashMap; @@ -278,7 +280,7 @@ private static void applyMetadataColumnChanges( } } - public static List validateAndExtractNewColumns( + public static List validateAndExtractColumnChanges( ResolvedSchema oldSchema, ResolvedSchema newSchema, boolean schemaDefinedInQuery) { final List newColumns = getPersistedColumns(newSchema); final List oldColumns = getPersistedColumns(oldSchema); @@ -294,29 +296,54 @@ public static List validateAndExtractNewColumns( originalColumnSize, newColumnSize)); } + final List columnChanges = new ArrayList<>(); for (int i = 0; i < oldColumns.size(); i++) { - Column oldColumn = oldColumns.get(i); - Column newColumn = newColumns.get(i); + final Column oldColumn = oldColumns.get(i); + final Column newColumn = newColumns.get(i); + final DataType newColumnDataType = + getNewColumnDatatype(oldColumn, newColumns.get(i), schemaDefinedInQuery); if (!oldColumn.equals(newColumn)) { - throw new ValidationException( - String.format( - "When modifying the query of a materialized table, " - + "currently only support appending columns at the end of original schema, dropping, renaming, and reordering columns are not supported.\n" - + "Column mismatch at position %d: Original column is [%s], but new column is [%s].", - i, oldColumn, newColumn)); + if (!oldColumn.getName().equals(newColumn.getName()) + || !oldColumn.getDataType().equals(newColumnDataType)) { + throw new ValidationException( + String.format( + "When modifying the query of a materialized table, " + + "currently only support appending columns at the end of original schema, dropping, renaming, and reordering columns are not supported.\n" + + "Column mismatch at position %d: Original column is [%s], but new column is [%s].", + i + 1, oldColumn, newColumn)); + } + final String oldComment = oldColumn.getComment().orElse(null); + final String newComment = newColumn.getComment().orElse(null); + + if (StringUtils.isEmpty(oldComment) != StringUtils.isEmpty(newComment) + || StringUtils.isNotEmpty(oldComment) + && !Objects.equals(oldComment, newComment)) { + columnChanges.add(TableChange.modifyColumnComment(oldColumn, newComment)); + } } } - final List newAddedColumns = new ArrayList<>(); for (int i = oldColumns.size(); i < newColumns.size(); i++) { Column newColumn = newColumns.get(i); - newAddedColumns.add( - schemaDefinedInQuery - ? newColumn - : newColumn.copy(newColumn.getDataType().nullable())); + columnChanges.add( + TableChange.add( + schemaDefinedInQuery + ? newColumn + : newColumn.copy(newColumn.getDataType().nullable()))); } - return newAddedColumns; + return columnChanges; + } + + private static DataType getNewColumnDatatype( + Column oldColumn, Column newColumn, boolean schemaDefinedInQuery) { + if (schemaDefinedInQuery) { + return newColumn.getDataType(); + } + if (oldColumn.getDataType().nullable().equals(newColumn.getDataType().nullable())) { + return oldColumn.getDataType(); + } + return newColumn.getDataType(); } public static ResolvedSchema getQueryOperationResolvedSchema( diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java index 6f246c2fbb1c7..1782480b8c972 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java @@ -37,8 +37,6 @@ import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.catalog.TableChange; -import org.apache.flink.table.catalog.TableDistribution; -import org.apache.flink.table.catalog.TableDistribution.Kind; import org.apache.flink.table.catalog.UnresolvedIdentifier; import org.apache.flink.table.catalog.WatermarkSpec; import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; @@ -52,7 +50,6 @@ import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableSuspendOperation; import org.apache.flink.table.operations.materializedtable.CreateMaterializedTableOperation; import org.apache.flink.table.operations.materializedtable.DropMaterializedTableOperation; -import org.apache.flink.table.operations.materializedtable.FullAlterMaterializedTableOperation; import org.apache.flink.table.planner.utils.TableFunc0; import org.junit.jupiter.api.BeforeEach; @@ -545,7 +542,8 @@ void testAlterMaterializedTableAsQuery() throws TableNotExistException { + "FROM `builtin`.`default`.`t3` AS `t3`")); assertThat(operation.asSummaryString()) .isEqualTo( - "ALTER MATERIALIZED TABLE builtin.default.base_mtbl AS SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, `t3`.`d`, `t3`.`d` AS `e`, CAST('123' AS STRING) AS `f`\n" + "ALTER MATERIALIZED TABLE builtin.default.base_mtbl AS " + + "SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, `t3`.`d`, `t3`.`d` AS `e`, CAST('123' AS STRING) AS `f`\n" + "FROM `builtin`.`default`.`t3` AS `t3`"); // new table only difference schema & definition query with old table. @@ -598,48 +596,6 @@ void testAlterMaterializedTableAsQueryWithConflictColumnName() { + "FROM `builtin`.`default`.`t3` AS `t3`")); } - @Test - void testAlterMaterializedTableAsQueryWithDefinedSchema() { - String sql = - "CREATE OR ALTER MATERIALIZED TABLE base_mtbl (" - + "`a` BIGINT NOT NULL, `b` STRING, `c` INT, `d` STRING, `a1` BIGINT NOT NULL, `f` INT) " - + "AS SELECT a, b, c, d, a as `a1`, 3 as f FROM t3"; - FullAlterMaterializedTableOperation sqlAlterMaterializedTableAsQuery = - (FullAlterMaterializedTableOperation) parse(sql); - - assertThat(sqlAlterMaterializedTableAsQuery.getTableChanges()) - .containsExactly( - // If NOT NULL is defined in schema, it should stay - TableChange.add(Column.physical("a1", DataTypes.BIGINT().notNull())), - TableChange.add(Column.physical("f", DataTypes.INT())), - TableChange.modifyDefinitionQuery( - "SELECT `a`, `b`, `c`, `d`, `a` AS `a1`, 3 AS `f`\nFROM `t3`", - "SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, `t3`.`d`, `t3`.`a` AS `a1`, 3 AS `f`\n" - + "FROM `builtin`.`default`.`t3` AS `t3`"), - TableChange.reset("connector"), - TableChange.reset("format")); - } - - @Test - void testAlterMaterializedTableAsQueryWithoutDefinedSchema() { - String sql = - "CREATE OR ALTER MATERIALIZED TABLE base_mtbl " - + "AS SELECT a, b, c, d, a as `a1` FROM t3"; - FullAlterMaterializedTableOperation sqlAlterMaterializedTableAsQuery = - (FullAlterMaterializedTableOperation) parse(sql); - - assertThat(sqlAlterMaterializedTableAsQuery.getTableChanges()) - .containsExactly( - // No explicit schema, so nullable will be used - TableChange.add(Column.physical("a1", DataTypes.BIGINT())), - TableChange.modifyDefinitionQuery( - "SELECT `a`, `b`, `c`, `d`, `a` AS `a1`\nFROM `t3`", - "SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, `t3`.`d`, `t3`.`a` AS `a1`\n" - + "FROM `builtin`.`default`.`t3` AS `t3`"), - TableChange.reset("connector"), - TableChange.reset("format")); - } - @Test void testDropMaterializedTable() { final String sql = "DROP MATERIALIZED TABLE mtbl1"; @@ -693,161 +649,6 @@ void testCreateOrAlterMaterializedTable() { assertThat(materializedTable.getOrigin()).isEqualTo(expected); } - private static Collection createOrAlterForExistingMaterializedTableFailedCaseSpecs() { - return List.of( - TestSpec.of( - "CREATE OR ALTER MATERIALIZED TABLE base_mtbl (\n" - + " CONSTRAINT ct1 PRIMARY KEY(a) NOT ENFORCED" - + ")\n" - + "COMMENT 'materialized table comment'\n" - + "PARTITIONED BY (a, d)\n" - + "WITH (\n" - + " 'connector' = 'filesystem', \n" - + " 'format' = 'json'\n" - + ")\n" - + "FRESHNESS = INTERVAL '30' SECOND\n" - + "REFRESH_MODE = CONTINUOUS\n" - + "AS SELECT * FROM t1", - "Changing of REFRESH MODE is unsupported"), - TestSpec.of( - "CREATE OR ALTER MATERIALIZED TABLE base_mtbl (\n" - + " a BIGINT, b INT, c INT, d INT, " - + " CONSTRAINT ct1 PRIMARY KEY(a) NOT ENFORCED" - + ")\n" - + "COMMENT 'materialized table comment'\n" - + "PARTITIONED BY (a, d)\n" - + "WITH (\n" - + " 'connector' = 'filesystem', \n" - + " 'format' = 'json'\n" - + ")\n" - + "FRESHNESS = INTERVAL '30' SECOND\n" - + "REFRESH_MODE = FULL\n" - + "AS SELECT * FROM t1", - "Incompatible types for sink column 'b' at position 2. " - + "The source column has type 'STRING', while the target column has type 'INT'.")); - } - - @Test - void testCreateOrAlterMaterializedTableForExistingTable() throws TableNotExistException { - final String sql = - "CREATE OR ALTER MATERIALIZED TABLE base_mtbl (\n" - + " CONSTRAINT ct1 PRIMARY KEY(a) NOT ENFORCED" - + ")\n" - + "COMMENT 'materialized table comment'\n" - + "DISTRIBUTED BY HASH (b) INTO 7 BUCKETS\n" - + "PARTITIONED BY (a, d)\n" - + "WITH (\n" - + " 'format' = 'json2'\n" - + ")\n" - + "FRESHNESS = INTERVAL '30' SECOND\n" - + "REFRESH_MODE = FULL\n" - + "AS SELECT a, b, c, d, d as e, cast('123' as string) as f FROM t3"; - Operation operation = parse(sql); - - assertThat(operation).isInstanceOf(FullAlterMaterializedTableOperation.class); - - FullAlterMaterializedTableOperation op = (FullAlterMaterializedTableOperation) operation; - assertThat(op.getTableChanges()) - .containsExactly( - TableChange.add(Column.physical("e", DataTypes.VARCHAR(Integer.MAX_VALUE))), - TableChange.add(Column.physical("f", DataTypes.VARCHAR(Integer.MAX_VALUE))), - TableChange.modifyDefinitionQuery( - "SELECT `a`, `b`, `c`, `d`, `d` AS `e`, CAST('123' AS STRING) AS `f`\nFROM `t3`", - "SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, `t3`.`d`, `t3`.`d` AS `e`, CAST('123' AS STRING) AS `f`\n" - + "FROM `builtin`.`default`.`t3` AS `t3`"), - TableChange.set("format", "json2"), - TableChange.reset("connector"), - TableChange.add(TableDistribution.of(Kind.HASH, 7, List.of("b")))); - assertThat(operation.asSummaryString()) - .isEqualTo( - "CREATE OR ALTER MATERIALIZED TABLE builtin.default.base_mtbl\n" - + " ADD `e` STRING ,\n" - + " ADD `f` STRING ,\n" - + " MODIFY DEFINITION QUERY TO 'SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, `t3`.`d`, `t3`.`d` AS `e`, CAST('123' AS STRING) AS `f`\n" - + "FROM `builtin`.`default`.`t3` AS `t3`',\n" - + " SET 'format' = 'json2',\n" - + " RESET 'connector',\n" - + " ADD DISTRIBUTED BY HASH(`b`) INTO 7 BUCKETS"); - - // new table only difference schema & definition query with old table. - CatalogMaterializedTable oldTable = - (CatalogMaterializedTable) - catalog.getTable( - new ObjectPath(catalogManager.getCurrentDatabase(), "base_mtbl")); - CatalogMaterializedTable newTable = op.getNewTable(); - - assertThat(newTable.getOptions()).containsExactly(Map.entry("format", "json2")); - assertThat(oldTable.getUnresolvedSchema()).isNotEqualTo(newTable.getUnresolvedSchema()); - assertThat(oldTable.getUnresolvedSchema().getPrimaryKey()) - .isEqualTo(newTable.getUnresolvedSchema().getPrimaryKey()); - assertThat(oldTable.getUnresolvedSchema().getWatermarkSpecs()) - .isEqualTo(newTable.getUnresolvedSchema().getWatermarkSpecs()); - assertThat(oldTable.getOriginalQuery()).isNotEqualTo(newTable.getOriginalQuery()); - assertThat(oldTable.getExpandedQuery()).isNotEqualTo(newTable.getExpandedQuery()); - assertThat(oldTable.getDefinitionFreshness()).isEqualTo(newTable.getDefinitionFreshness()); - assertThat(oldTable.getRefreshMode()).isEqualTo(newTable.getRefreshMode()); - assertThat(oldTable.getRefreshStatus()).isEqualTo(newTable.getRefreshStatus()); - assertThat(oldTable.getSerializedRefreshHandler()) - .isEqualTo(newTable.getSerializedRefreshHandler()); - - List addedColumn = - newTable.getUnresolvedSchema().getColumns().stream() - .filter( - column -> - !oldTable.getUnresolvedSchema() - .getColumns() - .contains(column)) - .collect(Collectors.toList()); - // added column should be a nullable column. - assertThat(addedColumn) - .containsExactly( - new UnresolvedPhysicalColumn("e", DataTypes.VARCHAR(Integer.MAX_VALUE)), - new UnresolvedPhysicalColumn("f", DataTypes.VARCHAR(Integer.MAX_VALUE))); - } - - @Test - void testCreateOrAlterMaterializedTableWithDistributionForExistingTable() { - final String sql = - "CREATE OR ALTER MATERIALIZED TABLE base_mtbl_with_metadata (\n" - + " t AS current_timestamp," - + " m STRING METADATA VIRTUAL," - + " m_p STRING METADATA," - + " CONSTRAINT ct1 PRIMARY KEY(a) NOT ENFORCED," - + " WATERMARK FOR t as current_timestamp - INTERVAL '5' SECOND" - + ")\n" - + "COMMENT 'materialized table comment'\n" - + "DISTRIBUTED BY HASH (a) INTO 5 BUCKETS\n" - + "WITH (\n" - + " 'connector' = 'filesystem', \n" - + " 'format' = 'json'\n" - + ")\n" - + "FRESHNESS = INTERVAL '30' SECOND\n" - + "REFRESH_MODE = FULL\n" - + "AS SELECT t1.* FROM t1"; - Operation operation = parse(sql); - - assertThat(operation).isInstanceOf(FullAlterMaterializedTableOperation.class); - - FullAlterMaterializedTableOperation op = (FullAlterMaterializedTableOperation) operation; - assertThat(op.getTableChanges()) - .containsExactly( - TableChange.modifyDefinitionQuery( - "SELECT `t1`.*\n" + "FROM `t1`", - "SELECT `t1`.`a`, `t1`.`b`, `t1`.`c`, `t1`.`d`\n" - + "FROM `builtin`.`default`.`t1` AS `t1`"), - TableChange.set("connector", "filesystem"), - TableChange.set("format", "json"), - TableChange.modify(TableDistribution.of(Kind.HASH, 5, List.of("a")))); - assertThat(operation.asSummaryString()) - .isEqualTo( - "CREATE OR ALTER MATERIALIZED TABLE builtin.default.base_mtbl_with_metadata\n" - + " MODIFY DEFINITION QUERY TO 'SELECT `t1`.`a`, `t1`.`b`, `t1`.`c`, `t1`.`d`\n" - + "FROM `builtin`.`default`.`t1` AS `t1`',\n" - + " SET 'connector' = 'filesystem',\n" - + " SET 'format' = 'json',\n" - + " MODIFY DISTRIBUTED BY HASH(`a`) INTO 5 BUCKETS"); - } - private static Collection testDataForCreateAlterMaterializedTableFailedCase() { final Collection list = new ArrayList<>(); list.addAll(createWithInvalidSchema()); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest.java new file mode 100644 index 0000000000000..95e7d1714cbe7 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest.java @@ -0,0 +1,556 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.operations; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.CatalogMaterializedTable; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.TableChange; +import org.apache.flink.table.catalog.TableDistribution; +import org.apache.flink.table.catalog.UniqueConstraint; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.materializedtable.CreateMaterializedTableOperation; +import org.apache.flink.table.operations.materializedtable.FullAlterMaterializedTableOperation; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +class SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest + extends SqlNodeToOperationConversionTestBase { + private static final String DEFAULT_MATERIALIZED_TABLE = + "CREATE MATERIALIZED TABLE mt (\n" + + " CONSTRAINT ct1 PRIMARY KEY(a) NOT ENFORCED" + + ")\n" + + "COMMENT 'materialized table comment'\n" + + "PARTITIONED BY (a, d)\n" + + "WITH (\n" + + " 'connector' = 'filesystem', \n" + + " 'format' = 'json'\n" + + ")\n" + + "FRESHNESS = INTERVAL '30' SECOND\n" + + "REFRESH_MODE = FULL\n" + + "AS SELECT * FROM t1"; + + @BeforeEach + void before() throws TableAlreadyExistException, DatabaseNotExistException { + super.before(); + createMaterializedTableInCatalog(DEFAULT_MATERIALIZED_TABLE, "mt"); + } + + @Test + void testAlterMaterializedTableAsQueryWithDefinedSchema() { + String sql = + "CREATE OR ALTER MATERIALIZED TABLE mt (" + + "`a` BIGINT NOT NULL, `b` STRING, `c` INT, `d` STRING, `a1` BIGINT NOT NULL, `f` INT) " + + "AS SELECT a, b, c, d, a as `a1`, 3 as f FROM t1"; + FullAlterMaterializedTableOperation sqlAlterMaterializedTableAsQuery = + (FullAlterMaterializedTableOperation) parse(sql); + + assertThat(sqlAlterMaterializedTableAsQuery.getTableChanges()) + .containsExactly( + // If NOT NULL is defined in schema, it should stay + TableChange.add(Column.physical("a1", DataTypes.BIGINT().notNull())), + TableChange.add(Column.physical("f", DataTypes.INT())), + TableChange.dropConstraint("ct1"), + TableChange.modifyDefinitionQuery( + "SELECT `a`, `b`, `c`, `d`, `a` AS `a1`, 3 AS `f`\nFROM `t1`", + "SELECT `t1`.`a`, `t1`.`b`, `t1`.`c`, `t1`.`d`, `t1`.`a` AS `a1`, 3 AS `f`\n" + + "FROM `builtin`.`default`.`t1` AS `t1`"), + TableChange.reset("connector"), + TableChange.reset("format")); + } + + @Test + void testAlterMaterializedTableAsQueryWithoutDefinedSchema() { + String sql = + "CREATE OR ALTER MATERIALIZED TABLE mt " + + "AS SELECT a, b, c, d, a as `a1` FROM t1"; + FullAlterMaterializedTableOperation sqlAlterMaterializedTableAsQuery = + (FullAlterMaterializedTableOperation) parse(sql); + + assertThat(sqlAlterMaterializedTableAsQuery.getTableChanges()) + .containsExactly( + // No explicit schema, so nullable will be used + TableChange.add(Column.physical("a1", DataTypes.BIGINT())), + TableChange.dropConstraint("ct1"), + TableChange.modifyDefinitionQuery( + "SELECT `a`, `b`, `c`, `d`, `a` AS `a1`\nFROM `t1`", + "SELECT `t1`.`a`, `t1`.`b`, `t1`.`c`, `t1`.`d`, `t1`.`a` AS `a1`\n" + + "FROM `builtin`.`default`.`t1` AS `t1`"), + TableChange.reset("connector"), + TableChange.reset("format")); + } + + @ParameterizedTest + @MethodSource("createOrAlterForExistingMaterializedTableFailedCaseSpecs") + void createOrAlterForExistingMaterializedTableFailedCase(TestSpec spec) { + Operation operation = parse(spec.sql); + assertThat(operation).isInstanceOf(FullAlterMaterializedTableOperation.class); + + FullAlterMaterializedTableOperation op = (FullAlterMaterializedTableOperation) operation; + // Will be invoked while operation#execute + assertThatThrownBy(op::getTableChanges) + .isInstanceOf(spec.expectedException) + .hasMessage(spec.errMessage); + } + + private static Collection createOrAlterForExistingMaterializedTableFailedCaseSpecs() { + return List.of( + TestSpec.of( + "CREATE OR ALTER MATERIALIZED TABLE mt (\n" + + " CONSTRAINT ct1 PRIMARY KEY(a) NOT ENFORCED" + + ")\n" + + "REFRESH_MODE = CONTINUOUS\n" + + "AS SELECT * FROM t1", + "Changing of REFRESH MODE is unsupported"), + TestSpec.of( + "CREATE OR ALTER MATERIALIZED TABLE mt (\n" + + " a BIGINT, b INT, c INT, d INT, " + + " CONSTRAINT ct1 PRIMARY KEY(a) NOT ENFORCED" + + ")\n" + + "AS SELECT * FROM t1", + "Incompatible types for sink column 'b' at position 2. " + + "The source column has type 'STRING', while the target column has type 'INT'.")); + } + + @Test + void testCreateOrAlterMaterializedTableForExistingTableNoChanges() { + final String sql = + "CREATE OR ALTER MATERIALIZED TABLE mt (\n" + + " CONSTRAINT ct1 PRIMARY KEY(a) NOT ENFORCED" + + ")\n" + + "COMMENT 'materialized table comment'\n" + + "PARTITIONED BY (a, d)\n" + + "WITH (\n" + + " 'connector' = 'filesystem', \n" + + " 'format' = 'json'\n" + + ")\n" + + "FRESHNESS = INTERVAL '30' SECOND\n" + + "REFRESH_MODE = FULL\n" + + "AS SELECT * FROM t1"; + Operation operation = parse(sql); + + assertThat(operation).isInstanceOf(FullAlterMaterializedTableOperation.class); + + FullAlterMaterializedTableOperation op = (FullAlterMaterializedTableOperation) operation; + assertThat(op.getTableChanges()).isEmpty(); + assertThat(operation.asSummaryString()) + .isEqualTo("CREATE OR ALTER MATERIALIZED TABLE builtin.default.mt\n"); + } + + @Test + void testCreateOrAlterMaterializedTableForExistingTable() throws TableNotExistException { + final String sql = + "CREATE OR ALTER MATERIALIZED TABLE mt (\n" + + " CONSTRAINT ct1 PRIMARY KEY(a) NOT ENFORCED" + + ")\n" + + "COMMENT 'materialized table comment'\n" + + "DISTRIBUTED BY HASH (b) INTO 7 BUCKETS\n" + + "PARTITIONED BY (a, d)\n" + + "WITH (\n" + + " 'format' = 'json2'\n" + + ")\n" + + "FRESHNESS = INTERVAL '30' SECOND\n" + + "REFRESH_MODE = FULL\n" + + "AS SELECT a, b, c, d, d as e, cast('123' as string) as f FROM t1"; + Operation operation = parse(sql); + + assertThat(operation).isInstanceOf(FullAlterMaterializedTableOperation.class); + + FullAlterMaterializedTableOperation op = (FullAlterMaterializedTableOperation) operation; + assertThat(op.getTableChanges()) + .containsExactly( + TableChange.add(Column.physical("e", DataTypes.VARCHAR(Integer.MAX_VALUE))), + TableChange.add(Column.physical("f", DataTypes.VARCHAR(Integer.MAX_VALUE))), + TableChange.modifyDefinitionQuery( + "SELECT `a`, `b`, `c`, `d`, `d` AS `e`, CAST('123' AS STRING) AS `f`\nFROM `t1`", + "SELECT `t1`.`a`, `t1`.`b`, `t1`.`c`, `t1`.`d`, `t1`.`d` AS `e`, CAST('123' AS STRING) AS `f`\n" + + "FROM `builtin`.`default`.`t1` AS `t1`"), + TableChange.set("format", "json2"), + TableChange.reset("connector"), + TableChange.add( + TableDistribution.of( + TableDistribution.Kind.HASH, 7, List.of("b")))); + assertThat(operation.asSummaryString()) + .isEqualTo( + "CREATE OR ALTER MATERIALIZED TABLE builtin.default.mt\n" + + " ADD `e` STRING ,\n" + + " ADD `f` STRING ,\n" + + " MODIFY DEFINITION QUERY TO 'SELECT `t1`.`a`, `t1`.`b`, `t1`.`c`, `t1`.`d`, `t1`.`d` AS `e`, CAST('123' AS STRING) AS `f`\n" + + "FROM `builtin`.`default`.`t1` AS `t1`',\n" + + " SET 'format' = 'json2',\n" + + " RESET 'connector',\n" + + " ADD DISTRIBUTED BY HASH(`b`) INTO 7 BUCKETS"); + + // new table only difference schema & definition query with old table. + CatalogMaterializedTable oldTable = + (CatalogMaterializedTable) + catalog.getTable(new ObjectPath(catalogManager.getCurrentDatabase(), "mt")); + CatalogMaterializedTable newTable = op.getNewTable(); + + assertThat(newTable.getOptions()).containsExactly(Map.entry("format", "json2")); + assertThat(oldTable.getUnresolvedSchema()).isNotEqualTo(newTable.getUnresolvedSchema()); + assertThat(oldTable.getUnresolvedSchema().getPrimaryKey()) + .isEqualTo(newTable.getUnresolvedSchema().getPrimaryKey()); + assertThat(oldTable.getUnresolvedSchema().getWatermarkSpecs()) + .isEqualTo(newTable.getUnresolvedSchema().getWatermarkSpecs()); + assertThat(oldTable.getOriginalQuery()).isNotEqualTo(newTable.getOriginalQuery()); + assertThat(oldTable.getExpandedQuery()).isNotEqualTo(newTable.getExpandedQuery()); + assertThat(oldTable.getDefinitionFreshness()).isEqualTo(newTable.getDefinitionFreshness()); + assertThat(oldTable.getRefreshMode()).isEqualTo(newTable.getRefreshMode()); + assertThat(oldTable.getRefreshStatus()).isEqualTo(newTable.getRefreshStatus()); + assertThat(oldTable.getSerializedRefreshHandler()) + .isEqualTo(newTable.getSerializedRefreshHandler()); + + List addedColumn = + newTable.getUnresolvedSchema().getColumns().stream() + .filter( + column -> + !oldTable.getUnresolvedSchema() + .getColumns() + .contains(column)) + .collect(Collectors.toList()); + // added column should be a nullable column. + assertThat(addedColumn) + .containsExactly( + new Schema.UnresolvedPhysicalColumn( + "e", DataTypes.VARCHAR(Integer.MAX_VALUE)), + new Schema.UnresolvedPhysicalColumn( + "f", DataTypes.VARCHAR(Integer.MAX_VALUE))); + } + + @Test + void testCreateOrAlterMaterializedTableWithDistributionForExistingTable() + throws TableAlreadyExistException, DatabaseNotExistException { + final String prepSql = + "CREATE MATERIALIZED TABLE mt2\n" + + "DISTRIBUTED BY HASH (a) INTO 5 BUCKETS\n" + + "AS SELECT t1.* FROM t1"; + createMaterializedTableInCatalog(prepSql, "mt2"); + + final String sql = + "CREATE OR ALTER MATERIALIZED TABLE mt2\n" + + "DISTRIBUTED BY HASH (b) INTO 4 BUCKETS\n" + + "AS SELECT t1.* FROM t1"; + Operation operation = parse(sql); + + assertThat(operation).isInstanceOf(FullAlterMaterializedTableOperation.class); + + FullAlterMaterializedTableOperation op = (FullAlterMaterializedTableOperation) operation; + assertThat(op.getTableChanges()) + .containsExactly( + TableChange.modify( + TableDistribution.of( + TableDistribution.Kind.HASH, 4, List.of("b")))); + assertThat(operation.asSummaryString()) + .isEqualTo( + "CREATE OR ALTER MATERIALIZED TABLE builtin.default.mt2\n" + + " MODIFY DISTRIBUTED BY HASH(`b`) INTO 4 BUCKETS"); + } + + @Test + void testCreateOrAlterMaterializedTableWithDroppedConstraint() { + final String sql = + "CREATE OR ALTER MATERIALIZED TABLE mt \n" + + "COMMENT 'New materialized table comment'\n" + + "PARTITIONED BY (a, d)\n" + + "WITH (\n" + + " 'connector' = 'filesystem', \n" + + " 'format' = 'json'\n" + + ")\n" + + "REFRESH_MODE = FULL\n" + + "AS SELECT * FROM t1"; + Operation operation = parse(sql); + + assertThat(operation).isInstanceOf(FullAlterMaterializedTableOperation.class); + + FullAlterMaterializedTableOperation op = (FullAlterMaterializedTableOperation) operation; + assertThat(op.getTableChanges()).containsExactly(TableChange.dropConstraint("ct1")); + assertThat(operation.asSummaryString()) + .isEqualTo( + "CREATE OR ALTER MATERIALIZED TABLE builtin.default.mt\n" + + " DROP CONSTRAINT ct1"); + } + + @Test + void testCreateOrAlterMaterializedTableWithChangedConstraint() { + final String sql = + "CREATE OR ALTER MATERIALIZED TABLE mt (\n" + + " CONSTRAINT new_constraint PRIMARY KEY(a) NOT ENFORCED" + + ")\n" + + "COMMENT 'New materialized table comment'\n" + + "PARTITIONED BY (a, d)\n" + + "WITH (\n" + + " 'connector' = 'filesystem', \n" + + " 'format' = 'json'\n" + + ")\n" + + "REFRESH_MODE = FULL\n" + + "AS SELECT * FROM t1"; + Operation operation = parse(sql); + + assertThat(operation).isInstanceOf(FullAlterMaterializedTableOperation.class); + + FullAlterMaterializedTableOperation op = (FullAlterMaterializedTableOperation) operation; + assertThat(op.getTableChanges()) + .containsExactly( + TableChange.modify( + UniqueConstraint.primaryKey("new_constraint", List.of("a")))); + assertThat(operation.asSummaryString()) + .isEqualTo( + "CREATE OR ALTER MATERIALIZED TABLE builtin.default.mt\n" + + " MODIFY CONSTRAINT `new_constraint` PRIMARY KEY (`a`) NOT ENFORCED"); + } + + @Test + void testCreateOrAlterMaterializedTableWithNewConstraint() + throws TableAlreadyExistException, DatabaseNotExistException { + final String prepSql = + "CREATE MATERIALIZED TABLE mt1 (\n" + + " id INT NOT NULL, t TIMESTAMP_LTZ(3)\n" + + ")\n" + + "AS SELECT 1 as id, current_timestamp as t"; + createMaterializedTableInCatalog(prepSql, "mt1"); + + final String sql = + "CREATE OR ALTER MATERIALIZED TABLE mt1 (\n" + + " id INT NOT NULL, t TIMESTAMP_LTZ(3),\n" + + " CONSTRAINT new_constraint PRIMARY KEY(id) NOT ENFORCED" + + ")\n" + + "AS SELECT 1 as id, current_timestamp as t"; + Operation operation = parse(sql); + + assertThat(operation).isInstanceOf(FullAlterMaterializedTableOperation.class); + + FullAlterMaterializedTableOperation op = (FullAlterMaterializedTableOperation) operation; + assertThat(op.getTableChanges()) + .containsExactly( + TableChange.add( + UniqueConstraint.primaryKey("new_constraint", List.of("id")))); + assertThat(operation.asSummaryString()) + .isEqualTo( + "CREATE OR ALTER MATERIALIZED TABLE builtin.default.mt1\n" + + " ADD CONSTRAINT `new_constraint` PRIMARY KEY (`id`) NOT ENFORCED"); + } + + @Test + void testCreateOrAlterMaterializedTableWithAddedWatermark() + throws TableAlreadyExistException, DatabaseNotExistException { + final String prepSql = + "CREATE MATERIALIZED TABLE mt1 (\n" + + " id INT, t TIMESTAMP_LTZ(3)" + + ")\n" + + "AS SELECT 1 as id, current_timestamp as t"; + createMaterializedTableInCatalog(prepSql, "mt1"); + + final String sql = + "CREATE OR ALTER MATERIALIZED TABLE mt1 (\n" + + " id INT, t TIMESTAMP_LTZ(3),\n" + + " WATERMARK FOR t as current_timestamp - INTERVAL '5' SECOND" + + ")\n" + + "AS SELECT 1 as id, current_timestamp as t"; + Operation operation = parse(sql); + + assertThat(operation).isInstanceOf(FullAlterMaterializedTableOperation.class); + + assertThat(operation.asSummaryString()) + .isEqualTo( + "CREATE OR ALTER MATERIALIZED TABLE builtin.default.mt1\n" + + " ADD WATERMARK FOR `t`: TIMESTAMP_LTZ(3) NOT NULL AS CURRENT_TIMESTAMP - INTERVAL '5' SECOND"); + } + + @Test + void testCreateOrAlterMaterializedTableWithModifiedWatermark() + throws TableAlreadyExistException, DatabaseNotExistException { + final String prepSql = + "CREATE MATERIALIZED TABLE mt1 (\n" + + " id INT, t TIMESTAMP_LTZ(3),\n" + + " WATERMARK FOR t as current_timestamp - INTERVAL '5' SECOND\n" + + ")\n" + + "AS SELECT 1 as id, current_timestamp as t"; + createMaterializedTableInCatalog(prepSql, "mt1"); + + final String sql = + "CREATE OR ALTER MATERIALIZED TABLE mt1 (\n" + + " id INT, t TIMESTAMP_LTZ(3),\n" + + " WATERMARK FOR t as current_timestamp - INTERVAL '12' HOUR\n" + + ")\n" + + "AS SELECT 1 as id, current_timestamp as t"; + Operation operation = parse(sql); + + assertThat(operation).isInstanceOf(FullAlterMaterializedTableOperation.class); + + assertThat(operation.asSummaryString()) + .isEqualTo( + "CREATE OR ALTER MATERIALIZED TABLE builtin.default.mt1\n" + + " MODIFY WATERMARK FOR `t`: TIMESTAMP_LTZ(3) NOT NULL AS CURRENT_TIMESTAMP - INTERVAL '12' HOUR"); + } + + @Test + void testCreateOrAlterMaterializedTableWithDroppedWatermark() + throws TableAlreadyExistException, DatabaseNotExistException { + final String prepSql = + "CREATE MATERIALIZED TABLE mt1 (\n" + + " id INT, t TIMESTAMP_LTZ(3),\n" + + " WATERMARK FOR t as current_timestamp - INTERVAL '5' SECOND" + + ")\n" + + "AS SELECT 1 as id, current_timestamp as t"; + createMaterializedTableInCatalog(prepSql, "mt1"); + + final String sql = + "CREATE OR ALTER MATERIALIZED TABLE mt1 (\n" + + " id INT, t TIMESTAMP_LTZ(3)" + + ")\n" + + "AS SELECT 1 as id, current_timestamp as t"; + Operation operation = parse(sql); + + assertThat(operation).isInstanceOf(FullAlterMaterializedTableOperation.class); + + assertThat(operation.asSummaryString()) + .isEqualTo( + "CREATE OR ALTER MATERIALIZED TABLE builtin.default.mt1\n" + + " DROP WATERMARK"); + } + + @Test + void testCreateOrAlterMaterializedTableWithDroppedWatermarkByNoExplicitSchema() + throws TableAlreadyExistException, DatabaseNotExistException { + final String prepSql = + "CREATE MATERIALIZED TABLE mt1 (\n" + + " id INT, t TIMESTAMP_LTZ(3),\n" + + " WATERMARK FOR t as current_timestamp - INTERVAL '5' SECOND" + + ")\n" + + "AS SELECT 1 as id, current_timestamp as t"; + createMaterializedTableInCatalog(prepSql, "mt1"); + + final String sql = + "CREATE OR ALTER MATERIALIZED TABLE mt1\n" + + "AS SELECT 1 as id, current_timestamp as t"; + Operation operation = parse(sql); + + assertThat(operation).isInstanceOf(FullAlterMaterializedTableOperation.class); + + assertThat(operation.asSummaryString()) + .isEqualTo( + "CREATE OR ALTER MATERIALIZED TABLE builtin.default.mt1\n" + + " DROP WATERMARK"); + } + + @Test + void testCreateOrAlterMaterializedTableWithCommentChange() + throws TableAlreadyExistException, DatabaseNotExistException { + final String prepSql = + "CREATE MATERIALIZED TABLE mt1 (\n" + + " id INT COMMENT 'INT comment', t TIMESTAMP_LTZ(3)\n" + + ")\n" + + "AS SELECT 1 as id, current_timestamp as t"; + createMaterializedTableInCatalog(prepSql, "mt1"); + + final String sql = + "CREATE OR ALTER MATERIALIZED TABLE mt1 (\n" + + " id INT, t TIMESTAMP_LTZ(3) COMMENT 'Timestamp Comment'\n" + + ")\n" + + "AS SELECT 1 as id, current_timestamp as t"; + Operation operation = parse(sql); + + assertThat(operation).isInstanceOf(FullAlterMaterializedTableOperation.class); + + FullAlterMaterializedTableOperation op = (FullAlterMaterializedTableOperation) operation; + assertThat(op.getTableChanges()) + .containsExactly( + TableChange.modifyColumnComment( + Column.physical("id", DataTypes.INT()).withComment("INT comment"), + null), + TableChange.modifyColumnComment( + Column.physical("t", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)), + "Timestamp Comment")); + assertThat(operation.asSummaryString()) + .isEqualTo( + "CREATE OR ALTER MATERIALIZED TABLE builtin.default.mt1\n" + + " MODIFY `id` COMMENT '',\n" + + " MODIFY `t` COMMENT 'Timestamp Comment'"); + } + + private void createMaterializedTableInCatalog(String sql, String materializedTableName) + throws TableAlreadyExistException, DatabaseNotExistException { + final ObjectPath objectPath = + new ObjectPath(catalogManager.getCurrentDatabase(), materializedTableName); + final CreateMaterializedTableOperation operation = createMaterializedTableOperation(sql); + catalog.createTable(objectPath, operation.getCatalogMaterializedTable(), true); + } + + private CreateMaterializedTableOperation createMaterializedTableOperation(String sql) { + final Operation operation = parse(sql); + assertThat(operation).isInstanceOf(CreateMaterializedTableOperation.class); + return (CreateMaterializedTableOperation) operation; + } + + private static class TestSpec { + private final String sql; + private final Class expectedException; + private final String errMessage; + private final String expectedSchema; + + private TestSpec(String sql, Class expectedException, String errMessage) { + this.sql = sql; + this.expectedException = expectedException; + this.errMessage = errMessage; + this.expectedSchema = null; + } + + private TestSpec(String sql, String expectedSchema) { + this.sql = sql; + this.expectedException = null; + this.errMessage = null; + this.expectedSchema = expectedSchema; + } + + public static TestSpec of(String sql, Class expectedException, String errMessage) { + return new TestSpec(sql, expectedException, errMessage); + } + + public static TestSpec of(String sql, String errMessage) { + return of(sql, ValidationException.class, errMessage); + } + + public static TestSpec withExpectedSchema(String sql, String expectedSchema) { + return new TestSpec(sql, expectedSchema); + } + + @Override + public String toString() { + return sql; + } + } +}