Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,7 @@ public ChangelogDeletion(
public void cleanUnusedDataFiles(Changelog changelog, Predicate<ExpireFileEntry> skipper) {
if (changelog.changelogManifestList() != null) {
deleteAddedDataFiles(changelog.changelogManifestList());
}

if (manifestList.exists(changelog.deltaManifestList())) {
} else if (manifestList.exists(changelog.deltaManifestList())) {
cleanUnusedDataFiles(changelog.deltaManifestList(), skipper);
}
}
Expand All @@ -74,16 +72,10 @@ public void cleanUnusedDataFiles(Changelog changelog, Predicate<ExpireFileEntry>
public void cleanUnusedManifests(Changelog changelog, Set<String> skippingSet) {
if (changelog.changelogManifestList() != null) {
cleanUnusedManifestList(changelog.changelogManifestList(), skippingSet);
}

if (manifestList.exists(changelog.deltaManifestList())) {
} else if (manifestList.exists(changelog.deltaManifestList())) {
cleanUnusedManifestList(changelog.deltaManifestList(), skippingSet);
}

if (manifestList.exists(changelog.baseManifestList())) {
cleanUnusedManifestList(changelog.baseManifestList(), skippingSet);
}

// the index and statics manifest list should handle by snapshot deletion.
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,8 +333,8 @@ protected void cleanUnusedManifests(
Set<String> skippingSet,
boolean deleteDataManifestLists,
boolean deleteChangelog) {
cleanUnusedManifestList(snapshot.baseManifestList(), skippingSet);
if (deleteDataManifestLists) {
cleanUnusedManifestList(snapshot.baseManifestList(), skippingSet);
cleanUnusedManifestList(snapshot.deltaManifestList(), skippingSet);
}
if (deleteChangelog && snapshot.changelogManifestList() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ public int expireUntil(long earliestId, long endExclusiveId) {
List<Snapshot> skippingSnapshots =
findSkippingTags(taggedSnapshots, earliestId, endExclusiveId);
skippingSnapshots.add(changelogManager.changelog(endExclusiveId));
skippingSnapshots.add(snapshotManager.earliestSnapshot());
Set<String> manifestSkippSet = changelogDeletion.manifestSkippingSet(skippingSnapshots);
for (long id = earliestId; id < endExclusiveId; id++) {
if (LOG.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,35 @@

package org.apache.paimon.table;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.options.ExpireConfig;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TraceableFileIO;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import static org.apache.paimon.options.CatalogOptions.CACHE_ENABLED;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;

/** Test for changelog expire. */
Expand Down Expand Up @@ -78,4 +91,56 @@ public void testChangelogExpire() throws Exception {
expireSnapshots.expireUntil(1, 7);
assertThatCode(() -> expire.expireUntil(1, 6)).doesNotThrowAnyException();
}

@Test
public void testChangelogExpireWithUncleanedManifestLists() throws Exception {
Map<String, String> dynamicOptions = new HashMap<>();
dynamicOptions.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key(), "1");
dynamicOptions.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX.key(), "1");
dynamicOptions.put(CoreOptions.CHANGELOG_NUM_RETAINED_MAX.key(), "3");
table = table.copy(dynamicOptions);

StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder();
StreamTableWrite write = writeBuilder.newWrite();
StreamTableCommit commit = writeBuilder.newCommit();

write(write, createRow(1, 0, 1, 10));
commit.commit(1, write.prepareCommit(true, 1));

// Save snapshot-1's manifest list files before they get cleaned
SnapshotManager snapshotManager = table.snapshotManager();
FileStorePathFactory pathFactory = table.store().pathFactory();
FileIO fileIO = table.fileIO();
Snapshot snapshot1 = snapshotManager.snapshot(1);
Path manifestDir = pathFactory.manifestPath();
Path baseSaved = new Path(manifestDir, snapshot1.baseManifestList() + ".saved");
Path deltaSaved = new Path(manifestDir, snapshot1.deltaManifestList() + ".saved");
fileIO.copyFile(
pathFactory.toManifestListPath(snapshot1.baseManifestList()), baseSaved, false);
fileIO.copyFile(
pathFactory.toManifestListPath(snapshot1.deltaManifestList()), deltaSaved, false);

write(write, createRow(1, 0, 2, 20));
commit.commit(2, write.prepareCommit(true, 2));
write(write, createRow(1, 0, 3, 30));
commit.commit(3, write.prepareCommit(true, 3));

// Copy back changelog-1's manifest list files, simulating SnapshotDeletion failure
fileIO.copyFile(
baseSaved, pathFactory.toManifestListPath(snapshot1.baseManifestList()), true);
fileIO.copyFile(
deltaSaved, pathFactory.toManifestListPath(snapshot1.deltaManifestList()), true);

// expire changelog-1
write(write, createRow(1, 0, 4, 40));
commit.commit(4, write.prepareCommit(true, 4));

write.close();
commit.close();
Snapshot snapshot4 = snapshotManager.snapshot(4);
ManifestList manifestList = table.store().manifestListFactory().create();
for (ManifestFileMeta manifest : manifestList.readDataManifests(snapshot4)) {
assertThat(fileIO.exists(pathFactory.toManifestFilePath(manifest.fileName()))).isTrue();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ protected Options tableOptions() {
return options;
}

private Pair<GenericRow, Integer> createRow(int partition, int bucket, int key, int value) {
protected Pair<GenericRow, Integer> createRow(int partition, int bucket, int key, int value) {
return Pair.of(GenericRow.of(partition, key, value), bucket);
}

Expand Down Expand Up @@ -239,7 +239,7 @@ private long indexManifestSize() throws IOException {
.count();
}

private void write(StreamTableWrite write, Pair<GenericRow, Integer> rowWithBucket)
protected void write(StreamTableWrite write, Pair<GenericRow, Integer> rowWithBucket)
throws Exception {
write.write(rowWithBucket.getKey(), rowWithBucket.getValue());
}
Expand Down
Loading