Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
if (unbounded && table instanceof DataTable) {
source =
new ContinuousFileStoreSource(
readBuilder, table.options(), limit, false, rowData);
readBuilder, table.options(), limit, isUnordered(table), rowData);
} else {
source =
new StaticFileStoreSource(
Expand Down Expand Up @@ -157,4 +157,20 @@ public String asSummaryString() {
public boolean isUnbounded() {
return unbounded;
}

private static boolean isUnordered(Table table) {
if (!table.primaryKeys().isEmpty()) {
return false;
}
Options options = Options.fromMap(table.options());
int bucket = options.get(CoreOptions.BUCKET);
if (bucket == -1) {
// bucket = -1 means BUCKET_UNAWARE for append-only tables
return true;
} else if (bucket > 0) {
// HASH_FIXED mode: check append order option
return !options.get(CoreOptions.BUCKET_APPEND_ORDERED);
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.sink.InnerTableWrite;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.table.system.AuditLogTable;
import org.apache.paimon.table.system.ReadOptimizedTable;
import org.apache.paimon.types.DataTypes;

Expand Down Expand Up @@ -133,6 +134,63 @@ public void testSystemTableParallelism() throws Exception {
assertThat(sourceStream1.getParallelism()).isEqualTo(3);
}

@Test
public void testSystemTableSourceUnorderedForBucketUnawareTable() throws Exception {
// bucket = -1 (BUCKET_UNAWARE append-only table) wrapped in a system table should produce
// unordered = true so splits are distributed via FIFOSplitAssigner across all tasks
FileStoreTable bucketUnawareTable = createTable(ImmutableMap.of("bucket", "-1"));
AuditLogTable auditLogTable = new AuditLogTable(bucketUnawareTable);

SystemTableSource tableSource =
new SystemTableSource(
auditLogTable, true, ObjectIdentifier.of("cat", "db", "table$audit_log"));
PaimonDataStreamScanProvider runtimeProvider = runtimeProvider(tableSource);

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
DataStream<RowData> sourceStream =
runtimeProvider.produceDataStream(s -> Optional.empty(), env);

// Retrieve the source from the transformation and verify unordered = true
ContinuousFileStoreSource source =
(ContinuousFileStoreSource)
((org.apache.flink.streaming.api.transformations.SourceTransformation<
?, ?, ?>)
sourceStream.getTransformation())
.getSource();
java.lang.reflect.Field unorderedField =
ContinuousFileStoreSource.class.getDeclaredField("unordered");
unorderedField.setAccessible(true);
assertThat((boolean) unorderedField.get(source)).isTrue();
}

@Test
public void testSystemTableSourceOrderedForHashFixedTable() throws Exception {
// bucket > 0 (HASH_FIXED) with bucket-append-ordered = true should produce unordered =
// false
FileStoreTable hashFixedTable =
createTable(ImmutableMap.of("bucket", "4", "bucket-key", "a"));
ReadOptimizedTable roTable = new ReadOptimizedTable(hashFixedTable);

SystemTableSource tableSource =
new SystemTableSource(roTable, true, ObjectIdentifier.of("cat", "db", "table$ro"));
PaimonDataStreamScanProvider runtimeProvider = runtimeProvider(tableSource);

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
DataStream<RowData> sourceStream =
runtimeProvider.produceDataStream(s -> Optional.empty(), env);

ContinuousFileStoreSource source =
(ContinuousFileStoreSource)
((org.apache.flink.streaming.api.transformations.SourceTransformation<
?, ?, ?>)
sourceStream.getTransformation())
.getSource();
java.lang.reflect.Field unorderedField =
ContinuousFileStoreSource.class.getDeclaredField("unordered");
unorderedField.setAccessible(true);
assertThat((boolean) unorderedField.get(source)).isFalse();
}

private PaimonDataStreamScanProvider runtimeProvider(FlinkTableSource tableSource) {
return (PaimonDataStreamScanProvider)
tableSource.getScanRuntimeProvider(
Expand Down