diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/ChangelogDeletion.java b/paimon-core/src/main/java/org/apache/paimon/operation/ChangelogDeletion.java index 348d2da6bd62..450f7fcb690f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/ChangelogDeletion.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/ChangelogDeletion.java @@ -63,10 +63,10 @@ public ChangelogDeletion( public void cleanUnusedDataFiles(Changelog changelog, Predicate skipper) { if (changelog.changelogManifestList() != null) { deleteAddedDataFiles(changelog.changelogManifestList()); - } - - if (manifestList.exists(changelog.deltaManifestList())) { - cleanUnusedDataFiles(changelog.deltaManifestList(), skipper); + } else { + if (manifestList.exists(changelog.deltaManifestList())) { + cleanUnusedDataFiles(changelog.deltaManifestList(), skipper); + } } } @@ -74,14 +74,15 @@ public void cleanUnusedDataFiles(Changelog changelog, Predicate public void cleanUnusedManifests(Changelog changelog, Set skippingSet) { if (changelog.changelogManifestList() != null) { cleanUnusedManifestList(changelog.changelogManifestList(), skippingSet); - } - - if (manifestList.exists(changelog.deltaManifestList())) { - cleanUnusedManifestList(changelog.deltaManifestList(), skippingSet); - } - - if (manifestList.exists(changelog.baseManifestList())) { - cleanUnusedManifestList(changelog.baseManifestList(), skippingSet); + } else { + if (manifestList.exists(changelog.deltaManifestList())) { + cleanUnusedManifestList(changelog.deltaManifestList(), skippingSet); + } + // See FileDeletionBase#cleanUnusedManifests + // about why we need to clean base manifest + if (manifestList.exists(changelog.baseManifestList())) { + cleanUnusedManifestList(changelog.baseManifestList(), skippingSet); + } } // the index and statics manifest list should handle by snapshot deletion. diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java index 231636355a8d..4256a503c1ca 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java @@ -334,6 +334,13 @@ protected void cleanUnusedManifests( boolean deleteDataManifestLists, boolean deleteChangelog) { if (deleteDataManifestLists) { + // deleteDataManifestLists will be false + // with changelog decouple + none changelog producer. + // Why don't we clean base manifest in this scenario? + // Because cleanUnusedManifestList is compared with the earliest snapshot. + // For none changelog producer, changelog files are the level 0 files. + // Even if these files are not used by the earliest snapshot, + // we have to keep them as changelog, and clean then in ChangelogDeletion. cleanUnusedManifestList(snapshot.baseManifestList(), skippingSet); cleanUnusedManifestList(snapshot.deltaManifestList(), skippingSet); } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java index 1b42f2339bad..bed5749895e5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java @@ -149,6 +149,7 @@ public int expireUntil(long earliestId, long endExclusiveId) { List skippingSnapshots = findSkippingTags(taggedSnapshots, earliestId, endExclusiveId); skippingSnapshots.add(changelogManager.changelog(endExclusiveId)); + skippingSnapshots.add(snapshotManager.earliestSnapshot()); Set manifestSkippSet = changelogDeletion.manifestSkippingSet(skippingSnapshots); for (long id = earliestId; id < endExclusiveId; id++) { if (LOG.isDebugEnabled()) { diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java index f904cde0819f..8142d44acf7d 100644 --- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java +++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java @@ -717,7 +717,8 @@ private static Set getChangelogFileInUse( // delta file if (options.changelogProducer() == CoreOptions.ChangelogProducer.NONE) { - // TODO why we need to keep base manifests? + // See FileDeletionBase#cleanUnusedManifests + // about why we need to keep base manifest result.add(pathFactory.toManifestListPath(changelog.baseManifestList())); manifestList .readDataManifests(changelog) diff --git a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogExpireTest.java b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogExpireTest.java index c6eab6dd19d5..1821d927c49e 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogExpireTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogExpireTest.java @@ -18,22 +18,35 @@ package org.apache.paimon.table; +import org.apache.paimon.CoreOptions; +import org.apache.paimon.Snapshot; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.catalog.CatalogFactory; import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; +import org.apache.paimon.manifest.ManifestFileMeta; +import org.apache.paimon.manifest.ManifestList; import org.apache.paimon.options.ExpireConfig; import org.apache.paimon.schema.Schema; +import org.apache.paimon.table.sink.StreamTableCommit; +import org.apache.paimon.table.sink.StreamTableWrite; +import org.apache.paimon.table.sink.StreamWriteBuilder; import org.apache.paimon.types.DataTypes; +import org.apache.paimon.utils.FileStorePathFactory; +import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TraceableFileIO; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.util.HashMap; +import java.util.Map; import java.util.UUID; import static org.apache.paimon.options.CatalogOptions.CACHE_ENABLED; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; /** Test for changelog expire. */ @@ -78,4 +91,56 @@ public void testChangelogExpire() throws Exception { expireSnapshots.expireUntil(1, 7); assertThatCode(() -> expire.expireUntil(1, 6)).doesNotThrowAnyException(); } + + @Test + public void testChangelogExpireWithUncleanedManifestLists() throws Exception { + Map dynamicOptions = new HashMap<>(); + dynamicOptions.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key(), "1"); + dynamicOptions.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX.key(), "1"); + dynamicOptions.put(CoreOptions.CHANGELOG_NUM_RETAINED_MAX.key(), "3"); + table = table.copy(dynamicOptions); + + StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder(); + StreamTableWrite write = writeBuilder.newWrite(); + StreamTableCommit commit = writeBuilder.newCommit(); + + write(write, createRow(1, 0, 1, 10)); + commit.commit(1, write.prepareCommit(true, 1)); + + // Save snapshot-1's manifest list files before they get cleaned + SnapshotManager snapshotManager = table.snapshotManager(); + FileStorePathFactory pathFactory = table.store().pathFactory(); + FileIO fileIO = table.fileIO(); + Snapshot snapshot1 = snapshotManager.snapshot(1); + Path manifestDir = pathFactory.manifestPath(); + Path baseSaved = new Path(manifestDir, snapshot1.baseManifestList() + ".saved"); + Path deltaSaved = new Path(manifestDir, snapshot1.deltaManifestList() + ".saved"); + fileIO.copyFile( + pathFactory.toManifestListPath(snapshot1.baseManifestList()), baseSaved, false); + fileIO.copyFile( + pathFactory.toManifestListPath(snapshot1.deltaManifestList()), deltaSaved, false); + + write(write, createRow(1, 0, 2, 20)); + commit.commit(2, write.prepareCommit(true, 2)); + write(write, createRow(1, 0, 3, 30)); + commit.commit(3, write.prepareCommit(true, 3)); + + // Copy back changelog-1's manifest list files, simulating SnapshotDeletion failure + fileIO.copyFile( + baseSaved, pathFactory.toManifestListPath(snapshot1.baseManifestList()), true); + fileIO.copyFile( + deltaSaved, pathFactory.toManifestListPath(snapshot1.deltaManifestList()), true); + + // expire changelog-1 + write(write, createRow(1, 0, 4, 40)); + commit.commit(4, write.prepareCommit(true, 4)); + + write.close(); + commit.close(); + Snapshot snapshot4 = snapshotManager.snapshot(4); + ManifestList manifestList = table.store().manifestListFactory().create(); + for (ManifestFileMeta manifest : manifestList.readDataManifests(snapshot4)) { + assertThat(fileIO.exists(pathFactory.toManifestFilePath(manifest.fileName()))).isTrue(); + } + } } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java index d9e58e245ce7..7293e8f8be1a 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java @@ -52,7 +52,7 @@ protected Options tableOptions() { return options; } - private Pair createRow(int partition, int bucket, int key, int value) { + protected Pair createRow(int partition, int bucket, int key, int value) { return Pair.of(GenericRow.of(partition, key, value), bucket); } @@ -239,7 +239,7 @@ private long indexManifestSize() throws IOException { .count(); } - private void write(StreamTableWrite write, Pair rowWithBucket) + protected void write(StreamTableWrite write, Pair rowWithBucket) throws Exception { write.write(rowWithBucket.getKey(), rowWithBucket.getValue()); }