diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index e9d66c7a38c1..f30812b7bb45 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -27,16 +27,16 @@ -
aggregation.remove-record-on-delete
+
add-column-before-partition
false Boolean - Whether to remove the whole row in aggregation engine when -D records are received. + If true, when adding a new column without specifying a position, the column will be placed before the first partition column instead of at the end of the schema. This only takes effect for partitioned tables. -
add-column-before-partition
+
aggregation.remove-record-on-delete
false Boolean - If true, when adding a new column without specifying a position, the column will be placed before the first partition column instead of at the end of the schema. This only takes effect for partitioned tables. + Whether to remove the whole row in aggregation engine when -D records are received.
alter-column-null-to-not-null.disabled
@@ -1043,6 +1043,12 @@

Enum

This is only for partitioned append table or postpone pk table, and the purpose is to reduce small files and improve write performance. Through this repartitioning strategy to reduce the number of partitions written by each task to as few as possible.

Possible values: + +
partition.timestamp-format.strict
+ false + Boolean + When enabled, if a partition value does not match the 'partition.timestamp-formatter' or 'partition.timestamp-pattern' configuration, an error will be thrown during writing. This helps prevent dirty partition directories caused by incorrectly specified partition fields. +
partition.timestamp-formatter
(none) diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index d85cf9ae3757..f0015d60fa40 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -1189,6 +1189,16 @@ public InlineElement getDescription() { + "$hour:00:00'.")) .build()); + public static final ConfigOption PARTITION_TIMESTAMP_FORMAT_STRICT = + key("partition.timestamp-format.strict") + .booleanType() + .defaultValue(false) + .withDescription( + "When enabled, if a partition value does not match the " + + "'partition.timestamp-formatter' or 'partition.timestamp-pattern' configuration, " + + "an error will be thrown during writing. " + + "This helps prevent dirty partition directories caused by incorrectly specified partition fields."); + public static final ConfigOption PARTITION_MARK_DONE_WHEN_END_INPUT = ConfigOptions.key("partition.end-input-to-done") .booleanType() @@ -3241,6 +3251,10 @@ public String partitionTimestampPattern() { return options.get(PARTITION_TIMESTAMP_PATTERN); } + public boolean partitionTimestampFormatStrict() { + return options.get(PARTITION_TIMESTAMP_FORMAT_STRICT); + } + public String httpReportMarkDoneActionUrl() { return options.get(PARTITION_MARK_DONE_ACTION_URL); } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java index f0e836a9ac75..a49fff06d0ce 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java @@ -35,12 +35,14 @@ import org.apache.paimon.memory.MemoryPoolFactory; import org.apache.paimon.metrics.MetricRegistry; import org.apache.paimon.operation.metrics.CompactionMetrics; +import org.apache.paimon.partition.PartitionTimeExtractor; import org.apache.paimon.table.sink.CommitMessage; import org.apache.paimon.table.sink.CommitMessageImpl; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.CommitIncrement; import org.apache.paimon.utils.ExecutorThreadFactory; import org.apache.paimon.utils.RecordWriter; +import org.apache.paimon.utils.RowDataToObjectArrayConverter; import org.apache.paimon.utils.SnapshotManager; import org.slf4j.Logger; @@ -48,7 +50,9 @@ import javax.annotation.Nullable; +import java.time.format.DateTimeParseException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.Iterator; @@ -57,6 +61,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import static org.apache.paimon.CoreOptions.PARTITION_DEFAULT_NAME; import static org.apache.paimon.io.DataFileMeta.getMaxSequenceNumber; @@ -78,6 +84,8 @@ public abstract class AbstractFileStoreWrite implements FileStoreWrite { private final int numBuckets; private final RowType partitionType; + @Nullable private final PartitionTimestampValidator partitionTimestampValidator; + @Nullable protected IOManager ioManager; protected final Map>> writers; @@ -115,6 +123,8 @@ protected AbstractFileStoreWrite( this.tableName = tableName; this.writerNumberMax = options.writeMaxWritersToSpill(); this.legacyPartitionName = options.legacyPartitionName(); + this.partitionTimestampValidator = + PartitionTimestampValidator.create(options, partitionType); } @Override @@ -423,6 +433,10 @@ public WriterContainer createWriterContainer(BinaryRow partition, int bucket) LOG.debug("Creating writer for partition {}, bucket {}", partition, bucket); } + if (partitionTimestampValidator != null) { + partitionTimestampValidator.validate(partition); + } + if (writerNumber() >= writerNumberMax) { try { forceBufferSpill(); @@ -579,4 +593,58 @@ public Map>> writers() { public CompactionMetrics compactionMetrics() { return compactionMetrics; } + + private static class PartitionTimestampValidator { + + private final PartitionTimeExtractor timeExtractor; + private final RowDataToObjectArrayConverter partitionConverter; + private final List partitionKeys; + + private PartitionTimestampValidator( + PartitionTimeExtractor timeExtractor, + RowDataToObjectArrayConverter partitionConverter, + List partitionKeys) { + this.timeExtractor = timeExtractor; + this.partitionConverter = partitionConverter; + this.partitionKeys = partitionKeys; + } + + @Nullable + private static PartitionTimestampValidator create( + CoreOptions options, RowType partitionType) { + if (!options.partitionTimestampFormatStrict()) { + return null; + } + String timeFormatter = options.partitionTimestampFormatter(); + String timePattern = options.partitionTimestampPattern(); + if ((timeFormatter != null || timePattern != null) + && partitionType.getFieldCount() > 0) { + return new PartitionTimestampValidator( + new PartitionTimeExtractor(timePattern, timeFormatter), + new RowDataToObjectArrayConverter(partitionType), + partitionType.getFieldNames()); + } + return null; + } + + private void validate(BinaryRow partition) { + Object[] array = partitionConverter.convert(partition); + try { + timeExtractor.extract(partitionKeys, Arrays.asList(array)); + } catch (DateTimeParseException e) { + String partitionInfo = + IntStream.range(0, partitionKeys.size()) + .mapToObj(i -> partitionKeys.get(i) + "=" + array[i]) + .collect(Collectors.joining(", ")); + throw new IllegalArgumentException( + String.format( + "Partition %s does not match the 'partition.timestamp-formatter' or " + + "'partition.timestamp-pattern' configuration. " + + "This might be caused by an incorrectly specified partition field. " + + "Please check your partition configuration.", + partitionInfo), + e); + } + } + } } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableWriteTest.java index e49e802e0b4b..659c010d529a 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableWriteTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableWriteTest.java @@ -62,6 +62,7 @@ import java.util.function.Predicate; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Tests for {@link TableWriteImpl}. */ public class TableWriteTest { @@ -297,6 +298,24 @@ public void testWaitAllSnapshotsOfSpecificIdentifier() throws Exception { commit.close(); } + @Test + public void testPartitionTimestampValidation() throws Exception { + Options conf = new Options(); + conf.set(CoreOptions.BUCKET, 1); + conf.set(CoreOptions.PARTITION_TIMESTAMP_FORMATTER, "yyyyMMdd"); + conf.set(CoreOptions.PARTITION_TIMESTAMP_FORMAT_STRICT, true); + FileStoreTable table = createFileStoreTable(conf); + + TableWriteImpl write = table.newWrite(commitUser); + // valid partition value: 20260316 matches yyyyMMdd + write.write(GenericRow.of(20260316, 1, 1L)); + // invalid partition value: -100 does not match yyyyMMdd + assertThatThrownBy(() -> write.write(GenericRow.of(-100, 1, 1L))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("partition.timestamp-formatter"); + write.close(); + } + private BinaryRow partition(int x) { BinaryRow partition = new BinaryRow(1); BinaryRowWriter writer = new BinaryRowWriter(partition);