Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,9 @@ public static String toString(TableChange tableChange) {
return String.format(
" MODIFY %s COMMENT '%s'",
EncodingUtils.escapeIdentifier(modifyColumnComment.getNewColumn().getName()),
modifyColumnComment.getNewComment());
modifyColumnComment.getNewComment() == null
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2-3 commits before, we already had some changes to the comment logic. Can you double-check if the commits are correctly cut?

? ""
: modifyColumnComment.getNewComment());
Comment on lines +100 to +102
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we avoid MODIFY COMMENT if the comment is null, instead of returning:

MODIFY 'id' COMMENT ''

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, this type of modification means the comment is removed,
since there is no dedicated table change for that

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it thanks!

} else if (tableChange instanceof TableChange.ModifyPhysicalColumnType) {
TableChange.ModifyPhysicalColumnType modifyPhysicalColumnType =
(TableChange.ModifyPhysicalColumnType) tableChange;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,15 @@
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.CatalogBaseTable.TableKind;
import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.SchemaResolver;
import org.apache.flink.table.catalog.TableChange;
import org.apache.flink.table.catalog.TableDistribution;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.catalog.WatermarkSpec;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.materializedtable.CreateMaterializedTableOperation;
import org.apache.flink.table.operations.materializedtable.FullAlterMaterializedTableOperation;
Expand Down Expand Up @@ -123,55 +124,116 @@ private Operation handleCreate(
private Function<ResolvedCatalogMaterializedTable, List<TableChange>> buildTableChanges(
final MergeContext mergeContext, final SchemaResolver schemaResolver) {
return oldTable -> {
final List<TableChange> changes = new ArrayList<>();
final List<TableChange> changes =
getSchemaTableChanges(mergeContext, schemaResolver, oldTable);

final ResolvedSchema oldSchema = oldTable.getResolvedSchema();
final List<Column> newColumns =
MaterializedTableUtils.validateAndExtractNewColumns(
oldSchema,
schemaResolver.resolve(mergeContext.getMergedSchema()),
mergeContext.hasSchemaDefinition());
changes.addAll(getQueryTableChanges(mergeContext, oldTable));
changes.addAll(getOptionsTableChanges(mergeContext, oldTable));
changes.addAll(getDistributionTableChanges(mergeContext, oldTable));

newColumns.forEach(column -> changes.add(TableChange.add(column)));
changes.add(
TableChange.modifyDefinitionQuery(
mergeContext.getMergedOriginalQuery(),
mergeContext.getMergedExpandedQuery()));
final RefreshMode oldRefreshMode = oldTable.getRefreshMode();
final RefreshMode newRefreshMode = mergeContext.getMergedRefreshMode();
if (oldRefreshMode != newRefreshMode && newRefreshMode != null) {
throw new ValidationException("Changing of REFRESH MODE is unsupported");
}

return changes;
};
}

final Map<String, String> oldOptions = oldTable.getOptions();
final Map<String, String> newOptions = mergeContext.getMergedTableOptions();
private List<TableChange> getDistributionTableChanges(
        final MergeContext mergeContext, final ResolvedCatalogMaterializedTable oldTable) {
    // Compares the table's current distribution with the merged one and emits the
    // matching ADD / DROP / MODIFY distribution change; empty list when unchanged.
    final TableDistribution previous = oldTable.getDistribution().orElse(null);
    final TableDistribution merged =
            mergeContext.getMergedTableDistribution().orElse(null);
    if (Objects.equals(previous, merged)) {
        // Nothing changed — no TableChange to report.
        return List.of();
    }
    if (previous == null) {
        // A distribution is being introduced for the first time.
        return List.of(TableChange.add(merged));
    }
    if (merged == null) {
        // The existing distribution is being removed.
        return List.of(TableChange.dropDistribution());
    }
    // Both sides present but different — modify in place.
    return List.of(TableChange.modify(merged));
}

private List<TableChange> getOptionsTableChanges(
final MergeContext mergeContext, final ResolvedCatalogMaterializedTable oldTable) {
final List<TableChange> changes = new ArrayList<>();
final Map<String, String> oldOptions = oldTable.getOptions();
final Map<String, String> newOptions = mergeContext.getMergedTableOptions();

for (Map.Entry<String, String> newOptionEntry : newOptions.entrySet()) {
for (Map.Entry<String, String> newOptionEntry : newOptions.entrySet()) {
if (!newOptionEntry.getValue().equals(oldOptions.get(newOptionEntry.getKey()))) {
changes.add(TableChange.set(newOptionEntry.getKey(), newOptionEntry.getValue()));
}
}

for (Map.Entry<String, String> oldOptionEntry : oldOptions.entrySet()) {
if (newOptions.get(oldOptionEntry.getKey()) == null) {
changes.add(TableChange.reset(oldOptionEntry.getKey()));
}
for (Map.Entry<String, String> oldOptionEntry : oldOptions.entrySet()) {
if (newOptions.get(oldOptionEntry.getKey()) == null) {
changes.add(TableChange.reset(oldOptionEntry.getKey()));
}
}
return changes;
}

final RefreshMode oldRefreshMode = oldTable.getRefreshMode();
final RefreshMode newRefreshMode = mergeContext.getMergedRefreshMode();
if (oldRefreshMode != newRefreshMode && newRefreshMode != null) {
throw new ValidationException("Changing of REFRESH MODE is unsupported");
private List<TableChange> getQueryTableChanges(
        final MergeContext mergeContext, final ResolvedCatalogMaterializedTable oldTable) {
    // Emits a single ModifyDefinitionQuery change when either the original or the
    // expanded query text differs from the merged definition; empty list otherwise.
    final boolean sameOriginal =
            oldTable.getOriginalQuery().equals(mergeContext.getMergedOriginalQuery());
    final boolean sameExpanded =
            oldTable.getExpandedQuery().equals(mergeContext.getMergedExpandedQuery());
    if (sameOriginal && sameExpanded) {
        return List.of();
    }
    return List.of(
            TableChange.modifyDefinitionQuery(
                    mergeContext.getMergedOriginalQuery(),
                    mergeContext.getMergedExpandedQuery()));
}

private List<TableChange> getSchemaTableChanges(
final MergeContext mergeContext,
final SchemaResolver schemaResolver,
final ResolvedCatalogMaterializedTable oldTable) {
final ResolvedSchema oldSchema = oldTable.getResolvedSchema();
final ResolvedSchema newSchema = schemaResolver.resolve(mergeContext.getMergedSchema());
final List<TableChange> changes =
new ArrayList<>(
MaterializedTableUtils.validateAndExtractColumnChanges(
oldSchema, newSchema, mergeContext.hasSchemaDefinition()));

final UniqueConstraint oldConstraint = oldSchema.getPrimaryKey().orElse(null);
final UniqueConstraint newConstraint = newSchema.getPrimaryKey().orElse(null);
if (!Objects.equals(oldConstraint, newConstraint)) {
if (newConstraint == null) {
changes.add(TableChange.dropConstraint(oldConstraint.getName()));
} else if (oldConstraint == null) {
changes.add(TableChange.add(newConstraint));
} else {
changes.add(TableChange.modify(newConstraint));
}
}

final TableDistribution oldDistribution = oldTable.getDistribution().orElse(null);
final TableDistribution newDistribution =
mergeContext.getMergedTableDistribution().orElse(null);
if (!Objects.equals(oldDistribution, newDistribution)) {
if (oldDistribution == null) {
changes.add(TableChange.add(newDistribution));
} else if (newDistribution == null) {
changes.add(TableChange.dropDistribution());
} else {
changes.add(TableChange.modify(newDistribution));
}
final WatermarkSpec oldWatermarkSpec =
oldSchema.getWatermarkSpecs().isEmpty()
? null
: oldSchema.getWatermarkSpecs().get(0);
final WatermarkSpec newWatermarkSpec =
newSchema.getWatermarkSpecs().isEmpty()
? null
: newSchema.getWatermarkSpecs().get(0);
if (!Objects.equals(oldWatermarkSpec, newWatermarkSpec)) {
if (newWatermarkSpec == null) {
changes.add(TableChange.dropWatermark());
} else if (oldWatermarkSpec == null) {
changes.add(TableChange.add(newWatermarkSpec));
} else {
changes.add(TableChange.modify(newWatermarkSpec));
}
}

return changes;
};
return changes;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.flink.table.catalog.TableChange.ColumnPosition;
import org.apache.flink.table.planner.operations.PlannerQueryOperation;
import org.apache.flink.table.planner.operations.converters.SqlNodeConverter.ConvertContext;
import org.apache.flink.table.types.logical.utils.LogicalTypeCasts;

import org.apache.calcite.sql.SqlIntervalLiteral;
import org.apache.calcite.sql.SqlIntervalLiteral.IntervalValue;
Expand Down Expand Up @@ -278,7 +279,7 @@ private static void applyMetadataColumnChanges(
}
}

public static List<Column> validateAndExtractNewColumns(
public static List<TableChange> validateAndExtractColumnChanges(
ResolvedSchema oldSchema, ResolvedSchema newSchema, boolean schemaDefinedInQuery) {
final List<Column> newColumns = getPersistedColumns(newSchema);
final List<Column> oldColumns = getPersistedColumns(oldSchema);
Expand All @@ -294,29 +295,43 @@ public static List<Column> validateAndExtractNewColumns(
originalColumnSize, newColumnSize));
}

final List<TableChange> columnChanges = new ArrayList<>();
for (int i = 0; i < oldColumns.size(); i++) {
Column oldColumn = oldColumns.get(i);
Column newColumn = newColumns.get(i);
Column newColumn =
schemaDefinedInQuery
? newColumns.get(i)
: newColumns.get(i).copy(newColumns.get(i).getDataType().nullable());
if (!oldColumn.equals(newColumn)) {
throw new ValidationException(
String.format(
"When modifying the query of a materialized table, "
+ "currently only support appending columns at the end of original schema, dropping, renaming, and reordering columns are not supported.\n"
+ "Column mismatch at position %d: Original column is [%s], but new column is [%s].",
i, oldColumn, newColumn));
if (!oldColumn.getName().equals(newColumn.getName())
|| !LogicalTypeCasts.supportsImplicitCast(
oldColumn.getDataType().getLogicalType(),
newColumn.getDataType().getLogicalType())) {
Comment on lines +306 to +309
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens here if we change the type of the column? From CHAR(1) -> CHAR(4)

throw new ValidationException(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we move the exception also to the operation and just issue a TableChange.ModifyPhysicalColumnType?

String.format(
"When modifying the query of a materialized table, "
+ "currently only support appending columns at the end of original schema, dropping, renaming, and reordering columns are not supported.\n"
+ "Column mismatch at position %d: Original column is [%s], but new column is [%s].",
i + 1, oldColumn, newColumn));
}
if (!Objects.equals(oldColumn.getComment(), newColumn.getComment())) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we often treat comment = '' == comment = null, shall we normalize here? Wrap comments in some StringUtil.emptyIfNull.

columnChanges.add(
TableChange.modifyColumnComment(
oldColumn, newColumn.getComment().orElse(null)));
}
}
}

final List<Column> newAddedColumns = new ArrayList<>();
for (int i = oldColumns.size(); i < newColumns.size(); i++) {
Column newColumn = newColumns.get(i);
newAddedColumns.add(
schemaDefinedInQuery
? newColumn
: newColumn.copy(newColumn.getDataType().nullable()));
columnChanges.add(
TableChange.add(
schemaDefinedInQuery
? newColumn
: newColumn.copy(newColumn.getDataType().nullable())));
}

return newAddedColumns;
return columnChanges;
}

public static ResolvedSchema getQueryOperationResolvedSchema(
Expand Down
Loading