108 changes: 108 additions & 0 deletions docs/content/primary-key-table/pk-clustering-override.md
@@ -0,0 +1,108 @@
---
title: "PK Clustering Override"
weight: 10
type: docs
aliases:
- /primary-key-table/pk-clustering-override.html
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# PK Clustering Override

By default, data files in a primary key table are physically sorted by the primary key. This is optimal for point
lookups but can hurt scan performance when queries filter on non-primary-key columns.

**PK Clustering Override** mode changes the physical sort order of data files from the primary key to user-specified
clustering columns. This significantly improves scan performance for queries that filter or group by clustering columns,
while still maintaining primary key uniqueness through deletion vectors.

## Quick Start

```sql
CREATE TABLE my_table (
id BIGINT,
dt STRING,
city STRING,
amount DOUBLE,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'pk-clustering-override' = 'true',
'clustering.columns' = 'city',
'deletion-vectors.enabled' = 'true',
'bucket' = '4'
);
```

After this, data files within each bucket are physically sorted by `city` instead of `id`. A query such as
`SELECT * FROM my_table WHERE city = 'Beijing'` can then skip irrelevant data files by checking their min/max
statistics on the clustering column.
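The file-skipping step can be sketched as a simple min/max range check. This is illustrative only — the `FileStats` record and `prune` method are hypothetical names, not Paimon's actual manifest API, though Paimon stores comparable per-column statistics in its manifest entries:

```java
import java.util.List;
import java.util.stream.Collectors;

public class FileSkippingSketch {

    /** Hypothetical per-file statistics on the clustering column. */
    record FileStats(String fileName, String min, String max) {}

    /** Keep only files whose [min, max] range can contain the filter literal. */
    static List<FileStats> prune(List<FileStats> files, String literal) {
        return files.stream()
                .filter(f -> f.min().compareTo(literal) <= 0
                        && f.max().compareTo(literal) >= 0)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<FileStats> files = List.of(
                new FileStats("f1", "Beijing", "Chengdu"),
                new FileStats("f2", "Guangzhou", "Shanghai"),
                new FileStats("f3", "Shenzhen", "Wuhan"));
        // Only f1's range can contain 'Beijing'; f2 and f3 are never read.
        System.out.println(prune(files, "Beijing"));
    }
}
```

Because files are sorted by the clustering column, their min/max ranges are tight and mostly disjoint, which is what makes this pruning effective.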

## How It Works

PK Clustering Override replaces the default LSM compaction with a two-phase clustering compaction:

**Phase 1 — Sort by Clustering Columns**: Newly flushed (level 0) files are read, sorted by the configured clustering
columns, and rewritten as sorted (level 1) files. A key index tracks each primary key's file and row position to
maintain uniqueness.

**Phase 2 — Merge Overlapping Sections**: Sorted files are grouped into sections based on clustering column range
overlap. Overlapping sections are merged together. Adjacent small sections are also consolidated to reduce file count
and IO amplification. Non-overlapping large files are left untouched.
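The grouping step in Phase 2 is essentially interval merging over each file's clustering-column range. A minimal sketch, with hypothetical types (`SortedFile`, `group`) that are not Paimon's internals:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class SectionGrouping {

    /** Hypothetical sorted file with its min/max range on the clustering columns. */
    record SortedFile(String name, int min, int max) {}

    /**
     * Files whose clustering ranges overlap land in the same section and must be
     * merge-sorted together; files disjoint from all others form their own section.
     */
    static List<List<SortedFile>> group(List<SortedFile> files) {
        List<SortedFile> sorted = new ArrayList<>(files);
        sorted.sort(Comparator.comparingInt(SortedFile::min));

        List<List<SortedFile>> sections = new ArrayList<>();
        List<SortedFile> current = new ArrayList<>();
        int sectionMax = Integer.MIN_VALUE;
        for (SortedFile f : sorted) {
            if (!current.isEmpty() && f.min() > sectionMax) {
                sections.add(current); // gap between ranges: close the section
                current = new ArrayList<>();
            }
            current.add(f);
            sectionMax = Math.max(sectionMax, f.max());
        }
        if (!current.isEmpty()) {
            sections.add(current);
        }
        return sections;
    }
}
```

A section containing a single file needs no rewriting, which is how large non-overlapping files are left untouched.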

During both phases, deduplication is handled via deletion vectors:

- **Deduplicate mode**: When a key already exists in an older file, the old row is marked as deleted.
- **First-row mode**: When a key already exists, the new row is marked as deleted, keeping the first-seen value.
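The two modes differ only in which side of a key collision gets marked deleted. A sketch of the idea, assuming a simplified in-memory key index and per-file bitmaps standing in for deletion vectors (all names here are hypothetical, not Paimon's classes):

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;

public class DedupSketch {

    /** Hypothetical row position: which file and which row index inside it. */
    record Position(String file, long row) {}

    private final Map<String, Position> keyIndex = new HashMap<>();
    private final Map<String, BitSet> deletionVectors = new HashMap<>();
    private final boolean firstRow; // true = first-row engine, false = deduplicate

    DedupSketch(boolean firstRow) {
        this.firstRow = firstRow;
    }

    /** Register a row; on key collision, mark either the old or the new row deleted. */
    void add(String key, Position pos) {
        Position old = keyIndex.putIfAbsent(key, pos);
        if (old == null) {
            return; // first time this key is seen
        }
        if (firstRow) {
            markDeleted(pos); // first-row: keep the first-seen value, drop the new row
        } else {
            markDeleted(old); // deduplicate: the new row wins, drop the old one
            keyIndex.put(key, pos);
        }
    }

    private void markDeleted(Position p) {
        deletionVectors.computeIfAbsent(p.file(), f -> new BitSet()).set((int) p.row());
    }

    boolean isDeleted(Position p) {
        BitSet dv = deletionVectors.get(p.file());
        return dv != null && dv.get((int) p.row());
    }
}
```

Because duplicates are suppressed with deletion vectors rather than rewrites, existing clustered files stay untouched when a new version of a key arrives.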

When the number of files to merge exceeds `sort-spill-threshold`, smaller files are first spilled to row-based
temporary files, reducing memory consumption and preventing out-of-memory errors during the multi-way merge.
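The merge itself is a standard k-way merge over sorted runs. A self-contained sketch using a min-heap (the spill behavior is summarized in the comment; the real compaction would pre-merge the smallest runs into temp files once the reader count passes the threshold):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

public class MultiWayMerge {

    /**
     * Merge k sorted runs into one sorted output via a min-heap. When k exceeds
     * 'sort-spill-threshold', the smallest runs would first be pre-merged into
     * row-based temporary files so the heap (and open-reader count) stays small.
     */
    static List<Integer> merge(List<List<Integer>> runs) {
        record Head(int value, Iterator<Integer> rest) {}
        PriorityQueue<Head> heap =
                new PriorityQueue<>(Comparator.comparingInt(Head::value));
        for (List<Integer> run : runs) {
            Iterator<Integer> it = run.iterator();
            if (it.hasNext()) {
                heap.add(new Head(it.next(), it));
            }
        }
        List<Integer> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            Head h = heap.poll();
            out.add(h.value());
            if (h.rest().hasNext()) {
                heap.add(new Head(h.rest().next(), h.rest()));
            }
        }
        return out;
    }
}
```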

## Requirements

| Option | Requirement |
|--------|-------------|
| `pk-clustering-override` | `true` |
| `clustering.columns` | Must be set (one or more non-primary-key columns) |
| `deletion-vectors.enabled` | Must be `true` |
| `merge-engine` | `deduplicate` (default) or `first-row` only |
| `sequence.fields` | Must **not** be set |
| `record-level.expire-time` | Must **not** be set |

## Related Options

| Option | Default | Description |
|--------|---------|-------------|
| `clustering.columns` | (none) | Comma-separated column names used as the physical sort order for data files. |
| `sort-spill-threshold` | (auto) | When the number of merge readers exceeds this value, smaller files are spilled to row-based temp files to reduce memory usage. |
| `sort-spill-buffer-size` | `64 mb` | Buffer size used for external sort during Phase 1 rewrite. |

## When to Use

PK Clustering Override is beneficial when:

- Analytical queries frequently filter or aggregate on non-primary-key columns (e.g., `WHERE city = 'Beijing'`).
- The table uses `deduplicate` or `first-row` merge engine.
- You want data files physically co-located by a business dimension rather than the primary key.

It is **not** suitable when:

- Point lookups by primary key are the dominant access pattern (default LSM sort is already optimal).
- You need `partial-update` or `aggregation` merge engine.
- `sequence.fields` or `record-level.expire-time` is required.
14 changes: 10 additions & 4 deletions docs/layouts/shortcodes/generated/core_configuration.html
@@ -27,16 +27,16 @@
</thead>
<tbody>
<tr>
<td><h5>aggregation.remove-record-on-delete</h5></td>
<td><h5>add-column-before-partition</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to remove the whole row in aggregation engine when -D records are received.</td>
<td>If true, when adding a new column without specifying a position, the column will be placed before the first partition column instead of at the end of the schema. This only takes effect for partitioned tables.</td>
</tr>
<tr>
<td><h5>add-column-before-partition</h5></td>
<td><h5>aggregation.remove-record-on-delete</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>If true, when adding a new column without specifying a position, the column will be placed before the first partition column instead of at the end of the schema. This only takes effect for partitioned tables.</td>
<td>Whether to remove the whole row in aggregation engine when -D records are received.</td>
</tr>
<tr>
<td><h5>alter-column-null-to-not-null.disabled</h5></td>
@@ -1055,6 +1055,12 @@
<td>String</td>
<td>You can specify a pattern to get a timestamp from partitions. The formatter pattern is defined by 'partition.timestamp-formatter'.<ul><li>By default, read from the first field.</li><li>If the timestamp in the partition is a single field called 'dt', you can use '$dt'.</li><li>If it is spread across multiple fields for year, month, day, and hour, you can use '$year-$month-$day $hour:00:00'.</li><li>If the timestamp is in fields dt and hour, you can use '$dt $hour:00:00'.</li></ul></td>
</tr>
<tr>
<td><h5>pk-clustering-override</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Enables clustering by non-primary key fields. When set to true, the physical sort order of data files is determined by the configured 'clustering.columns' instead of the primary key, optimizing query performance for non-PK columns.</td>
</tr>
<tr>
<td><h5>postpone.batch-write-fixed-bucket</h5></td>
<td style="word-wrap: break-word;">true</td>
13 changes: 13 additions & 0 deletions paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -2314,6 +2314,15 @@ public InlineElement getDescription() {
.withDescription(
"The interval for checking visibility when visibility-callback enabled.");

public static final ConfigOption<Boolean> PK_CLUSTERING_OVERRIDE =
key("pk-clustering-override")
.booleanType()
.defaultValue(false)
.withDescription(
"Enables clustering by non-primary key fields. When set to true, the physical"
+ " sort order of data files is determined by the configured 'clustering.columns'"
+ " instead of the primary key, optimizing query performance for non-PK columns.");

private final Options options;

public CoreOptions(Map<String, String> options) {
@@ -2371,6 +2380,10 @@ public TableType type() {
return options.get(TYPE);
}

public boolean pkClusteringOverride() {
return options.get(PK_CLUSTERING_OVERRIDE);
}

public String formatType() {
return normalizeFileFormat(options.get(FILE_FORMAT));
}
@@ -474,9 +474,9 @@ public static class Builder {
private long memTableFlushThreshold = 64 * 1024 * 1024; // 64 MB
private long maxSstFileSize = 8 * 1024 * 1024; // 8 MB
private int blockSize = 32 * 1024; // 32 KB
private long cacheSize = 128 * 1024 * 1024; // 128 MB
private int level0FileNumCompactTrigger = 4;
private int sizeRatio = 10;
private CacheManager cacheManager;
private CompressOptions compressOptions = CompressOptions.defaultOptions();
private Comparator<MemorySlice> keyComparator = MemorySlice::compareTo;

@@ -502,9 +502,9 @@ public Builder blockSize(int blockSize) {
return this;
}

/** Set the block cache size in bytes. Default is 128 MB. */
public Builder cacheSize(long cacheSize) {
this.cacheSize = cacheSize;
/** Set the cache manager. */
public Builder cacheManager(CacheManager cacheManager) {
this.cacheManager = cacheManager;
return this;
}

@@ -551,7 +551,9 @@ public SimpleLsmKvDb build() {
}
}

CacheManager cacheManager = new CacheManager(MemorySize.ofBytes(cacheSize));
if (cacheManager == null) {
cacheManager = new CacheManager(MemorySize.ofMebiBytes(8));
}
SortLookupStoreFactory factory =
new SortLookupStoreFactory(
keyComparator, cacheManager, blockSize, compressOptions);
@@ -20,7 +20,10 @@

import java.io.DataInput;
import java.io.DataOutput;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/* This file is based on source code of LongPacker from the PalDB Project (https://github.com/linkedin/PalDB), licensed by the Apache
* Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
@@ -126,6 +129,24 @@ public static int encodeInt(DataOutput os, int value) throws IOException {
return i;
}

/** Writes a non-negative int to the stream in var-length format. @return number of bytes written. */
public static int encodeInt(OutputStream os, int value) throws IOException {

if (value < 0) {
throw new IllegalArgumentException("negative value: v=" + value);
}

int i = 1;
while ((value & ~0x7F) != 0) {
os.write(((value & 0x7F) | 0x80));
value >>>= 7;
i++;
}

os.write((byte) value);
return i;
}

public static int decodeInt(DataInput is) throws IOException {
for (int offset = 0, result = 0; offset < 32; offset += 7) {
int b = is.readUnsignedByte();
@@ -136,4 +157,19 @@ public static int decodeInt(DataInput is) throws IOException {
}
throw new Error("Malformed integer.");
}

public static int decodeInt(InputStream is) throws IOException {
for (int offset = 0, result = 0; offset < 32; offset += 7) {
int b = is.read();
if (b == -1) {
throw new EOFException("Reached end of stream while reading var-length int.");
}
b &= 0xFF;
result |= (b & 0x7F) << offset;
if ((b & 0x80) == 0) {
return result;
}
}
throw new Error("Malformed integer.");
}
}