Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,16 @@
</thead>
<tbody>
<tr>
<td><h5>aggregation.remove-record-on-delete</h5></td>
<td><h5>add-column-before-partition</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to remove the whole row in aggregation engine when -D records are received.</td>
<td>If true, when adding a new column without specifying a position, the column will be placed before the first partition column instead of at the end of the schema. This only takes effect for partitioned tables.</td>
</tr>
<tr>
<td><h5>add-column-before-partition</h5></td>
<td><h5>aggregation.remove-record-on-delete</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>If true, when adding a new column without specifying a position, the column will be placed before the first partition column instead of at the end of the schema. This only takes effect for partitioned tables.</td>
<td>Whether to remove the whole row in aggregation engine when -D records are received.</td>
</tr>
<tr>
<td><h5>alter-column-null-to-not-null.disabled</h5></td>
Expand Down Expand Up @@ -1043,6 +1043,12 @@
<td><p>Enum</p></td>
<td>This is only for partitioned append table or postpone pk table, and the purpose is to reduce small files and improve write performance. This repartitioning strategy reduces the number of partitions written by each task to as few as possible.<ul><li>none: Rebalanced or Forward partitioning, this is the default behavior. This strategy is suitable when the number of partitions written in a batch is much smaller than the write parallelism.</li><li>hash: Hash the partition value. This strategy is suitable when the number of partitions written in a batch is greater than or equal to the write parallelism.</li></ul><br /><br />Possible values:<ul><li>"NONE"</li><li>"HASH"</li></ul></td>
</tr>
<tr>
<td><h5>partition.timestamp-format.strict</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>When enabled, if a partition value does not match the 'partition.timestamp-formatter' or 'partition.timestamp-pattern' configuration, an error will be thrown during writing. This helps prevent dirty partition directories caused by incorrectly specified partition fields.</td>
</tr>
<tr>
<td><h5>partition.timestamp-formatter</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
14 changes: 14 additions & 0 deletions paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -1189,6 +1189,16 @@ public InlineElement getDescription() {
+ "$hour:00:00'."))
.build());

    /**
     * Whether to strictly validate partition values against the configured timestamp
     * formatter/pattern. Defaults to {@code false}. When enabled, a partition value that cannot be
     * parsed with 'partition.timestamp-formatter' or 'partition.timestamp-pattern' causes writing
     * to fail instead of silently creating a dirty partition directory.
     */
    public static final ConfigOption<Boolean> PARTITION_TIMESTAMP_FORMAT_STRICT =
            key("partition.timestamp-format.strict")
                    .booleanType()
                    .defaultValue(false)
                    .withDescription(
                            "When enabled, if a partition value does not match the "
                                    + "'partition.timestamp-formatter' or 'partition.timestamp-pattern' configuration, "
                                    + "an error will be thrown during writing. "
                                    + "This helps prevent dirty partition directories caused by incorrectly specified partition fields.");

public static final ConfigOption<Boolean> PARTITION_MARK_DONE_WHEN_END_INPUT =
ConfigOptions.key("partition.end-input-to-done")
.booleanType()
Expand Down Expand Up @@ -3241,6 +3251,10 @@ public String partitionTimestampPattern() {
return options.get(PARTITION_TIMESTAMP_PATTERN);
}

    /** Returns whether strict partition timestamp format validation is enabled. */
    public boolean partitionTimestampFormatStrict() {
        return options.get(PARTITION_TIMESTAMP_FORMAT_STRICT);
    }

public String httpReportMarkDoneActionUrl() {
return options.get(PARTITION_MARK_DONE_ACTION_URL);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,24 @@
import org.apache.paimon.memory.MemoryPoolFactory;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.operation.metrics.CompactionMetrics;
import org.apache.paimon.partition.PartitionTimeExtractor;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.CommitIncrement;
import org.apache.paimon.utils.ExecutorThreadFactory;
import org.apache.paimon.utils.RecordWriter;
import org.apache.paimon.utils.RowDataToObjectArrayConverter;
import org.apache.paimon.utils.SnapshotManager;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
Expand All @@ -57,6 +61,8 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.apache.paimon.CoreOptions.PARTITION_DEFAULT_NAME;
import static org.apache.paimon.io.DataFileMeta.getMaxSequenceNumber;
Expand All @@ -78,6 +84,8 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
private final int numBuckets;
private final RowType partitionType;

@Nullable private final PartitionTimestampValidator partitionTimestampValidator;

@Nullable protected IOManager ioManager;

protected final Map<BinaryRow, Map<Integer, WriterContainer<T>>> writers;
Expand Down Expand Up @@ -115,6 +123,8 @@ protected AbstractFileStoreWrite(
this.tableName = tableName;
this.writerNumberMax = options.writeMaxWritersToSpill();
this.legacyPartitionName = options.legacyPartitionName();
this.partitionTimestampValidator =
PartitionTimestampValidator.create(options, partitionType);
}

@Override
Expand Down Expand Up @@ -423,6 +433,10 @@ public WriterContainer<T> createWriterContainer(BinaryRow partition, int bucket)
LOG.debug("Creating writer for partition {}, bucket {}", partition, bucket);
}

if (partitionTimestampValidator != null) {
partitionTimestampValidator.validate(partition);
}

if (writerNumber() >= writerNumberMax) {
try {
forceBufferSpill();
Expand Down Expand Up @@ -579,4 +593,58 @@ public Map<BinaryRow, Map<Integer, WriterContainer<T>>> writers() {
public CompactionMetrics compactionMetrics() {
return compactionMetrics;
}

private static class PartitionTimestampValidator {

private final PartitionTimeExtractor timeExtractor;
private final RowDataToObjectArrayConverter partitionConverter;
private final List<String> partitionKeys;

private PartitionTimestampValidator(
PartitionTimeExtractor timeExtractor,
RowDataToObjectArrayConverter partitionConverter,
List<String> partitionKeys) {
this.timeExtractor = timeExtractor;
this.partitionConverter = partitionConverter;
this.partitionKeys = partitionKeys;
}

@Nullable
private static PartitionTimestampValidator create(
CoreOptions options, RowType partitionType) {
if (!options.partitionTimestampFormatStrict()) {
return null;
}
String timeFormatter = options.partitionTimestampFormatter();
String timePattern = options.partitionTimestampPattern();
if ((timeFormatter != null || timePattern != null)
&& partitionType.getFieldCount() > 0) {
return new PartitionTimestampValidator(
new PartitionTimeExtractor(timePattern, timeFormatter),
new RowDataToObjectArrayConverter(partitionType),
partitionType.getFieldNames());
}
return null;
}

private void validate(BinaryRow partition) {
Object[] array = partitionConverter.convert(partition);
try {
timeExtractor.extract(partitionKeys, Arrays.asList(array));
} catch (DateTimeParseException e) {
String partitionInfo =
IntStream.range(0, partitionKeys.size())
.mapToObj(i -> partitionKeys.get(i) + "=" + array[i])
.collect(Collectors.joining(", "));
throw new IllegalArgumentException(
String.format(
"Partition %s does not match the 'partition.timestamp-formatter' or "
+ "'partition.timestamp-pattern' configuration. "
+ "This might be caused by an incorrectly specified partition field. "
+ "Please check your partition configuration.",
partitionInfo),
e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import java.util.function.Predicate;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Tests for {@link TableWriteImpl}. */
public class TableWriteTest {
Expand Down Expand Up @@ -297,6 +298,24 @@ public void testWaitAllSnapshotsOfSpecificIdentifier() throws Exception {
commit.close();
}

    @Test
    public void testPartitionTimestampValidation() throws Exception {
        // Enable strict validation: partition values must parse with the configured
        // 'partition.timestamp-formatter' ("yyyyMMdd"), otherwise the write throws.
        Options conf = new Options();
        conf.set(CoreOptions.BUCKET, 1);
        conf.set(CoreOptions.PARTITION_TIMESTAMP_FORMATTER, "yyyyMMdd");
        conf.set(CoreOptions.PARTITION_TIMESTAMP_FORMAT_STRICT, true);
        FileStoreTable table = createFileStoreTable(conf);

        TableWriteImpl<?> write = table.newWrite(commitUser);
        // valid partition value: 20260316 matches yyyyMMdd
        write.write(GenericRow.of(20260316, 1, 1L));
        // invalid partition value: -100 does not match yyyyMMdd
        assertThatThrownBy(() -> write.write(GenericRow.of(-100, 1, 1L)))
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessageContaining("partition.timestamp-formatter");
        write.close();
    }

private BinaryRow partition(int x) {
BinaryRow partition = new BinaryRow(1);
BinaryRowWriter writer = new BinaryRowWriter(partition);
Expand Down
Loading