Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,24 @@
import org.apache.paimon.memory.MemoryPoolFactory;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.operation.metrics.CompactionMetrics;
import org.apache.paimon.partition.PartitionTimeExtractor;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.CommitIncrement;
import org.apache.paimon.utils.ExecutorThreadFactory;
import org.apache.paimon.utils.RecordWriter;
import org.apache.paimon.utils.RowDataToObjectArrayConverter;
import org.apache.paimon.utils.SnapshotManager;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
Expand All @@ -57,6 +61,8 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.apache.paimon.CoreOptions.PARTITION_DEFAULT_NAME;
import static org.apache.paimon.io.DataFileMeta.getMaxSequenceNumber;
Expand All @@ -78,6 +84,8 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
private final int numBuckets;
private final RowType partitionType;

@Nullable private final PartitionTimestampValidator partitionTimestampValidator;

@Nullable protected IOManager ioManager;

protected final Map<BinaryRow, Map<Integer, WriterContainer<T>>> writers;
Expand Down Expand Up @@ -115,6 +123,8 @@ protected AbstractFileStoreWrite(
this.tableName = tableName;
this.writerNumberMax = options.writeMaxWritersToSpill();
this.legacyPartitionName = options.legacyPartitionName();
this.partitionTimestampValidator =
PartitionTimestampValidator.create(options, partitionType);
}

@Override
Expand Down Expand Up @@ -423,6 +433,10 @@ public WriterContainer<T> createWriterContainer(BinaryRow partition, int bucket)
LOG.debug("Creating writer for partition {}, bucket {}", partition, bucket);
}

if (partitionTimestampValidator != null) {
partitionTimestampValidator.validate(partition);
}

if (writerNumber() >= writerNumberMax) {
try {
forceBufferSpill();
Expand Down Expand Up @@ -579,4 +593,55 @@ public Map<BinaryRow, Map<Integer, WriterContainer<T>>> writers() {
public CompactionMetrics compactionMetrics() {
return compactionMetrics;
}

private static class PartitionTimestampValidator {

private final PartitionTimeExtractor timeExtractor;
private final RowDataToObjectArrayConverter partitionConverter;
private final List<String> partitionKeys;

private PartitionTimestampValidator(
PartitionTimeExtractor timeExtractor,
RowDataToObjectArrayConverter partitionConverter,
List<String> partitionKeys) {
this.timeExtractor = timeExtractor;
this.partitionConverter = partitionConverter;
this.partitionKeys = partitionKeys;
}

@Nullable
private static PartitionTimestampValidator create(
CoreOptions options, RowType partitionType) {
String timeFormatter = options.partitionTimestampFormatter();
String timePattern = options.partitionTimestampPattern();
if ((timeFormatter != null || timePattern != null)
&& partitionType.getFieldCount() > 0) {
return new PartitionTimestampValidator(
new PartitionTimeExtractor(timePattern, timeFormatter),
new RowDataToObjectArrayConverter(partitionType),
partitionType.getFieldNames());
}
return null;
}

private void validate(BinaryRow partition) {
Object[] array = partitionConverter.convert(partition);
try {
timeExtractor.extract(partitionKeys, Arrays.asList(array));
} catch (DateTimeParseException e) {
String partitionInfo =
IntStream.range(0, partitionKeys.size())
.mapToObj(i -> partitionKeys.get(i) + "=" + array[i])
.collect(Collectors.joining(", "));
throw new IllegalArgumentException(
String.format(
"Partition %s does not match the 'partition.timestamp-formatter' or "
+ "'partition.timestamp-pattern' configuration. "
+ "This might be caused by an incorrectly specified partition field. "
+ "Please check your partition configuration.",
partitionInfo),
e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import java.util.function.Predicate;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Tests for {@link TableWriteImpl}. */
public class TableWriteTest {
Expand Down Expand Up @@ -297,6 +298,23 @@ public void testWaitAllSnapshotsOfSpecificIdentifier() throws Exception {
commit.close();
}

@Test
public void testPartitionTimestampValidation() throws Exception {
Options conf = new Options();
conf.set(CoreOptions.BUCKET, 1);
conf.set(CoreOptions.PARTITION_TIMESTAMP_FORMATTER, "yyyyMMdd");
FileStoreTable table = createFileStoreTable(conf);

TableWriteImpl<?> write = table.newWrite(commitUser);
// valid partition value: 20260316 matches yyyyMMdd
write.write(GenericRow.of(20260316, 1, 1L));
// invalid partition value: -100 does not match yyyyMMdd
assertThatThrownBy(() -> write.write(GenericRow.of(-100, 1, 1L)))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("partition.timestamp-formatter");
write.close();
}

private BinaryRow partition(int x) {
BinaryRow partition = new BinaryRow(1);
BinaryRowWriter writer = new BinaryRowWriter(partition);
Expand Down