[core] Introduce 'pk-clustering-override' to clustering by non-primary key fields#7426
Conversation
There was a problem hiding this comment.
Pull request overview
Introduces a new PK Clustering Override mode for primary-key tables, allowing data files to be physically sorted by user-selected clustering columns (instead of the primary key) to improve scan performance on non-PK filters/aggregations, while maintaining PK uniqueness via deletion vectors.
Changes:
- Adds
pk-clustering-overridecore option, documentation, and schema validation adjustments. - Introduces a new clustering compaction path (manager + file tracking) and a dedicated clustering data-file writer that stores clustering min/max keys.
- Improves temp-dir selection via
IOManager.pickRandomTempDir()and updates related call sites; adds extensive integration-style tests for clustering behavior (including spill and first-row mode).
Reviewed changes
Copilot reviewed 21 out of 22 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
| paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java | Adds end-to-end tests covering deduplicate/first-row behavior, overlap section merging, and spill scenarios under pk clustering override. |
| paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java | Adjusts DV validation to allow FIRST_ROW + deletion vectors when pk clustering override is enabled. |
| paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java | Skips PK key-range conflict checks when files are no longer clustered by PK. |
| paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringFiles.java | New flat file tracker separating unsorted (L0) vs sorted (L1) files for clustering compaction. |
| paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManagerFactory.java | New compaction manager factory validating required options for pk clustering override. |
| paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java | Implements two-phase clustering compaction (rewrite L0 by clustering cols + section-based merging) with DV-backed dedup/first-row and spill support. |
| paimon-core/src/main/java/org/apache/paimon/mergetree/compact/KvCompactionManagerFactory.java | Routes pk clustering override tables to the new clustering compaction manager factory. |
| paimon-core/src/main/java/org/apache/paimon/mergetree/Levels.java | Refactors level grouping to reuse DataFileMeta.groupByLevel. |
| paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java | Adds createRollingClusteringFileWriter() to write clustering-sorted level-1 files. |
| paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java | Adds copyWithoutValue() builder variant for key-only reads used by key index bootstrap. |
| paimon-core/src/main/java/org/apache/paimon/io/KeyValueClusteringFileWriter.java | New writer that records clustering-field min/max into DataFileMeta instead of PK min/max. |
| paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java | Adds groupByLevel(...) helper. |
| paimon-core/src/main/java/org/apache/paimon/disk/IOManagerImpl.java | Implements pickRandomTempDir() convenience API. |
| paimon-core/src/main/java/org/apache/paimon/disk/IOManager.java | Adds pickRandomTempDir() to the IOManager interface. |
| paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java | Switches temp-dir selection to ioManager.pickRandomTempDir(). |
| paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java | Wires pk clustering override flag into conflict detection construction. |
| paimon-common/src/test/java/org/apache/paimon/lookup/sort/db/SimpleLsmKvDbTest.java | Updates tests for the new cache manager builder API. |
| paimon-common/src/main/java/org/apache/paimon/utils/VarLengthIntUtils.java | Adds varint encode/decode helpers for InputStream/OutputStream. |
| paimon-common/src/main/java/org/apache/paimon/lookup/sort/db/SimpleLsmKvDb.java | Replaces cacheSize(...) with cacheManager(...) and introduces a default cache manager if unset. |
| paimon-api/src/main/java/org/apache/paimon/CoreOptions.java | Adds PK_CLUSTERING_OVERRIDE config option and accessor. |
| docs/layouts/shortcodes/generated/core_configuration.html | Adds generated config doc entry for pk-clustering-override. |
| docs/content/primary-key-table/pk-clustering-override.md | New user documentation page describing the feature, requirements, and related options. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
paimon-core/src/main/java/org/apache/paimon/io/KeyValueClusteringFileWriter.java
Show resolved
Hide resolved
...e/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java
Show resolved
Hide resolved
paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringFiles.java
Outdated
Show resolved
Hide resolved
paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java
Show resolved
Hide resolved
paimon-common/src/main/java/org/apache/paimon/utils/VarLengthIntUtils.java
Show resolved
Hide resolved
paimon-core/src/main/java/org/apache/paimon/io/KeyValueClusteringFileWriter.java
Outdated
Show resolved
Hide resolved
LsomeYeah
left a comment
There was a problem hiding this comment.
+1, LGTM, nice work! Just one question:
Should we forbid changes to clustering.columns when pk-clustering-override is enabled? Existing level-1 files store old clustering column values in minKey/maxKey, which would be incompatible with newly compacted files using the new clustering column?
9ab2fff to
7f14be1
Compare
PK Clustering Override
By default, data files in a primary key table are physically sorted by the primary key. This is optimal for point
lookups but can hurt scan performance when queries filter on non-primary-key columns.
PK Clustering Override mode changes the physical sort order of data files from the primary key to user-specified
clustering columns. This significantly improves scan performance for queries that filter or group by clustering columns,
while still maintaining primary key uniqueness through deletion vectors.
Quick Start
After this, data files within each bucket will be physically sorted by
cityinstead ofid. Queries likeSELECT * FROM my_table WHERE city = 'Beijing'can skip irrelevant data files by checking their min/max statisticson the clustering column.
How It Works
PK Clustering Override replaces the default LSM compaction with a two-phase clustering compaction:
Phase 1 — Sort by Clustering Columns: Newly flushed (level 0) files are read, sorted by the configured clustering
columns, and rewritten as sorted (level 1) files. A key index tracks each primary key's file and row position to
maintain uniqueness.
Phase 2 — Merge Overlapping Sections: Sorted files are grouped into sections based on clustering column range
overlap. Overlapping sections are merged together. Adjacent small sections are also consolidated to reduce file count
and IO amplification. Non-overlapping large files are left untouched.
During both phases, deduplication is handled via deletion vectors:
When the number of files to merge exceeds
sort-spill-threshold, smaller files are first spilled to row-basedtemporary files to reduce memory consumption, preventing OOM during multi-way merge.
Requirements
pk-clustering-overridetrueclustering.columnsdeletion-vectors.enabledtruemerge-enginededuplicate(default) orfirst-rowonlysequence.fieldsrecord-level.expire-timeRelated Options
clustering.columnssort-spill-thresholdsort-spill-buffer-size64 mbWhen to Use
PK Clustering Override is beneficial when:
WHERE city = 'Beijing').deduplicateorfirst-rowmerge engine.It is not suitable when:
partial-updateoraggregationmerge engine.sequence.fieldsorrecord-level.expire-timeis required.