Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.stats.SimpleStats;
import org.apache.paimon.stats.SimpleStatsConverter;
import org.apache.paimon.table.SpecialFields;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.LongCounter;
import org.apache.paimon.utils.Pair;
Expand Down Expand Up @@ -52,6 +53,10 @@ public class RowDataFileWriter extends StatsCollectingSingleFileWriter<InternalR
@Nullable private final DataFileIndexWriter dataFileIndexWriter;
private final FileSource fileSource;
@Nullable private final List<String> writeCols;
private final int seqNumberFieldIndex;
private long minSeqNumber;
private long maxSeqNumber;
private boolean hasNullSeqNumber;

public RowDataFileWriter(
FileIO fileIO,
Expand All @@ -76,6 +81,10 @@ public RowDataFileWriter(
fileIO, dataFileToFileIndexPath(path), writeSchema, fileIndexOptions);
this.fileSource = fileSource;
this.writeCols = writeCols;
this.seqNumberFieldIndex = writeSchema.getFieldIndex(SpecialFields.SEQUENCE_NUMBER.name());
this.minSeqNumber = Long.MAX_VALUE;
this.maxSeqNumber = Long.MIN_VALUE;
this.hasNullSeqNumber = false;
}

@Override
Expand All @@ -85,7 +94,7 @@ public void write(InternalRow row) throws IOException {
if (dataFileIndexWriter != null) {
dataFileIndexWriter.write(row);
}
seqNumCounter.add(1L);
updateSeqNumber(row);
}

@Override
Expand All @@ -111,8 +120,8 @@ public DataFileMeta result() throws IOException {
fileSize,
recordCount(),
statsPair.getRight(),
seqNumCounter.getValue() - super.recordCount(),
seqNumCounter.getValue() - 1,
minSeqNumber(),
maxSeqNumber(),
schemaId,
indexResult.independentIndexFile() == null
? Collections.emptyList()
Expand All @@ -124,4 +133,37 @@ public DataFileMeta result() throws IOException {
null,
writeCols);
}

private long minSeqNumber() {
if (seqNumberFieldIndex == -1) {
return seqNumCounter.getValue() - super.recordCount();
}
// minSeqNumber stays at Long.MAX_VALUE when all records have null sequence numbers.
// Returning 0 triggers RowTrackingCommitUtils.assignSnapshotId() to use snapshot ID.
return minSeqNumber == Long.MAX_VALUE ? 0 : minSeqNumber;
}

private long maxSeqNumber() {
if (seqNumberFieldIndex == -1) {
return seqNumCounter.getValue() - 1;
}
// When hasNullSeqNumber is true, some records have null sequence numbers.
// Returning 0 triggers RowTrackingCommitUtils.assignSnapshotId() to use snapshot ID for
// max.
return hasNullSeqNumber ? 0 : maxSeqNumber;
}

private void updateSeqNumber(InternalRow row) {
seqNumCounter.add(1L);

// If sequence number field exists, extract min/max from row data
if (seqNumberFieldIndex != -1 && !row.isNullAt(seqNumberFieldIndex)) {
long seqNum = row.getLong(seqNumberFieldIndex);
minSeqNumber = Math.min(minSeqNumber, seqNum);
maxSeqNumber = Math.max(maxSeqNumber, seqNum);
} else if (seqNumberFieldIndex != -1) {
// Manifest will calculate the correct max based on snapshot id
hasNullSeqNumber = true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,20 @@ public static RowTrackingAssigned assignRowTracking(
private static void assignSnapshotId(
long snapshotId, List<ManifestEntry> deltaFiles, List<ManifestEntry> snapshotAssigned) {
for (ManifestEntry entry : deltaFiles) {
if (entry.file().minSequenceNumber() == 0L) {
long minSeqNumber = entry.file().minSequenceNumber();
long maxSeqNumber = entry.file().maxSequenceNumber();
if (minSeqNumber == 0L) {
// Case 1: New file (e.g., from INSERT)
// All records in this file get the current snapshot ID as sequence number
snapshotAssigned.add(entry.assignSequenceNumber(snapshotId, snapshotId));
} else if (maxSeqNumber == 0L) {
// Case 2: File with some modified records
// - min: Preserve original sequence number (from unmodified records)
// - max: Assign current snapshot ID
snapshotAssigned.add(entry.assignSequenceNumber(minSeqNumber, snapshotId));
} else {
// Case 3: Pure compact file (no modified records)
// Preserve original min/max sequence numbers from source files
snapshotAssigned.add(entry);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,37 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase {
sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
Seq(Row(1, 1, 0, 1), Row(2, 2, 1, 2), Row(3, 3, 2, 3))
)

sql("INSERT INTO t VALUES (4, '4')")
sql("INSERT INTO t VALUES (5, '5')")
// snapshot 7: should merge files with sequence numbers [1, 6]
sql("CALL sys.compact(table => 't')")
checkAnswer(
sql("SELECT min_sequence_number, max_sequence_number FROM `t$files`"),
Seq(Row(1, 6))
)
// snapshot 8: Updated record has null sequence number
sql("UPDATE t SET data = 22 WHERE id = 2")

// snapshot 9 ~ 10: add new file, and set sequence number to null
sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS data FROM range(6, 8)")
sql("UPDATE t SET data = 67 WHERE _SEQUENCE_NUMBER = 9")
checkAnswer(
sql(
"SELECT min_sequence_number, max_sequence_number FROM `t$files` order by min_sequence_number"),
Seq(Row(1, 8), Row(10, 10))
)
checkAnswer(
sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
Seq(
Row(1, 1, 0, 1),
Row(2, 22, 1, 8),
Row(3, 3, 2, 3),
Row(4, 4, 3, 5),
Row(5, 5, 4, 6),
Row(6, 67, 5, 10),
Row(7, 67, 6, 10))
)
}
}

Expand Down
Loading