diff --git a/Cargo.lock b/Cargo.lock index 8396188c7..b6f7b6887 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1156,6 +1156,7 @@ dependencies = [ "object_store", "opentelemetry", "parking_lot", + "parquet-ext", "schemars 1.2.1", "serde", "serde_json", @@ -1348,6 +1349,12 @@ dependencies = [ "libc", ] +[[package]] +name = "anes" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" + [[package]] name = "anstream" version = "0.6.21" @@ -1665,6 +1672,8 @@ dependencies = [ "arrow-schema", "arrow-select", "arrow-string", + "half 2.7.1", + "rand 0.9.2", ] [[package]] @@ -2657,6 +2666,12 @@ dependencies = [ "thiserror 2.0.18", ] +[[package]] +name = "cast" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" + [[package]] name = "castaway" version = "0.2.4" @@ -2764,6 +2779,33 @@ dependencies = [ "phf 0.12.1", ] +[[package]] +name = "ciborium" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42e69ffd6f0917f5c029256a24d0161db17cea3997d185db0d35926308770f0e" +dependencies = [ + "ciborium-io", + "ciborium-ll", + "serde", +] + +[[package]] +name = "ciborium-io" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05afea1e0a06c9be33d539b876f1ce3692f4afea2cb41f740e7743225ed1c757" + +[[package]] +name = "ciborium-ll" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57663b653d948a338bfb3eeba9bb2fd5fcfaecb9e199e87e1eda4d9e8b240fd9" +dependencies = [ + "ciborium-io", + "half 2.7.1", +] + [[package]] name = "cid" version = "0.11.1" @@ -3243,6 +3285,44 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "criterion" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"f2b12d017a929603d80db1831cd3a24082f8137ce19c69e6447f54f5fc8d692f" +dependencies = [ + "anes", + "cast", + "ciborium", + "clap", + "criterion-plot", + "futures", + "is-terminal", + "itertools 0.10.5", + "num-traits", + "once_cell", + "oorandom", + "plotters", + "rayon", + "regex", + "serde", + "serde_derive", + "serde_json", + "tinytemplate", + "tokio", + "walkdir", +] + +[[package]] +name = "criterion-plot" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b50826342786a51a89e2da3a28f1c32b06e387201bc2d19791f622c673706b1" +dependencies = [ + "cast", + "itertools 0.10.5", +] + [[package]] name = "crossbeam-channel" version = "0.5.15" @@ -3582,7 +3662,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ab67060fc6b8ef687992d439ca0fa36e7ed17e9a0b16b25b601e8757df720de" dependencies = [ "data-encoding", - "syn 1.0.109", + "syn 2.0.115", ] [[package]] @@ -5174,6 +5254,7 @@ checksum = "da0e4dd2a88388a1f4ccc7c9ce104604dab68d9f408dc34cd45823d5a9069095" dependencies = [ "futures-core", "futures-sink", + "nanorand", "spin", ] @@ -6348,6 +6429,17 @@ dependencies = [ "once_cell", ] +[[package]] +name = "is-terminal" +version = "0.4.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3640c1c38b8e4e43584d8df18be5fc6b0aa314ce6ebf51b53313d4306cca8e46" +dependencies = [ + "hermit-abi", + "libc", + "windows-sys 0.61.2", +] + [[package]] name = "is-wsl" version = "0.4.0" @@ -7047,6 +7139,15 @@ version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1d87ecb2933e8aeadb3e3a02b828fed80a7528047e68b4f424523a0981a3a084" +[[package]] +name = "nanorand" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3" +dependencies = [ + "getrandom 0.2.17", +] + [[package]] name = "native-tls" version = "0.2.14" @@ -7446,6 +7547,12 @@ version = 
"1.70.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe" +[[package]] +name = "oorandom" +version = "11.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6790f58c7ff633d8771f42965289203411a5e5c68388703c06e14f24770b41e" + [[package]] name = "opaque-debug" version = "0.3.1" @@ -7748,6 +7855,7 @@ dependencies = [ "arrow-array", "arrow-buffer", "arrow-cast", + "arrow-csv", "arrow-data", "arrow-ipc", "arrow-schema", @@ -7756,6 +7864,7 @@ dependencies = [ "brotli", "bytes", "chrono", + "crc32fast", "flate2", "futures", "half 2.7.1", @@ -7765,8 +7874,14 @@ dependencies = [ "num-integer", "num-traits", "object_store", + "parquet-variant", + "parquet-variant-compute", + "parquet-variant-json", "paste", + "ring", "seq-macro", + "serde", + "serde_json", "simdutf8", "snap", "thrift", @@ -7775,6 +7890,68 @@ dependencies = [ "zstd", ] +[[package]] +name = "parquet-ext" +version = "0.1.0" +dependencies = [ + "arrow", + "arrow-array", + "arrow-buffer", + "arrow-schema", + "bytes", + "criterion", + "flume", + "futures", + "parking_lot", + "parquet", + "rand 0.9.2", + "tokio", +] + +[[package]] +name = "parquet-variant" +version = "57.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6c31f8f9bfefb9dbf67b0807e00fd918676954a7477c889be971ac904103184" +dependencies = [ + "arrow-schema", + "chrono", + "half 2.7.1", + "indexmap 2.13.0", + "simdutf8", + "uuid", +] + +[[package]] +name = "parquet-variant-compute" +version = "57.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "196cd9f7178fed3ac8d5e6d2b51193818e896bbc3640aea3fde3440114a8f39c" +dependencies = [ + "arrow", + "arrow-schema", + "chrono", + "half 2.7.1", + "indexmap 2.13.0", + "parquet-variant", + "parquet-variant-json", + "uuid", +] + +[[package]] +name = "parquet-variant-json" +version = "57.3.0" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed23d7acc90ef60f7fdbcc473fa2fdaefa33542ed15b84388959346d52c839be" +dependencies = [ + "arrow-schema", + "base64 0.22.1", + "chrono", + "parquet-variant", + "serde_json", + "uuid", +] + [[package]] name = "paste" version = "1.0.15" @@ -8076,6 +8253,34 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" +[[package]] +name = "plotters" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5aeb6f403d7a4911efb1e33402027fc44f29b5bf6def3effcc22d7bb75f2b747" +dependencies = [ + "num-traits", + "plotters-backend", + "plotters-svg", + "wasm-bindgen", + "web-sys", +] + +[[package]] +name = "plotters-backend" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df42e13c12958a16b3f7f4386b9ab1f3e7933914ecea48da7139435263a4172a" + +[[package]] +name = "plotters-svg" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51bae2ac328883f7acdfea3d66a7c35751187f870bc81f94563733a154d7a670" +dependencies = [ + "plotters-backend", +] + [[package]] name = "png" version = "0.18.0" @@ -12693,6 +12898,16 @@ dependencies = [ "zerovec", ] +[[package]] +name = "tinytemplate" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be4d6b5f19ff7664e8c98d03e2139cb510db9b0a60b55f8e8709b689d939b6bc" +dependencies = [ + "serde", + "serde_json", +] + [[package]] name = "tinyvec" version = "1.10.0" diff --git a/Cargo.toml b/Cargo.toml index 235d59397..95e2daf9a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,6 +34,7 @@ members = [ "crates/core/worker-core", "crates/core/worker-datasets-derived", "crates/core/worker-datasets-raw", + "crates/parquet-ext", "crates/extractors/evm-rpc", "crates/extractors/evm-rpc/gen", "crates/extractors/firehose", @@ 
-114,6 +115,7 @@ opentelemetry-otlp = { version = "0.31", features = [ "http-proto", ] } opentelemetry_sdk = { version = "0.31" } +parking_lot = "0.12.5" pg_escape = "0.1" pgtemp = "0.7.1" pretty_assertions = "1.4.1" diff --git a/crates/core/monitoring/src/logging.rs b/crates/core/monitoring/src/logging.rs index ad53febd3..673021481 100644 --- a/crates/core/monitoring/src/logging.rs +++ b/crates/core/monitoring/src/logging.rs @@ -96,6 +96,7 @@ const AMP_CRATES: &[&str] = &[ "metadata_db", "metadata_db_postgres", "monitoring", + "parquet_ext", "server", "solana_datasets", "solana_storage_proto", diff --git a/crates/core/worker-core/Cargo.toml b/crates/core/worker-core/Cargo.toml index a9f14ea87..577e937e7 100644 --- a/crates/core/worker-core/Cargo.toml +++ b/crates/core/worker-core/Cargo.toml @@ -17,7 +17,7 @@ futures.workspace = true metadata-db = { path = "../metadata-db" } monitoring = { path = "../monitoring" } object_store.workspace = true -parking_lot = "0.12.5" +parking_lot.workspace = true schemars = { workspace = true, optional = true } serde.workspace = true serde_json.workspace = true @@ -25,6 +25,7 @@ thiserror.workspace = true tokio.workspace = true tracing.workspace = true url.workspace = true +parquet-ext = { path = "../../parquet-ext", features = ["async", "zstd"] } [dev-dependencies] opentelemetry.workspace = true diff --git a/crates/core/worker-core/src/parquet_writer.rs b/crates/core/worker-core/src/parquet_writer.rs index 6f1d221ae..64346faec 100644 --- a/crates/core/worker-core/src/parquet_writer.rs +++ b/crates/core/worker-core/src/parquet_writer.rs @@ -11,7 +11,6 @@ use common::{ }, }, parquet::{ - arrow::AsyncArrowWriter, errors::ParquetError, file::{metadata::KeyValue, properties::WriterProperties as ParquetWriterProperties}, }, @@ -23,6 +22,7 @@ use metadata_db::{ physical_table_revision::LocationId, }; use object_store::{ObjectMeta, buffered::BufWriter}; +use parquet_ext::arrow::async_writer::AsyncArrowWriter; use url::Url; pub async fn 
commit_metadata( diff --git a/crates/parquet-ext/Cargo.toml b/crates/parquet-ext/Cargo.toml new file mode 100644 index 000000000..3817c1c54 --- /dev/null +++ b/crates/parquet-ext/Cargo.toml @@ -0,0 +1,103 @@ +[package] +name = "parquet-ext" +edition.workspace = true +version.workspace = true + +[lib] +name = "parquet_ext" +path = "src/lib.rs" +bench = false + +[[bench]] +name = "arrow_writer" +harness = false + +[[bench]] +name = "end_to_end_writer" +harness = false + +[profile.bench] +opt-level = 3 +lto = "fat" +codegen-units = 1 + +[package.metadata.docs.rs] +all-features = true + +[features] +default = ["default-parquet", "tokio"] +default-parquet = ["parquet/default"] + +tokio = ["async", "object_store", "parquet/arrow"] + +# Pass-through features to parquet +async = ["parquet/async", "dep:tokio", "dep:futures"] +object_store = ["parquet/object_store", "async"] + +# Compression codecs +snap = ["parquet/snap"] +brotli = ["parquet/brotli"] +flate2 = ["parquet/flate2"] +flate2-rust_backend = ["parquet/flate2-rust_backend"] +flate2-zlib-rs = ["parquet/flate2-zlib-rs"] +lz4 = ["parquet/lz4"] +lz4_flex = ["parquet/lz4_flex"] +zstd = ["parquet/zstd"] + +# Other features
 +arrow_canonical_extension_types = ["parquet/arrow_canonical_extension_types"] +arrow-csv = ["parquet/arrow-csv"] +json = ["parquet/json"] +crc = ["parquet/crc"] +encryption = ["parquet/encryption"] +experimental = ["parquet/experimental"] +serde = ["parquet/serde"] + +[dependencies.arrow-array] +version = "^57.3" +optional = false + +[dependencies.arrow-buffer] +version = "^57.3" +optional = true + +[dependencies.arrow-schema] +version = "^57.3" +optional = false + +[dependencies.bytes] +workspace = true +optional = false + +[dependencies.flume] +version = "0.11" +optional = false + +[dependencies.futures] +workspace = true +features = ["std"] +optional = true + +[dependencies.parking_lot] +workspace = true +optional = false + +[dependencies.parquet] +version = "^57.3" +features = ["arrow"] +optional
= false + +[dependencies.tokio] +workspace = true +optional = true + +[dev-dependencies.criterion] +version = "0.5" +features = ["async_tokio"] + +[dev-dependencies.arrow] +version = "^57.3" +features = ["test_utils"] + +[dev-dependencies.rand] +workspace = true \ No newline at end of file diff --git a/crates/parquet-ext/README.md b/crates/parquet-ext/README.md new file mode 100644 index 000000000..c6d2b90e0 --- /dev/null +++ b/crates/parquet-ext/README.md @@ -0,0 +1,149 @@ +# parquet-ext + +A high-performance, pipelined Parquet writer that parallelizes column encoding across threads. Drop-in async replacement for `parquet::arrow::AsyncArrowWriter` with up to **8x speedup** on multi-column schemas. + +## Motivation + +The standard Arrow Parquet writer encodes columns sequentially within each row group. For wide schemas or CPU-intensive compression (e.g., ZSTD), this leaves significant parallelism on the table. `parquet-ext` introduces a two-stage pipeline that encodes columns in parallel on a dedicated thread pool while an async writer thread streams completed row groups to the output. + +## Architecture + +``` text + ┌──────────────────────────────┐ + RecordBatch ──► │ Encoder Thread Pool │ + │ (parallel column encoding) │ + └──────────────┬───────────────┘ + │ encoded row groups + ▼ + ┌──────────────────────────────┐ + │ Async Writer Thread │ + │ (sequential row group I/O) │ + └──────────────┬───────────────┘ + │ + ▼ + Parquet File +``` + +1. **Encoder stage** -- Receives batches via bounded channel, encodes leaf columns in parallel using work-stealing across a thread scope, and sends completed row groups to the writer. +2. **Writer stage** -- Receives encoded row groups, reorders them to preserve sequential ordering, and writes them to the async output. 
+ ## Usage + ### Drop-in `AsyncArrowWriter` replacement + ```rust +use parquet_ext::arrow::async_writer::AsyncArrowWriter; + +let mut writer = AsyncArrowWriter::try_new( + async_output, // impl AsyncFileWriter + Send + schema.clone(), // Arrow SchemaRef + Some(writer_props), // Optional WriterProperties +)?; + +writer.write(&batch).await?; +writer.flush().await?; +let metadata = writer.close().await?; +``` + +### Pipeline builder (advanced) + +For fine-grained control over progress tracking, parallelism, and channel sizing: + +```rust +use parquet_ext::writer::Pipeline; + +let pipeline = Pipeline::<Vec<u8>, _>::builder() + .build_properties(&schema, Some(props))? + .build_writer(Vec::new())? + .with_progress() // enable atomic progress counters + .into_factory() + .spawn_encoder() // start encoder thread + .spawn_writer(async_out) // start writer thread + .build(); + +pipeline.add_batch_async(batch).await?; +let metadata = pipeline.close_async().await?; +``` + +## Features + +| Feature | Default | Description | +| ------- | ------- | ----------- | +| `default-parquet` | yes | Enables parquet's default feature set | +| `tokio` | yes | Async runtime support via tokio | +| `async` | via `tokio` | Core async support (`parquet/async`) | +| `object_store` | via `tokio` | `object_store` integration | +| `zstd` | no | ZSTD compression codec | +| `snap` | no | Snappy compression codec | +| `brotli` | no | Brotli compression codec | +| `lz4` / `lz4_flex` | no | LZ4 compression codecs | +| `flate2` | no | Deflate/gzip compression | + +## Benchmarks + +Two benchmark suites compare parquet-ext against Arrow's built-in writer. Both write to `tokio::io::sink()` to isolate encode/pipeline overhead from I/O.
+```bash +cargo bench -p parquet-ext +``` + +### [arrow_writer](benches/arrow_writer.rs) - Encoding microbenchmark + +Copied from the [parquet crate's benchmarks](https://github.com/apache/arrow-rs/blob/main/parquet/benches/arrow_writer.rs). Isolates the **row group encoding** step only (no async pipeline, no file framing). Compares `RowGroupEncoder::encode()` (parallel, work-stealing across threads) against Arrow's sequential `ArrowRowGroupWriterFactory` column-by-column encoding. + +- **Batch size**: Fixed at 4,096 rows +- **Schemas**: 11 data type variants -- primitive, primitive non-null, bool, bool non-null, string, string non-null, string+binary view, string dictionary, float with NaNs, list primitive, list primitive non-null +- **Writer properties**: default, bloom filter, Parquet 2.0, ZSTD, ZSTD + Parquet 2.0 + +Wins 31/55 configurations (56%). Best speedup: **1.84x** (primitive/default). See [arrow_writer.md](benches/arrow_writer.md) for full results. + +### [end_to_end_writer](benches/end_to_end_writer.rs) - Full pipeline benchmark + +Measures the **complete async write path** end-to-end: batch submission, encoding, row group assembly, and async file writing. Compares `parquet_ext::AsyncArrowWriter` against `parquet::arrow::AsyncArrowWriter`. + +- **Row counts**: 1K, 100K, 1M +- **Column counts**: 10, 30 +- **Schemas**: Simple (Int64, Utf8) and Complex (Int64, Utf8, List\<Int64\>, List\<Utf8\>, Struct{Int64,Utf8,Float64}, Boolean) +- **Compression**: uncompressed, ZSTD level 1, ZSTD level 3 + +Wins 31/36 configurations (86%). Best speedup: **8.1x** (simple, 30 columns, 1M rows, ZSTD1). See [end_to_end_writer.md](benches/end_to_end_writer.md) for full results.
+ +### End-to-end summary (speedup = baseline / parquet-ext) + +**Complex schema**: + +| Rows | 10 columns | 30 columns | +|-----:|-----------:|-----------:| +| 1K | 1.5x | 1.6x | +| 100K | 3.2x | 6.5x | +| 1M | 3.6x | 7.1x | + +**Simple schema**: + +| Rows | 10 columns | 30 columns | +|-----:|-----------:|-----------:| +| 1K | 0.48x | 0.83x | +| 100K | 4.1x | 7.2x | +| 1M | 5.1x | 7.9x | + +Peak throughput: **6.1 GiB/s** (simple, 30 columns, 1M rows, uncompressed) vs parquet crate's baseline's 795 MiB/s. + +### When parquet-ext wins + +- Many columns (more parallel work to distribute) +- Large batches (100K+ rows amortize thread overhead) +- CPU-intensive compression (ZSTD, brotli) +- Complex/nested types (lists, structs) + +### When parquet is faster* + +- Very small batches (< 1K rows) where thread coordination overhead dominates +- Single-column or very narrow schemas + +\* Although technically faster, times for both writers are in the hundreds of microseconds. + +## Configuration + +| Environment Variable | Description | Default | +|----------------------------------|----------------------------------|------------------------------| +| `AMP_PARQUET_WRITER_PARALLELISM` | Number of encoder worker threads | System available parallelism | diff --git a/crates/parquet-ext/benches/arrow_writer.md b/crates/parquet-ext/benches/arrow_writer.md new file mode 100644 index 000000000..8fc280877 --- /dev/null +++ b/crates/parquet-ext/benches/arrow_writer.md @@ -0,0 +1,156 @@ +# Arrow Benchmark: Encoding Performance + +## A/B comparison of row group column encoding: Arrow (sequential) vs parquet-ext (parallel) + +- **Batch size**: 4,096 rows +- **Writer**: `Empty` sink (no I/O -- pure encoding) +- **Arrow**: Sequential column-by-column encoding via `ArrowRowGroupWriterFactory` +- **parquet-ext**: Parallel column encoding via `RowGroupEncoder::encode()` with thread-per-column work stealing + +## Summary + +Speedup = Arrow time / parquet-ext time. 
Values **> 1.0x** mean parquet-ext is faster. + +| Data Type | Default | Bloom Filter | Parquet 2.0 | ZSTD | ZSTD + Pq2 | +| :--- | :---: | :---: | :---: | :---: | :---: | +| primitive | **1.84x** | 0.99x | **1.29x** | 0.72x | 0.55x | +| primitive non null | 0.77x | 0.98x | 0.59x | 0.79x | 0.76x | +| bool | 0.55x | 0.65x | 0.62x | 0.61x | 0.64x | +| bool non null | 0.33x | 0.60x | 0.47x | 0.42x | 0.51x | +| string | **1.18x** | **1.74x** | **1.27x** | **1.45x** | **1.20x** | +| string non null | **1.49x** | **1.50x** | **1.44x** | **1.71x** | **1.78x** | +| string and binary view | **1.10x** | **1.11x** | **1.08x** | **1.36x** | **1.30x** | +| string dictionary | 0.86x | 0.92x | 0.97x | 0.95x | 0.94x | +| float with nans | **1.11x** | **1.03x** | 1.00x | **1.41x** | **1.19x** | +| list primitive | **1.21x** | **1.26x** | **1.39x** | **1.05x** | **1.18x** | +| list primitive non null | **1.34x** | **1.33x** | **1.22x** | **1.17x** | **1.15x** | + +## Detailed Results + +### Primitive + +| Writer Property | Arrow Time | parquet-ext Time | Speedup | Arrow Thrpt | parquet-ext Thrpt | +| :--- | ---: | ---: | :---: | ---: | ---: | +| Default | 322.23 µs | 175.16 µs | **1.84x** | 545.99 MiB/s | 1004.4 MiB/s | +| Bloom Filter | 651.87 µs | 659.12 µs | 0.99x | 269.89 MiB/s | 266.92 MiB/s | +| Parquet 2.0 | 376.44 µs | 290.95 µs | **1.29x** | 467.37 MiB/s | 604.69 MiB/s | +| ZSTD | 554.52 µs | 769.43 µs | 0.72x | 317.27 MiB/s | 228.66 MiB/s | +| ZSTD + Pq2 | 735.95 µs | 1.3308 ms | 0.55x | 239.06 MiB/s | 132.20 MiB/s | + +### Primitive Non Null + +| Writer Property | Arrow Time | parquet-ext Time | Speedup | Arrow Thrpt | parquet-ext Thrpt | +| :--- | ---: | ---: | :---: | ---: | ---: | +| Default | 255.33 µs | 332.22 µs | 0.77x | 675.66 MiB/s | 519.27 MiB/s | +| Bloom Filter | 734.30 µs | 746.35 µs | 0.98x | 234.94 MiB/s | 231.15 MiB/s | +| Parquet 2.0 | 290.08 µs | 490.18 µs | 0.59x | 594.71 MiB/s | 351.94 MiB/s | +| ZSTD | 645.49 µs | 816.08 µs | 0.79x | 267.26 MiB/s | 
211.40 MiB/s | +| ZSTD + Pq2 | 592.23 µs | 779.41 µs | 0.76x | 291.30 MiB/s | 221.34 MiB/s | + +### Bool + +| Writer Property | Arrow Time | parquet-ext Time | Speedup | Arrow Thrpt | parquet-ext Thrpt | +| :--- | ---: | ---: | :---: | ---: | ---: | +| Default | 27.763 µs | 50.144 µs | 0.55x | 38.198 MiB/s | 21.149 MiB/s | +| Bloom Filter | 46.233 µs | 70.883 µs | 0.65x | 22.938 MiB/s | 14.961 MiB/s | +| Parquet 2.0 | 33.121 µs | 53.217 µs | 0.62x | 32.019 MiB/s | 19.928 MiB/s | +| ZSTD | 32.192 µs | 52.496 µs | 0.61x | 32.943 MiB/s | 20.201 MiB/s | +| ZSTD + Pq2 | 36.506 µs | 57.114 µs | 0.64x | 29.050 MiB/s | 18.568 MiB/s | + +### Bool Non Null + +| Writer Property | Arrow Time | parquet-ext Time | Speedup | Arrow Thrpt | parquet-ext Thrpt | +| :--- | ---: | ---: | :---: | ---: | ---: | +| Default | 9.4085 µs | 28.164 µs | 0.33x | 60.818 MiB/s | 20.317 MiB/s | +| Bloom Filter | 30.214 µs | 50.474 µs | 0.60x | 18.938 MiB/s | 11.337 MiB/s | +| Parquet 2.0 | 16.282 µs | 34.956 µs | 0.47x | 35.144 MiB/s | 16.369 MiB/s | +| ZSTD | 13.287 µs | 31.624 µs | 0.42x | 43.066 MiB/s | 18.094 MiB/s | +| ZSTD + Pq2 | 20.109 µs | 39.699 µs | 0.51x | 28.455 MiB/s | 14.414 MiB/s | + +### String + +| Writer Property | Arrow Time | parquet-ext Time | Speedup | Arrow Thrpt | parquet-ext Thrpt | +| :--- | ---: | ---: | :---: | ---: | ---: | +| Default | 552.17 µs | 469.77 µs | **1.18x** | 3.6222 GiB/s | 4.2576 GiB/s | +| Bloom Filter | 934.63 µs | 538.10 µs | **1.74x** | 2.1400 GiB/s | 3.7170 GiB/s | +| Parquet 2.0 | 529.12 µs | 415.78 µs | **1.27x** | 3.7800 GiB/s | 4.8105 GiB/s | +| ZSTD | 1.5094 ms | 1.0415 ms | **1.45x** | 1.3251 GiB/s | 1.9204 GiB/s | +| ZSTD + Pq2 | 1.3461 ms | 1.1246 ms | **1.20x** | 1.4858 GiB/s | 1.7785 GiB/s | + +### String Non Null + +| Writer Property | Arrow Time | parquet-ext Time | Speedup | Arrow Thrpt | parquet-ext Thrpt | +| :--- | ---: | ---: | :---: | ---: | ---: | +| Default | 667.14 µs | 447.43 µs | **1.49x** | 2.9966 GiB/s | 4.4681 GiB/s | +| 
Bloom Filter | 944.04 µs | 630.33 µs | **1.50x** | 2.1176 GiB/s | 3.1716 GiB/s | +| Parquet 2.0 | 663.55 µs | 460.11 µs | **1.44x** | 3.0128 GiB/s | 4.3449 GiB/s | +| ZSTD | 1.8295 ms | 1.0698 ms | **1.71x** | 1.0927 GiB/s | 1.8687 GiB/s | +| ZSTD + Pq2 | 1.7526 ms | 985.74 µs | **1.78x** | 1.1407 GiB/s | 2.0281 GiB/s | + +### String And Binary View + +| Writer Property | Arrow Time | parquet-ext Time | Speedup | Arrow Thrpt | parquet-ext Thrpt | +| :--- | ---: | ---: | :---: | ---: | ---: | +| Default | 223.64 µs | 204.02 µs | **1.10x** | 564.27 MiB/s | 618.52 MiB/s | +| Bloom Filter | 348.98 µs | 313.11 µs | **1.11x** | 361.60 MiB/s | 403.02 MiB/s | +| Parquet 2.0 | 249.41 µs | 231.77 µs | **1.08x** | 505.96 MiB/s | 544.47 MiB/s | +| ZSTD | 447.43 µs | 328.81 µs | **1.36x** | 282.03 MiB/s | 383.78 MiB/s | +| ZSTD + Pq2 | 419.45 µs | 321.44 µs | **1.30x** | 300.85 MiB/s | 392.57 MiB/s | + +### String Dictionary + +| Writer Property | Arrow Time | parquet-ext Time | Speedup | Arrow Thrpt | parquet-ext Thrpt | +| :--- | ---: | ---: | :---: | ---: | ---: | +| Default | 248.46 µs | 287.86 µs | 0.86x | 4.0562 GiB/s | 3.5010 GiB/s | +| Bloom Filter | 429.93 µs | 465.79 µs | 0.92x | 2.3442 GiB/s | 2.1637 GiB/s | +| Parquet 2.0 | 251.90 µs | 258.72 µs | 0.97x | 4.0009 GiB/s | 3.8953 GiB/s | +| ZSTD | 720.15 µs | 756.77 µs | 0.95x | 1.3994 GiB/s | 1.3317 GiB/s | +| ZSTD + Pq2 | 694.32 µs | 736.75 µs | 0.94x | 1.4515 GiB/s | 1.3679 GiB/s | + +### Float With Nans + +| Writer Property | Arrow Time | parquet-ext Time | Speedup | Arrow Thrpt | parquet-ext Thrpt | +| :--- | ---: | ---: | :---: | ---: | ---: | +| Default | 251.14 µs | 226.41 µs | **1.11x** | 218.85 MiB/s | 242.75 MiB/s | +| Bloom Filter | 303.02 µs | 292.82 µs | **1.03x** | 181.38 MiB/s | 187.70 MiB/s | +| Parquet 2.0 | 387.83 µs | 387.84 µs | 1.00x | 141.72 MiB/s | 141.71 MiB/s | +| ZSTD | 356.92 µs | 253.61 µs | **1.41x** | 153.99 MiB/s | 216.72 MiB/s | +| ZSTD + Pq2 | 471.65 µs | 395.83 µs | **1.19x** | 116.53 
MiB/s | 138.85 MiB/s | + +### List Primitive + +| Writer Property | Arrow Time | parquet-ext Time | Speedup | Arrow Thrpt | parquet-ext Thrpt | +| :--- | ---: | ---: | :---: | ---: | ---: | +| Default | 878.04 µs | 725.16 µs | **1.21x** | 2.3711 GiB/s | 2.8710 GiB/s | +| Bloom Filter | 1.0946 ms | 867.24 µs | **1.26x** | 1.9020 GiB/s | 2.4007 GiB/s | +| Parquet 2.0 | 919.72 µs | 661.81 µs | **1.39x** | 2.2637 GiB/s | 3.1458 GiB/s | +| ZSTD | 1.4311 ms | 1.3580 ms | **1.05x** | 1.4548 GiB/s | 1.5331 GiB/s | +| ZSTD + Pq2 | 1.5269 ms | 1.2940 ms | **1.18x** | 1.3635 GiB/s | 1.6090 GiB/s | + +### List Primitive Non Null + +| Writer Property | Arrow Time | parquet-ext Time | Speedup | Arrow Thrpt | parquet-ext Thrpt | +| :--- | ---: | ---: | :---: | ---: | ---: | +| Default | 949.77 µs | 709.77 µs | **1.34x** | 2.1873 GiB/s | 2.9270 GiB/s | +| Bloom Filter | 1.2179 ms | 917.86 µs | **1.33x** | 1.7058 GiB/s | 2.2634 GiB/s | +| Parquet 2.0 | 998.70 µs | 821.79 µs | **1.22x** | 2.0802 GiB/s | 2.5280 GiB/s | +| ZSTD | 2.0607 ms | 1.7650 ms | **1.17x** | 1.0081 GiB/s | 1.1771 GiB/s | +| ZSTD + Pq2 | 2.0530 ms | 1.7802 ms | **1.15x** | 1.0119 GiB/s | 1.1670 GiB/s | + +## Analysis + +- **parquet-ext wins 31/55** benchmark configurations (56%) +- **Best speedup**: 1.84x at `primitive/default` +- **Worst regression**: 0.33x at `bool_non_null/default` + +### Where parallel encoding helps most + +- **Multi-column schemas** (string, list_primitive) -- more columns to parallelize across +- **Compression-heavy configs** (ZSTD) -- CPU-bound work benefits from threading +- **Large string data** -- encoding + compression of string columns is expensive and parallelizable + +### Where sequential Arrow is faster + +- **Single-column schemas** (bool, bool_non_null) -- thread spawn overhead dominates tiny workloads +- **Dictionary-encoded data** -- already compact, little parallel work to distribute +- **Very small batches** -- 4,096 rows may not provide enough work to amortize threading 
overhead diff --git a/crates/parquet-ext/benches/arrow_writer.rs b/crates/parquet-ext/benches/arrow_writer.rs new file mode 100644 index 000000000..e10983889 --- /dev/null +++ b/crates/parquet-ext/benches/arrow_writer.rs @@ -0,0 +1,492 @@ +#[macro_use] +extern crate criterion; + +use std::{hint::black_box, io::Empty, sync::Arc}; + +use arrow::{ + datatypes::*, + record_batch::RecordBatch, + util::{ + bench_util::{create_f16_array, create_f32_array, create_f64_array}, + data_gen::*, + }, +}; +use arrow_array::RecordBatchOptions; +use criterion::{Bencher, BenchmarkId, Criterion, Throughput}; +use parquet::{ + arrow::{ + ArrowSchemaConverter, + arrow_writer::{ArrowRowGroupWriterFactory, compute_leaves}, + }, + basic::{Compression, ZstdLevel}, + errors::Result, + file::{ + properties::{WriterProperties, WriterVersion}, + writer::SerializedFileWriter, + }, +}; +use parquet_ext::writer::EncoderFactory; + +fn create_primitive_bench_batch( + size: usize, + null_density: f32, + true_density: f32, +) -> Result { + let fields = vec![ + Field::new("_1", DataType::Int32, true), + Field::new("_2", DataType::Int64, true), + Field::new("_3", DataType::UInt32, true), + Field::new("_4", DataType::UInt64, true), + Field::new("_5", DataType::Float32, true), + Field::new("_6", DataType::Float64, true), + Field::new("_7", DataType::Date64, true), + ]; + let schema = Schema::new(fields); + Ok(create_random_batch( + Arc::new(schema), + size, + null_density, + true_density, + )?) 
+} + +fn create_primitive_bench_batch_non_null( + size: usize, + null_density: f32, + true_density: f32, +) -> Result { + let fields = vec![ + Field::new("_1", DataType::Int32, false), + Field::new("_2", DataType::Int64, false), + Field::new("_3", DataType::UInt32, false), + Field::new("_4", DataType::UInt64, false), + Field::new("_5", DataType::Float32, false), + Field::new("_6", DataType::Float64, false), + Field::new("_7", DataType::Date64, false), + ]; + let schema = Schema::new(fields); + Ok(create_random_batch( + Arc::new(schema), + size, + null_density, + true_density, + )?) +} + +fn create_string_bench_batch( + size: usize, + null_density: f32, + true_density: f32, +) -> Result { + let fields = vec![ + Field::new("_1", DataType::Utf8, true), + Field::new("_2", DataType::LargeUtf8, true), + ]; + let schema = Schema::new(fields); + Ok(create_random_batch( + Arc::new(schema), + size, + null_density, + true_density, + )?) +} + +fn create_string_and_binary_view_bench_batch( + size: usize, + null_density: f32, + true_density: f32, +) -> Result { + let fields = vec![ + Field::new("_1", DataType::Utf8View, true), + Field::new("_2", DataType::BinaryView, true), + ]; + let schema = Schema::new(fields); + Ok(create_random_batch( + Arc::new(schema), + size, + null_density, + true_density, + )?) +} + +fn create_string_dictionary_bench_batch( + size: usize, + null_density: f32, + true_density: f32, +) -> Result { + let fields = vec![Field::new( + "_1", + DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), + true, + )]; + let schema = Schema::new(fields); + Ok(create_random_batch( + Arc::new(schema), + size, + null_density, + true_density, + )?) 
+} + +fn create_string_bench_batch_non_null( + size: usize, + null_density: f32, + true_density: f32, +) -> Result { + let fields = vec![ + Field::new("_1", DataType::Utf8, false), + Field::new("_2", DataType::LargeUtf8, false), + ]; + let schema = Schema::new(fields); + Ok(create_random_batch( + Arc::new(schema), + size, + null_density, + true_density, + )?) +} + +fn create_bool_bench_batch( + size: usize, + null_density: f32, + true_density: f32, +) -> Result { + let fields = vec![Field::new("_1", DataType::Boolean, true)]; + let schema = Schema::new(fields); + Ok(create_random_batch( + Arc::new(schema), + size, + null_density, + true_density, + )?) +} + +fn create_bool_bench_batch_non_null( + size: usize, + null_density: f32, + true_density: f32, +) -> Result { + let fields = vec![Field::new("_1", DataType::Boolean, false)]; + let schema = Schema::new(fields); + Ok(create_random_batch( + Arc::new(schema), + size, + null_density, + true_density, + )?) +} + +fn create_float_bench_batch_with_nans(size: usize, nan_density: f32) -> Result { + let fields = vec![ + Field::new("_1", DataType::Float16, false), + Field::new("_2", DataType::Float32, false), + Field::new("_3", DataType::Float64, false), + ]; + let schema = Schema::new(fields); + let columns: Vec = vec![ + Arc::new(create_f16_array(size, nan_density)), + Arc::new(create_f32_array(size, nan_density)), + Arc::new(create_f64_array(size, nan_density)), + ]; + Ok(RecordBatch::try_new_with_options( + Arc::new(schema), + columns, + &RecordBatchOptions::new().with_match_field_names(false), + )?) 
+} + +fn create_list_primitive_bench_batch( + size: usize, + null_density: f32, + true_density: f32, +) -> Result { + let fields = vec![ + Field::new( + "_1", + DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))), + true, + ), + Field::new( + "_2", + DataType::List(Arc::new(Field::new_list_field(DataType::Boolean, true))), + true, + ), + Field::new( + "_3", + DataType::LargeList(Arc::new(Field::new_list_field(DataType::Utf8, true))), + true, + ), + ]; + let schema = Schema::new(fields); + Ok(create_random_batch( + Arc::new(schema), + size, + null_density, + true_density, + )?) +} + +fn create_list_primitive_bench_batch_non_null( + size: usize, + null_density: f32, + true_density: f32, +) -> Result { + let fields = vec![ + Field::new( + "_1", + DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))), + false, + ), + Field::new( + "_2", + DataType::List(Arc::new(Field::new_list_field(DataType::Boolean, false))), + false, + ), + Field::new( + "_3", + DataType::LargeList(Arc::new(Field::new_list_field(DataType::Utf8, false))), + false, + ), + ]; + let schema = Schema::new(fields); + Ok(create_random_batch( + Arc::new(schema), + size, + null_density, + true_density, + )?) 
+} + +fn _create_nested_bench_batch( + size: usize, + null_density: f32, + true_density: f32, +) -> Result { + let fields = vec![ + Field::new( + "_1", + DataType::Struct(Fields::from(vec![ + Field::new("_1", DataType::Int8, true), + Field::new( + "_2", + DataType::Struct(Fields::from(vec![ + Field::new("_1", DataType::Int8, true), + Field::new( + "_1", + DataType::Struct(Fields::from(vec![ + Field::new("_1", DataType::Int8, true), + Field::new("_2", DataType::Utf8, true), + ])), + true, + ), + Field::new("_2", DataType::UInt8, true), + ])), + true, + ), + ])), + true, + ), + Field::new( + "_2", + DataType::LargeList(Arc::new(Field::new_list_field( + DataType::List(Arc::new(Field::new_list_field( + DataType::Struct(Fields::from(vec![ + Field::new( + "_1", + DataType::Struct(Fields::from(vec![ + Field::new("_1", DataType::Int8, true), + Field::new("_2", DataType::Int16, true), + Field::new("_3", DataType::Int32, true), + ])), + true, + ), + Field::new( + "_2", + DataType::List(Arc::new(Field::new( + "", + DataType::FixedSizeBinary(2), + true, + ))), + true, + ), + ])), + true, + ))), + true, + ))), + true, + ), + ]; + let schema = Schema::new(fields); + Ok(create_random_batch( + Arc::new(schema), + size, + null_density, + true_density, + )?) +} + +/// Arrow's sequential column encoding (baseline). 
+fn encode_arrow_sequential( + bench: &mut Bencher, + batch: &RecordBatch, + props: Option, +) -> Result<()> { + let mut file = Empty::default(); + let props = Arc::new(props.unwrap_or_default()); + let parquet_schema = ArrowSchemaConverter::new() + .with_coerce_types(props.coerce_types()) + .convert(batch.schema_ref())?; + let writer = SerializedFileWriter::new(&mut file, parquet_schema.root_schema_ptr(), props)?; + let row_group_writer_factory = ArrowRowGroupWriterFactory::new(&writer, batch.schema()); + + bench.iter(|| { + let mut row_group = row_group_writer_factory.create_column_writers(0).unwrap(); + + let mut writers = row_group.iter_mut(); + for (field, column) in batch + .schema() + .fields() + .iter() + .zip(black_box(batch).columns()) + { + for leaf in compute_leaves(field.as_ref(), column).unwrap() { + writers.next().unwrap().write(&leaf).unwrap() + } + } + + for writer in row_group.into_iter() { + black_box(writer.close()).unwrap(); + } + }); + + Ok(()) +} + +/// parquet-ext's parallel column encoding. 
+fn encode_parquet_ext_parallel( + bench: &mut Bencher, + batch: &RecordBatch, + props: Option, + num_workers: usize, +) -> Result<()> { + let mut file = Empty::default(); + let props = Arc::new(props.unwrap_or_default()); + let parquet_schema = ArrowSchemaConverter::new() + .with_coerce_types(props.coerce_types()) + .convert(batch.schema_ref())?; + let writer = SerializedFileWriter::new(&mut file, parquet_schema.root_schema_ptr(), props)?; + let mut encoder_factory = EncoderFactory::new(&writer, &batch.schema()); + + bench.iter(|| { + let encoder = encoder_factory.try_next_encoder().unwrap(); + let encoded = encoder.encode(&[black_box(batch).clone()], None, num_workers); + black_box(encoded).unwrap(); + }); + + Ok(()) +} + +fn create_batches() -> Vec<(&'static str, RecordBatch)> { + const BATCH_SIZE: usize = 4096; + + let mut batches = vec![]; + + let batch = create_primitive_bench_batch(BATCH_SIZE, 0.25, 0.75).unwrap(); + batches.push(("primitive", batch)); + + let batch = create_primitive_bench_batch_non_null(BATCH_SIZE, 0.25, 0.75).unwrap(); + batches.push(("primitive_non_null", batch)); + + let batch = create_bool_bench_batch(BATCH_SIZE, 0.25, 0.75).unwrap(); + batches.push(("bool", batch)); + + let batch = create_bool_bench_batch_non_null(BATCH_SIZE, 0.25, 0.75).unwrap(); + batches.push(("bool_non_null", batch)); + + let batch = create_string_bench_batch(BATCH_SIZE, 0.25, 0.75).unwrap(); + batches.push(("string", batch)); + + let batch = create_string_and_binary_view_bench_batch(BATCH_SIZE, 0.25, 0.75).unwrap(); + batches.push(("string_and_binary_view", batch)); + + let batch = create_string_dictionary_bench_batch(BATCH_SIZE, 0.25, 0.75).unwrap(); + batches.push(("string_dictionary", batch)); + + let batch = create_string_bench_batch_non_null(BATCH_SIZE, 0.25, 0.75).unwrap(); + batches.push(("string_non_null", batch)); + + let batch = create_float_bench_batch_with_nans(BATCH_SIZE, 0.5).unwrap(); + batches.push(("float_with_nans", batch)); + + let batch = 
create_list_primitive_bench_batch(BATCH_SIZE, 0.25, 0.75).unwrap(); + batches.push(("list_primitive", batch)); + + let batch = create_list_primitive_bench_batch_non_null(BATCH_SIZE, 0.25, 0.75).unwrap(); + batches.push(("list_primitive_non_null", batch)); + + batches +} + +fn create_writer_props() -> Vec<(&'static str, WriterProperties)> { + let mut props = vec![]; + + props.push(("default", Default::default())); + + let prop = WriterProperties::builder() + .set_bloom_filter_enabled(true) + .build(); + props.push(("bloom_filter", prop)); + + let prop = WriterProperties::builder() + .set_writer_version(WriterVersion::PARQUET_2_0) + .build(); + props.push(("parquet_2", prop)); + + let prop = WriterProperties::builder() + .set_compression(Compression::ZSTD(ZstdLevel::default())) + .build(); + props.push(("zstd", prop)); + + let prop = WriterProperties::builder() + .set_compression(Compression::ZSTD(ZstdLevel::default())) + .set_writer_version(WriterVersion::PARQUET_2_0) + .build(); + props.push(("zstd_parquet_2", prop)); + + props +} + +fn bench_all_writers(c: &mut Criterion) { + let num_workers = std::thread::available_parallelism() + .map(|n| n.get()) + .unwrap_or(1); + + let batches = create_batches(); + let props = create_writer_props(); + + for (batch_name, batch) in &batches { + let mut group = c.benchmark_group(*batch_name); + group.throughput(Throughput::Bytes( + batch + .columns() + .iter() + .map(|f| f.get_array_memory_size() as u64) + .sum(), + )); + + for (prop_name, prop) in &props { + group.bench_function(BenchmarkId::new("arrow", *prop_name), |b| { + encode_arrow_sequential(b, batch, Some(prop.clone())).unwrap() + }); + + group.bench_function(BenchmarkId::new("parquet_ext", *prop_name), |b| { + encode_parquet_ext_parallel(b, batch, Some(prop.clone()), num_workers).unwrap() + }); + } + group.finish(); + } +} + +criterion_group!(benches, bench_all_writers); +criterion_main!(benches); diff --git a/crates/parquet-ext/benches/end_to_end_writer.md 
b/crates/parquet-ext/benches/end_to_end_writer.md new file mode 100644 index 000000000..5907d14c9 --- /dev/null +++ b/crates/parquet-ext/benches/end_to_end_writer.md @@ -0,0 +1,127 @@ +# Writer Benchmark: End-to-End Performance + +## A/B comparison of full async writer pipeline: parquet-ext vs Arrow's AsyncArrowWriter + +### Writer + +`tokio::io::sink()` (no I/O -- pure encode + pipeline overhead) + +### this_crate + +`parquet_ext::AsyncArrowWriter` (parallel pipeline with encoder threads) + +### parquet_crate** + +`parquet::arrow::AsyncArrowWriter` (sequential) + +### Schemas + +Both schema types cycle through columns of the types noted here. + +#### Simple + +- Int64 +- Utf8 + +#### Complex + +- Int64 +- Utf8 +- List\ +- List\ +- Struct + - Int64 + - Utf8 + - Float64 +- Boolean + +## Summary + +Speedup = parquet_crate time / this_crate time. Values **> 1.0x** mean parquet-ext is faster. + +### Complex Schema + +| Rows | uncompressed 10col | uncompressed 30col | zstd_1 10col | zstd_1 30col | zstd_3 10col | zstd_3 30col | +| ---: | ---: | ---: | ---: | ---: | ---: | ---: | +| 1K | 1.5x | **1.6x** | 1.5x | 1.4x | 0.99x | 1.4x | +| 100K | **3.2x** | **6.5x** | **3.3x** | **6.8x** | **3.3x** | **6.7x** | +| 1M | **3.6x** | **7.1x** | **3.8x** | **7.3x** | **3.9x** | **7.3x** | + +### Simple Schema + +| Rows | uncompressed 10col | uncompressed 30col | zstd_1 10col | zstd_1 30col | zstd_3 10col | zstd_3 30col | +| ---: | ---: | ---: | ---: | ---: | ---: | ---: | +| 1K | 0.48x | 0.83x | 0.58x | 1.0x | 0.71x | 1.1x | +| 100K | **4.1x** | **7.2x** | **4.6x** | **7.4x** | **4.8x** | **7.5x** | +| 1M | **5.1x** | **7.9x** | **4.7x** | **8.1x** | **4.2x** | **7.9x** | + +## Detailed Results + +### Complex 10 Columns + +| Rows | Compression | this_crate Time | parquet_crate Time | Speedup | this_crate Thrpt | parquet_crate Thrpt | +| ---: | :--- | ---: | ---: | :---: | ---: | ---: | +| 1K | uncompressed | 399.19 µs | 589.96 µs | **1.48x** | 738.93 MiB/s | 499.99 MiB/s | +| 1K 
| zstd_1 | 492.83 µs | 732.57 µs | **1.49x** | 598.53 MiB/s | 402.65 MiB/s | +| 1K | zstd_3 | 806.05 µs | 794.52 µs | 0.99x | 365.95 MiB/s | 371.26 MiB/s | +| 100K | uncompressed | 19.033 ms | 61.004 ms | **3.2x** | 1.4552 GiB/s | 464.92 MiB/s | +| 100K | zstd_1 | 19.092 ms | 63.854 ms | **3.3x** | 1.4507 GiB/s | 444.17 MiB/s | +| 100K | zstd_3 | 20.064 ms | 65.402 ms | **3.3x** | 1.3805 GiB/s | 433.65 MiB/s | +| 1M | uncompressed | 167.48 ms | 599.22 ms | **3.6x** | 1.6923 GiB/s | 484.34 MiB/s | +| 1M | zstd_1 | 159.77 ms | 609.54 ms | **3.8x** | 1.7739 GiB/s | 476.15 MiB/s | +| 1M | zstd_3 | 158.75 ms | 614.73 ms | **3.9x** | 1.7854 GiB/s | 472.12 MiB/s | + +### Complex 30 Columns + +| Rows | Compression | this_crate Time | parquet_crate Time | Speedup | this_crate Thrpt | parquet_crate Thrpt | +| ---: | :--- | ---: | ---: | :---: | ---: | ---: | +| 1K | uncompressed | 1.0632 ms | 1.6635 ms | **1.6x** | 776.66 MiB/s | 496.39 MiB/s | +| 1K | zstd_1 | 1.4728 ms | 2.0053 ms | **1.36x** | 560.67 MiB/s | 411.78 MiB/s | +| 1K | zstd_3 | 1.5397 ms | 2.0925 ms | **1.36x** | 536.31 MiB/s | 394.63 MiB/s | +| 100K | uncompressed | 25.175 ms | 163.46 ms | **6.5x** | 3.0950 GiB/s | 488.09 MiB/s | +| 100K | zstd_1 | 25.876 ms | 175.10 ms | **6.8x** | 3.0110 GiB/s | 455.65 MiB/s | +| 100K | zstd_3 | 26.765 ms | 179.90 ms | **6.7x** | 2.9111 GiB/s | 443.48 MiB/s | +| 1M | uncompressed | 218.98 ms | 1.5592 s | **7.1x** | 3.4914 GiB/s | 502.11 MiB/s | +| 1M | zstd_1 | 226.11 ms | 1.6420 s | **7.3x** | 3.3813 GiB/s | 476.79 MiB/s | +| 1M | zstd_3 | 229.85 ms | 1.6717 s | **7.3x** | 3.3263 GiB/s | 468.30 MiB/s | + +### Simple 10 Columns + +| Rows | Compression | this_crate Time | parquet_crate Time | Speedup | this_crate Thrpt | parquet_crate Thrpt | +| ---: | :--- | ---: | ---: | :---: | ---: | ---: | +| 1K | uncompressed | 492.25 µs | 236.56 µs | 0.48x | 197.09 MiB/s | 410.11 MiB/s | +| 1K | zstd_1 | 474.19 µs | 276.98 µs | 0.58x | 204.59 MiB/s | 350.26 MiB/s | +| 1K | zstd_3 | 
442.75 µs | 312.96 µs | 0.71x | 219.12 MiB/s | 309.99 MiB/s | +| 100K | uncompressed | 3.0331 ms | 12.554 ms | **4.1x** | 3.1151 GiB/s | 770.67 MiB/s | +| 100K | zstd_1 | 2.7488 ms | 12.781 ms | **4.6x** | 3.4373 GiB/s | 756.97 MiB/s | +| 100K | zstd_3 | 2.7940 ms | 13.319 ms | **4.8x** | 3.3816 GiB/s | 726.43 MiB/s | +| 1M | uncompressed | 24.119 ms | 123.83 ms | **5.1x** | 3.9152 GiB/s | 780.91 MiB/s | +| 1M | zstd_1 | 27.880 ms | 131.82 ms | **4.7x** | 3.3870 GiB/s | 733.55 MiB/s | +| 1M | zstd_3 | 31.525 ms | 131.63 ms | **4.2x** | 2.9954 GiB/s | 734.60 MiB/s | + +### Simple 30 Columns + +| Rows | Compression | this_crate Time | parquet_crate Time | Speedup | this_crate Thrpt | parquet_crate Thrpt | +| ---: | :--- | ---: | ---: | :---: | ---: | ---: | +| 1K | uncompressed | 523.36 µs | 434.77 µs | 0.83x | 573.34 MiB/s | 690.17 MiB/s | +| 1K | zstd_1 | 594.32 µs | 618.11 µs | **1.04x** | 504.88 MiB/s | 485.45 MiB/s | +| 1K | zstd_3 | 596.38 µs | 632.48 µs | **1.06x** | 503.15 MiB/s | 474.42 MiB/s | +| 100K | uncompressed | 5.1542 ms | 37.361 ms | **7.2x** | 5.6666 GiB/s | 800.51 MiB/s | +| 100K | zstd_1 | 5.2387 ms | 38.710 ms | **7.4x** | 5.5751 GiB/s | 772.61 MiB/s | +| 100K | zstd_3 | 5.2400 ms | 39.072 ms | **7.5x** | 5.5738 GiB/s | 765.44 MiB/s | +| 1M | uncompressed | 46.878 ms | 371.34 ms | **7.9x** | 6.1466 GiB/s | 794.56 MiB/s | +| 1M | zstd_1 | 47.498 ms | 383.28 ms | **8.1x** | 6.0664 GiB/s | 769.81 MiB/s | +| 1M | zstd_3 | 49.541 ms | 390.82 ms | **7.9x** | 5.8162 GiB/s | 754.97 MiB/s | + +## Analysis + +- **parquet-ext wins 31/36** benchmark configurations (86%) +- **Best speedup**: 8.1x at `zstd_1/simple_30cols_1M` + +### Key observations + +- **Speedup scales with data size**: Small batches (1K rows) show modest gains or overhead; at 100K+ rows the parallel pipeline dominates +- **More columns = more parallelism**: 30-column schemas consistently outperform 10-column schemas in relative speedup +- **Compression amplifies gains**: ZSTD adds CPU work 
per column, making parallel encoding even more beneficial +- **Complex schemas benefit most**: Nested types (lists, structs) are more expensive to encode, providing more parallel work +- **At scale (1M rows, 30 cols, ZSTD)**: parquet-ext achieves ~6 GiB/s vs Arrow's ~770 MiB/s -- a **~8x speedup** diff --git a/crates/parquet-ext/benches/end_to_end_writer.rs b/crates/parquet-ext/benches/end_to_end_writer.rs new file mode 100644 index 000000000..1669de21a --- /dev/null +++ b/crates/parquet-ext/benches/end_to_end_writer.rs @@ -0,0 +1,578 @@ +use std::{marker, sync::Arc}; + +use arrow_array::{ + ArrayRef, BooleanArray, Float64Array, Int32Array, Int64Array, RecordBatch, StringArray, + StructArray, + builder::{ArrayBuilder, Int32Builder, ListBuilder, StringBuilder}, +}; +use arrow_schema::{DataType, Field, Schema}; +use criterion::{BenchmarkId, Criterion, Throughput, criterion_group, criterion_main}; +#[cfg(feature = "brotli")] +use parquet::basic::BrotliLevel; +#[cfg(feature = "flate2")] +use parquet::basic::GzipLevel; +#[cfg(feature = "zstd")] +use parquet::basic::ZstdLevel; +use parquet::{ + arrow::AsyncArrowWriter as ParquetAsyncArrowWriter, + basic::Compression, + file::properties::{WriterProperties, WriterPropertiesPtr}, +}; +use parquet_ext::arrow::async_writer::AsyncArrowWriter as OurAsyncArrowWriter; +use rand::Rng as _; +use tokio::runtime::Runtime; + +enum Cardinality { + Low, + #[allow(dead_code)] + High, +} + +enum CardinalityGen { + Low(marker::PhantomData), + High(marker::PhantomData), +} + +impl From<&Cardinality> for CardinalityGen { + fn from(cardinality: &Cardinality) -> Self { + match cardinality { + Cardinality::Low => CardinalityGen::Low(marker::PhantomData), + Cardinality::High => CardinalityGen::High(marker::PhantomData), + } + } +} + +impl CardinalityGen { + fn gen_range(&self, rng: &mut impl rand::Rng) -> i64 { + match self { + CardinalityGen::Low(_) => rng.random_range(0..100), // Low cardinality: 100 unique values + CardinalityGen::High(_) => 
rng.random_range(0..1_000_000), // High cardinality: many unique values + } + } +} + +impl CardinalityGen { + fn gen_string(&self, rng: &mut impl rand::Rng) -> String { + match self { + CardinalityGen::Low(_) => { + // Low cardinality: choose from a small set of strings + let choices = ["apple", "banana", "cherry", "date", "elderberry"]; + choices[rng.random_range(0..choices.len())].to_string() + } + CardinalityGen::High(_) => { + // High cardinality: generate random strings + let len = rng.random_range(5..20); + (0..len) + .map(|_| rng.random_range(b'a'..=b'z') as char) + .collect() + } + } + } +} + +impl CardinalityGen { + fn gen_range(&self, rng: &mut impl rand::Rng) -> f64 { + match self { + CardinalityGen::Low(_) => rng.random_range(0.0..100.0), // Low cardinality + CardinalityGen::High(_) => rng.random_range(0.0..1_000_000.0), // High cardinality + } + } +} + +impl CardinalityGen { + fn gen_bool(&self, rng: &mut impl rand::Rng) -> bool { + match self { + CardinalityGen::Low(_) => rng.random_bool(0.9), // 90% chance + CardinalityGen::High(_) => rng.random_bool(0.5), // 50% chance + } + } +} + +impl CardinalityGen { + fn gen_range(&self, rng: &mut impl rand::Rng) -> i32 { + match self { + CardinalityGen::Low(_) => rng.random_range(0..100), // Low cardinality: 100 unique values + CardinalityGen::High(_) => rng.random_range(0..1_000_000), // High cardinality: many unique values + } + } +} + +#[allow(dead_code)] +enum SchemaKind { + /// Simple schema with the given number of columns. + Simple(usize), + /// Complex schema with the given number of columns including nested types. + Complex(usize), + /// Schema with all supported, non-nested types. 
+ All, +} + +impl SchemaKind { + fn as_str(&self) -> &'static str { + match self { + SchemaKind::Simple(_) => "simple", + SchemaKind::Complex(_) => "complex", + SchemaKind::All => "all", + } + } + + #[allow(dead_code)] + fn generate_schema(&self) -> Arc { + match self { + SchemaKind::Simple(num_columns) => generate_simple_schema(*num_columns), + SchemaKind::Complex(num_columns) => generate_complex_schema(*num_columns), + SchemaKind::All => { + let fields: [Field; 30] = [ + Field::new("int64_col", DataType::Int64, false), + Field::new("int64_col_opt", DataType::Int64, true), + Field::new("int32_col", DataType::Int32, false), + Field::new("int32_col_opt", DataType::Int32, true), + Field::new("uint64_col", DataType::UInt64, false), + Field::new("uint64_col_opt", DataType::UInt64, true), + Field::new("uint32_col", DataType::UInt32, false), + Field::new("uint32_col_opt", DataType::UInt32, true), + Field::new("str_col", DataType::Utf8, false), + Field::new("str_col_opt", DataType::Utf8, true), + Field::new("bin_col", DataType::Binary, false), + Field::new("bin_col_opt", DataType::Binary, true), + Field::new("bin_20_col", DataType::FixedSizeBinary(20), false), + Field::new("bin_20_col_opt", DataType::FixedSizeBinary(20), true), + Field::new("float64_col", DataType::Float64, false), + Field::new("float64_col_opt", DataType::Float64, true), + Field::new("float32_col", DataType::Float32, false), + Field::new("float32_col_opt", DataType::Float32, true), + Field::new("bool_col", DataType::Boolean, false), + Field::new("bool_col_opt", DataType::Boolean, true), + Field::new( + "ts_millis_col", + DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, None), + false, + ), + Field::new( + "ts_millis_col_tz", + DataType::Timestamp( + arrow_schema::TimeUnit::Millisecond, + Some("UTC".into()), + ), + false, + ), + Field::new( + "ts_millis_col_opt", + DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, None), + true, + ), + Field::new( + "ts_micros_col", + 
DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, None), + false, + ), + Field::new( + "ts_micros_col_tz", + DataType::Timestamp( + arrow_schema::TimeUnit::Microsecond, + Some("UTC".into()), + ), + false, + ), + Field::new( + "ts_micros_col_opt", + DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, None), + true, + ), + Field::new("date64_col", DataType::Date64, false), + Field::new("date64_col_opt", DataType::Date64, true), + Field::new("date32_col", DataType::Date32, false), + Field::new("date32_col_opt", DataType::Date32, true), + ]; + Arc::new(Schema::new(fields.to_vec())) + } + } + } +} + +/// Compression configurations to benchmark. +fn compression_configs() -> Vec<(&'static str, Compression, usize)> { + vec![ + ("uncompressed", Compression::UNCOMPRESSED, 100), + #[cfg(feature = "snap")] + ("snappy", Compression::SNAPPY, 100), + #[cfg(feature = "flate2")] + ("gzip", Compression::GZIP(GzipLevel::default()), 10), + #[cfg(feature = "zstd")] + ("zstd_1", Compression::ZSTD(ZstdLevel::default()), 100), + #[cfg(feature = "zstd")] + ( + "zstd_3", + Compression::ZSTD(ZstdLevel::try_new(3).unwrap()), + 50, + ), + // #[cfg(feature = "zstd")] + // ("zstd_5", Compression::ZSTD(ZstdLevel::try_new(5).unwrap())), + #[cfg(feature = "brotli")] + ("brotli_1", Compression::BROTLI(BrotliLevel::default()), 100), + #[cfg(feature = "brotli")] + ( + "brotli_5", + Compression::BROTLI(BrotliLevel::try_new(5).unwrap()), + 10, + ), + #[cfg(feature = "lz4")] + ("lz4", Compression::LZ4_RAW, 100), + ] +} + +/// Generate a schema with the given number of columns. +/// Alternates between Int64 and String columns. 
+fn generate_simple_schema(num_columns: usize) -> Arc { + let fields: Vec = (0..num_columns) + .map(|i| { + if i % 2 == 0 { + Field::new(format!("col_{i}"), DataType::Int64, false) + } else { + Field::new(format!("col_{i}"), DataType::Utf8, false) + } + }) + .collect(); + Arc::new(Schema::new(fields)) +} + +/// Generate a complex schema with the given number of columns +/// including nested types (i.e. lists, structs) and simple types. +fn generate_complex_schema(num_columns: usize) -> Arc { + let mut fields = Vec::with_capacity(num_columns); + + for i in 0..num_columns { + let field = match i % 6 { + // Simple Int64 + 0 => Field::new(format!("int_col_{i}"), DataType::Int64, false), + // Simple String + 1 => Field::new(format!("str_col_{i}"), DataType::Utf8, true), + // List of Int32 + 2 => Field::new( + format!("list_int_col_{i}"), + DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))), + true, + ), + // List of Strings + 3 => Field::new( + format!("list_str_col_{i}"), + DataType::List(Arc::new(Field::new_list_field(DataType::Utf8, true))), + true, + ), + // Struct with Int64 and String fields + 4 => Field::new( + format!("struct_col_{i}"), + DataType::Struct( + vec![ + Field::new("id", DataType::Int64, false), + Field::new("name", DataType::Utf8, false), + Field::new("value", DataType::Float64, false), + ] + .into(), + ), + false, + ), + // Boolean + 5 => Field::new(format!("bool_col_{i}"), DataType::Boolean, false), + _ => unreachable!(), + }; + fields.push(field); + } + + Arc::new(Schema::new(fields)) +} + +/// Generate a random string of lowercase letters with the given length range. 
+fn generate_random_string( + rng: &mut impl rand::Rng, + min_len: usize, + max_len: usize, + cardinality: &CardinalityGen, +) -> String { + match cardinality { + CardinalityGen::Low(_) => cardinality.gen_string(rng), + CardinalityGen::High(_) => { + let len = rng.random_range(min_len..max_len); + (0..len) + .map(|_| rng.random_range(b'a'..=b'z') as char) + .collect() + } + } +} + +/// Generate a single batch with the given number of rows. +fn generate_batch( + schema: &Arc, + num_rows: usize, + cardinality: &Cardinality, + rng: &mut impl rand::Rng, + total_bytes: &mut usize, +) -> RecordBatch { + let columns: Vec = schema + .fields() + .iter() + .map(|field| generate_array_for_type(field.data_type(), cardinality, num_rows, rng)) + .collect(); + + let batch = RecordBatch::try_new(schema.clone(), columns).unwrap(); + let batch_bytes = batch.get_array_memory_size(); + + *total_bytes += batch_bytes; + + batch +} + +/// Generate an array of the given type with the specified number of rows. +fn generate_array_for_type( + data_type: &DataType, + cardinality: &Cardinality, + num_rows: usize, + rng: &mut impl rand::Rng, +) -> ArrayRef { + match data_type { + DataType::Int64 => { + let card_gen = CardinalityGen::::from(cardinality); + let values: Vec = (0..num_rows).map(|_| card_gen.gen_range(rng)).collect(); + Arc::new(Int64Array::from(values)) as ArrayRef + } + DataType::Int32 => { + let card_gen = CardinalityGen::::from(cardinality); + let values: Vec = (0..num_rows).map(|_| card_gen.gen_range(rng)).collect(); + Arc::new(Int32Array::from(values)) as ArrayRef + } + DataType::Float64 => { + let card_gen = CardinalityGen::::from(cardinality); + let values: Vec = (0..num_rows).map(|_| card_gen.gen_range(rng)).collect(); + Arc::new(Float64Array::from(values)) as ArrayRef + } + DataType::Boolean => { + let card_gen = CardinalityGen::::from(cardinality); + let values: Vec = (0..num_rows).map(|_| card_gen.gen_bool(rng)).collect(); + Arc::new(BooleanArray::from(values)) as 
ArrayRef + } + DataType::Utf8 => { + let card_gen = CardinalityGen::::from(cardinality); + let values: Vec = (0..num_rows) + .map(|_| generate_random_string(rng, 5, 50, &card_gen)) + .collect(); + Arc::new(StringArray::from(values)) as ArrayRef + } + DataType::List(field) => { + // Generate variable-length lists (0-10 elements each) + + let mut list_builder = match field.data_type() { + DataType::Int32 => { + let value_builder = Int32Builder::new(); + Box::new(ListBuilder::new(value_builder)) as Box + } + DataType::Utf8 => { + let value_builder = StringBuilder::new(); + Box::new(ListBuilder::new(value_builder)) as Box + } + _ => panic!( + "Unsupported list value type in generate_array_for_type: {:?}", + field.data_type() + ), + }; + for _ in 0..num_rows { + if rng.random_bool(0.1) { + match field.data_type() { + DataType::Int32 => { + list_builder + .as_any_mut() + .downcast_mut::>() + .unwrap() + .append_null(); + } + DataType::Utf8 => { + list_builder + .as_any_mut() + .downcast_mut::>() + .unwrap() + .append_null(); + } + _ => panic!( + "Unsupported list value type in generate_array_for_type: {:?}", + field.data_type() + ), + } + } else { + let list_size = rng.random_range(1..=10) as usize; + match field.data_type() { + DataType::Int32 => { + let card_gen = CardinalityGen::::from(cardinality); + let values: Vec> = (0..list_size) + .map(|_| card_gen.gen_range(rng)) + .map(Some) + .collect(); + list_builder + .as_any_mut() + .downcast_mut::>() + .unwrap() + .append_value(values); + } + DataType::Utf8 => { + let card_gen = CardinalityGen::::from(cardinality); + let values: Vec> = (0..list_size) + .map(|_| generate_random_string(rng, 5, 50, &card_gen)) + .map(Some) + .collect(); + list_builder + .as_any_mut() + .downcast_mut::>() + .unwrap() + .append_value(values); + } + _ => panic!( + "Unsupported list value type in generate_array_for_type: {:?}", + field.data_type() + ), + }; + } + } + + Arc::new(list_builder.finish()) as ArrayRef + } + DataType::Struct(fields) 
=> { + let arrays: Vec = fields + .iter() + .map(|f| generate_array_for_type(f.data_type(), cardinality, num_rows, rng)) + .collect(); + + Arc::new(StructArray::new(fields.clone(), arrays, None)) as ArrayRef + } + _ => panic!( + "Unsupported data type in generate_array_for_type: {:?}", + data_type + ), + } +} + +/// Generate multiple batches of random lengths that sum to total_rows. +fn generate_batches( + schema: &Arc, + num_rows: usize, + cardinality: &Cardinality, +) -> (Vec, usize) { + let mut rng = rand::rng(); + let mut batches = Vec::new(); + let mut total_bytes = 0; + let mut remaining = num_rows; + + while remaining > 0 { + // Random batch size between 100 and 10000, capped at remaining + let max_batch = 10_000.min(remaining); + let min_batch = 100.min(max_batch); + let batch_size = if min_batch == max_batch { + min_batch + } else { + rng.random_range(min_batch..=max_batch) + }; + batches.push(generate_batch( + schema, + batch_size, + cardinality, + &mut rng, + &mut total_bytes, + )); + remaining -= batch_size; + } + + (batches, total_bytes) +} + +async fn bench_parquet_writer(batches: &[RecordBatch], props: &WriterProperties) { + let schema = batches[0].schema(); + let mut writer = + ParquetAsyncArrowWriter::try_new(tokio::io::sink(), schema, Some(props.clone())).unwrap(); + for batch in batches { + writer.write(batch).await.unwrap(); + } + writer.close().await.unwrap(); +} + +async fn bench_our_writer(batches: &[RecordBatch], props: &WriterProperties) { + let schema = batches[0].schema(); + let mut writer = + OurAsyncArrowWriter::try_new(tokio::io::sink(), schema, Some(props.clone())).unwrap(); + for batch in batches { + writer.write(batch).await.unwrap(); + } + writer.close().await.unwrap(); +} + +struct BenchInput<'a> { + batches: &'a [RecordBatch], + props: WriterPropertiesPtr, +} + +fn writer_benchmarks(c: &mut Criterion) { + let rt = Runtime::new().unwrap(); + + let columns = [10, 30]; + let rows = [(1_000, 1_000), (100_000, 64_000), (1_000_000, 
100_000)]; + + let compressions = compression_configs(); + println!( + "Benchmarking with compressions: {:?}", + compressions + .iter() + .map(|(name, _, _)| *name) + .collect::>() + ); + + let mut group = c.benchmark_group("parquet_writer"); + + for &num_cols in &columns { + for schema in [SchemaKind::Complex(num_cols), SchemaKind::Simple(num_cols)] { + let schema_name = schema.as_str(); + let schema = schema.generate_schema(); + for &(num_rows, max_row_group_size) in &rows { + let (batches, total_bytes) = generate_batches(&schema, num_rows, &Cardinality::Low); + + for (compression_name, compression, n) in &compressions { + let param = + format!("{compression_name}/{schema_name}_{num_cols}cols_{num_rows}rows"); + let props = WriterProperties::builder() + .set_compression(*compression) + .set_max_row_group_size(max_row_group_size) + .build(); + + let input = BenchInput { + batches: &batches, + props: Arc::new(props), + }; + + group + .throughput(Throughput::Bytes(total_bytes as u64)) + .sample_size(*n) + .bench_with_input( + BenchmarkId::new("this_crate", ¶m), + &input, + |b, input| { + b.to_async(&rt) + .iter(|| bench_our_writer(input.batches, &input.props)); + }, + ); + + group + .throughput(Throughput::Bytes(total_bytes as u64)) + .sample_size(*n) + .bench_with_input( + BenchmarkId::new("parquet_crate", ¶m), + &input, + |b, input| { + b.to_async(&rt) + .iter(|| bench_parquet_writer(input.batches, &input.props)); + }, + ); + } + } + } + } + + group.finish(); +} + +criterion_group!(benches, writer_benchmarks); +criterion_main!(benches); diff --git a/crates/parquet-ext/src/arrow/arrow_reader/mod.rs b/crates/parquet-ext/src/arrow/arrow_reader/mod.rs new file mode 100644 index 000000000..8b1378917 --- /dev/null +++ b/crates/parquet-ext/src/arrow/arrow_reader/mod.rs @@ -0,0 +1 @@ + diff --git a/crates/parquet-ext/src/arrow/arrow_writer/mod.rs b/crates/parquet-ext/src/arrow/arrow_writer/mod.rs new file mode 100644 index 000000000..8b1378917 --- /dev/null +++ 
b/crates/parquet-ext/src/arrow/arrow_writer/mod.rs @@ -0,0 +1 @@ + diff --git a/crates/parquet-ext/src/arrow/async_reader/mod.rs b/crates/parquet-ext/src/arrow/async_reader/mod.rs new file mode 100644 index 000000000..8b1378917 --- /dev/null +++ b/crates/parquet-ext/src/arrow/async_reader/mod.rs @@ -0,0 +1 @@ + diff --git a/crates/parquet-ext/src/arrow/async_writer/mod.rs b/crates/parquet-ext/src/arrow/async_writer/mod.rs new file mode 100644 index 000000000..b2d59f6c8 --- /dev/null +++ b/crates/parquet-ext/src/arrow/async_writer/mod.rs @@ -0,0 +1,105 @@ +use arrow_array::RecordBatch; +use arrow_schema::SchemaRef; +use parquet::{ + arrow::{arrow_writer::ArrowWriterOptions, async_writer::AsyncFileWriter}, + errors::Result, + file::{ + metadata::{KeyValue, ParquetMetaData, RowGroupMetaDataPtr}, + properties::WriterProperties, + }, +}; + +use crate::writer::Pipeline; + +pub struct AsyncArrowWriter { + pipeline: Pipeline, W>, + _marker: std::marker::PhantomData, +} + +impl AsyncArrowWriter { + pub fn try_new( + writer: W, + arrow_schema: SchemaRef, + props: Option, + ) -> Result { + Ok(Self { + pipeline: Pipeline::, W>::builder() + .build_properties(&arrow_schema, props)? + .build_writer(Vec::new())? + .with_progress() + .into_factory() + .spawn_encoder() + .spawn_writer(writer) + .build(), + _marker: std::marker::PhantomData, + }) + } + + /// Try to create a new Async Arrow Writer with [`ArrowWriterOptions`] + /// + /// This is unsafe because the options are not used to configure the writer in any way. + /// This function exists for API compatibility with parquet::arrow::AsyncArrowWriter, + /// but the options are ignored and will not have any effect on the writer's behavior. 
+ /// + /// Use [`try_new`](Self::try_new) or [`try_new_with_shared_backend`](Self::try_new_with_shared_backend) instead, + /// which will properly pass the provided configuration options to the writer as well as + /// map the Arrow schema into the writer properties metadata and ensure the writer is configured + /// to coerce types as needed. + /// + /// # Safety + /// The provided options are ignored and will not have any effect on the writer's behavior. + pub unsafe fn try_new_with_options( + writer: W, + arrow_schema: SchemaRef, + _options: ArrowWriterOptions, + ) -> Result { + Self::try_new(writer, arrow_schema, None) + } + + /// Write a RecordBatch to the writer. + pub fn write(&mut self, batch: &RecordBatch) -> impl Future> { + self.pipeline.add_batch_async(batch.to_owned()) + } + + /// Flush buffered data to a new row group. + /// + /// Forces any buffered batches to be encoded and written, + /// even if the row group size limit hasn't been reached. + pub async fn flush(&mut self) -> Result<()> { + self.pipeline.flush_async().await + } + + /// Close the writer and return file metadata. + pub async fn close(self) -> Result { + self.pipeline.close_async().await + } + + pub fn append_key_value_metadata(&mut self, kv: KeyValue) { + self.pipeline.append_key_value_metadata(kv); + } + + /// Estimated memory usage, in bytes, of this `ArrowWriter` + pub fn memory_size(&self) -> usize { + self.pipeline.memory_size() + } + + /// Anticipated encoded size of the in progress row group. + pub fn in_progress_size(&self) -> usize { + self.pipeline.in_progress_size() + } + + /// Returns the number of rows buffered in the in progress row group + pub fn in_progress_rows(&self) -> usize { + self.pipeline.in_progress_rows() + } + + /// Returns the number of bytes written by this instance + pub fn bytes_written(&self) -> usize { + self.pipeline.bytes_written() + } + + /// Returns metadata for all flushed row groups. 
+ pub fn flushed_row_groups(&self) -> &[RowGroupMetaDataPtr] { + self.pipeline.flushed_row_groups() + } +} diff --git a/crates/parquet-ext/src/arrow/mod.rs b/crates/parquet-ext/src/arrow/mod.rs new file mode 100644 index 000000000..3d2d0cf14 --- /dev/null +++ b/crates/parquet-ext/src/arrow/mod.rs @@ -0,0 +1,7 @@ +pub mod arrow_reader; +pub mod arrow_writer; + +#[cfg(feature = "async")] +pub mod async_reader; +#[cfg(feature = "async")] +pub mod async_writer; diff --git a/crates/parquet-ext/src/backend/mod.rs b/crates/parquet-ext/src/backend/mod.rs new file mode 100644 index 000000000..070588c0c --- /dev/null +++ b/crates/parquet-ext/src/backend/mod.rs @@ -0,0 +1,31 @@ +use std::thread::JoinHandle; + +use tokio::sync::oneshot::Sender; + +pub struct PipelineBackend { + join_handle: Option>, + signal: Option>, +} + +impl PipelineBackend { + pub fn new(join_handle: JoinHandle<()>, signal: Sender<()>) -> Option { + Some(Self { + join_handle: Some(join_handle), + signal: Some(signal), + }) + } + pub fn shutdown(&mut self) { + if let Some(signal) = self.signal.take() { + let _ = signal.send(()); + } + if let Some(handle) = self.join_handle.take() { + let _ = handle.join(); + } + } +} + +impl Drop for PipelineBackend { + fn drop(&mut self) { + self.shutdown(); + } +} diff --git a/crates/parquet-ext/src/builder/mod.rs b/crates/parquet-ext/src/builder/mod.rs new file mode 100644 index 000000000..cd91bad19 --- /dev/null +++ b/crates/parquet-ext/src/builder/mod.rs @@ -0,0 +1,7 @@ +pub trait BuildState {} + +pub enum Set {} +pub enum Unset {} + +impl BuildState for Set {} +impl BuildState for Unset {} diff --git a/crates/parquet-ext/src/lib.rs b/crates/parquet-ext/src/lib.rs new file mode 100644 index 000000000..07c5d6d9b --- /dev/null +++ b/crates/parquet-ext/src/lib.rs @@ -0,0 +1,6 @@ +pub mod arrow; + +mod backend; +mod builder; + +pub mod writer; diff --git a/crates/parquet-ext/src/writer/mod.rs b/crates/parquet-ext/src/writer/mod.rs new file mode 100644 index 
000000000..12fc746fb --- /dev/null +++ b/crates/parquet-ext/src/writer/mod.rs @@ -0,0 +1,3 @@ +mod pipeline; + +pub use pipeline::{EncoderFactory, Pipeline, PipelineProperties}; diff --git a/crates/parquet-ext/src/writer/pipeline/backend.rs b/crates/parquet-ext/src/writer/pipeline/backend.rs new file mode 100644 index 000000000..bb767386c --- /dev/null +++ b/crates/parquet-ext/src/writer/pipeline/backend.rs @@ -0,0 +1,42 @@ +use tokio::sync::oneshot::channel; + +use super::Progress; +use crate::{ + backend::PipelineBackend, + writer::pipeline::{ + encoder::{Encoder, EncoderExecutor, EncoderFactory}, + job::WriterInbox, + properties::PipelineProperties, + }, +}; + +impl PipelineBackend { + pub fn spawn_encoder( + properties: &PipelineProperties, + factory: EncoderFactory, + progress: Option<&Progress>, + ) -> (EncoderExecutor, WriterInbox) { + let num_workers = properties.num_workers; + let max_rows = properties.writer_properties.max_row_group_size(); + + let (writer_outbox, encoder_inbox) = properties.create_encoder_channel(); + let (encoder_outbox, writer_inbox) = properties.create_writer_channel(); + let (signal, shutdown_receiver) = channel(); + + let join_handle = std::thread::spawn(move || { + Encoder::new(encoder_inbox, num_workers, shutdown_receiver).run() + }); + + let backend = PipelineBackend::new(join_handle, signal); + + let encoder_executor = EncoderExecutor::new( + backend, + factory, + max_rows, + progress.cloned(), + writer_outbox, + encoder_outbox, + ); + (encoder_executor, writer_inbox) + } +} diff --git a/crates/parquet-ext/src/writer/pipeline/encoder/batches.rs b/crates/parquet-ext/src/writer/pipeline/encoder/batches.rs new file mode 100644 index 000000000..c5bc1860d --- /dev/null +++ b/crates/parquet-ext/src/writer/pipeline/encoder/batches.rs @@ -0,0 +1,46 @@ +use arrow_array::RecordBatch; + +use crate::writer::pipeline::Progress; + +pub struct Batches { + batches: Vec, + pub rows: usize, + pub bytes: usize, + pub max_rows: usize, +} + +impl 
Batches { + pub fn new(max_rows: usize) -> Self { + Batches { + batches: Vec::new(), + rows: 0, + bytes: 0, + max_rows, + } + } + + pub fn add_batch(&mut self, batch: RecordBatch, progress: Option<&Progress>) { + let batch_bytes = batch.get_array_memory_size(); + let batch_rows = batch.num_rows(); + self.batches.push(batch); + self.rows += batch_rows; + self.bytes += batch_bytes; + if let Some(progress) = progress { + progress.add_buffered(batch_rows, batch_bytes); + } + } + + pub fn is_empty(&self) -> bool { + self.batches.is_empty() + } + + pub fn take(&mut self, progress: Option<&Progress>) -> (Vec, usize) { + if let Some(progress) = progress { + progress.move_to_encoding(self.rows, self.bytes); + } + let rows = self.rows; + self.rows = 0; + self.bytes = 0; + (std::mem::take(&mut self.batches), rows) + } +} diff --git a/crates/parquet-ext/src/writer/pipeline/encoder/executor.rs b/crates/parquet-ext/src/writer/pipeline/encoder/executor.rs new file mode 100644 index 000000000..c99f59aec --- /dev/null +++ b/crates/parquet-ext/src/writer/pipeline/encoder/executor.rs @@ -0,0 +1,164 @@ +use std::mem::ManuallyDrop; + +use arrow_array::RecordBatch; +use futures::future::BoxFuture; +use parquet::errors::{ParquetError, Result}; + +use super::{Batches, factory::EncoderFactory}; +use crate::{ + backend::PipelineBackend, + writer::pipeline::{EncoderOutbox, Progress, WriterOutbox, job::EncodeJob}, +}; + +pub struct EncoderExecutor { + backend: Option, + batches: Batches, + factory: EncoderFactory, + progress: Option, + writer_outbox: ManuallyDrop, + encoder_outbox: ManuallyDrop, +} + +impl Drop for EncoderExecutor { + fn drop(&mut self) { + // SAFETY: Channel senders must be dropped before joining the encoder + // thread. The encoder thread blocks on `inbox.recv()`, which only + // returns `Err` once all senders are dropped. Joining first would + // deadlock. These fields are not accessed after this point. 
+ unsafe { + ManuallyDrop::drop(&mut self.writer_outbox); + ManuallyDrop::drop(&mut self.encoder_outbox); + } + if let Some(mut backend) = self.backend.take() { + backend.shutdown(); + } + } +} + +impl EncoderExecutor { + pub fn new( + backend: Option, + factory: EncoderFactory, + max_rows: usize, + progress: Option, + writer_outbox: WriterOutbox, + encoder_outbox: EncoderOutbox, + ) -> Self { + EncoderExecutor { + backend, + batches: Batches::new(max_rows), + factory, + progress, + writer_outbox: ManuallyDrop::new(writer_outbox), + encoder_outbox: ManuallyDrop::new(encoder_outbox), + } + } + + fn rows(&self) -> usize { + self.batches.rows + } + + fn max_rows(&self) -> usize { + self.batches.max_rows + } + + fn headroom(&self) -> usize { + self.max_rows() - self.rows() + } + + fn take_batches(&mut self) -> (Vec, usize) { + self.batches.take(self.progress.as_ref()) + } + + pub fn handle_batch_async( + &mut self, + batch: RecordBatch, + ) -> BoxFuture<'_, Result<(), ParquetError>> { + let batch_rows = batch.num_rows(); + if batch_rows == 0 { + return Box::pin(async { Ok(()) }); + } + + Box::pin(async move { + if self.rows() + batch_rows > self.max_rows() { + let headroom = self.headroom(); + if headroom > 0 { + let batch_a = batch.slice(0, headroom); + let batch_b = batch.slice(headroom, batch_rows - headroom); + self.handle_batch_async(batch_a).await?; + self.handle_batch_async(batch_b).await?; + } else { + self.flush_async(false).await?; + self.handle_batch_async(batch).await?; + } + return Ok(()); + } + + self.batches.add_batch(batch, self.progress.as_ref()); + + if self.rows() >= self.max_rows() { + self.flush_async(false).await?; + } + + Ok(()) + }) + } + + pub async fn flush_async(&mut self, finalize: bool) -> Result<()> { + if self.batches.is_empty() && !finalize { + return Ok(()); + } + + let encoder = self.factory.try_next_encoder()?; + + let (batches, rows) = self.take_batches(); + let reply_tx = ManuallyDrop::into_inner(self.encoder_outbox.clone()); + + 
let job = EncodeJob { + encoder, + batches, + rows, + finalize, + reply_tx, + progress: self.progress.clone(), + }; + + self.writer_outbox + .send_async(job) + .await + .map_err(|e| ParquetError::General(e.to_string()))?; + + Ok(()) + } + + #[allow(unused)] + pub fn flush(&mut self, finalize: bool) -> Result<(), ParquetError> { + if self.batches.is_empty() && !finalize { + return Ok(()); + } + + let encoder = self.factory.try_next_encoder()?; + + let (batches, rows) = self.take_batches(); + let reply_tx = ManuallyDrop::into_inner(self.encoder_outbox.clone()); + + let job = EncodeJob { + encoder, + batches, + rows, + finalize, + reply_tx, + progress: self.progress.clone(), + }; + + self.writer_outbox + .send(job) + .map_err(|e| ParquetError::General(e.to_string()))?; + + Ok(()) + } + + pub fn shutdown(self) { + drop(self); + } +} diff --git a/crates/parquet-ext/src/writer/pipeline/encoder/factory.rs b/crates/parquet-ext/src/writer/pipeline/encoder/factory.rs new file mode 100644 index 000000000..9bd3cb497 --- /dev/null +++ b/crates/parquet-ext/src/writer/pipeline/encoder/factory.rs @@ -0,0 +1,35 @@ +use std::{io::Write, sync::Arc}; + +use arrow_schema::SchemaRef; +use parquet::{ + arrow::arrow_writer::ArrowRowGroupWriterFactory, errors::ParquetError, + file::writer::SerializedFileWriter, +}; + +use super::RowGroupEncoder; + +pub struct EncoderFactory { + row_group_index: usize, + row_group_writer_factory: ArrowRowGroupWriterFactory, +} + +impl EncoderFactory { + pub fn new( + file_writer: &SerializedFileWriter, + arrow_schema: &SchemaRef, + ) -> Self { + let row_group_index = 0; + let arrow_schema = Arc::clone(arrow_schema); + let row_group_writer_factory = ArrowRowGroupWriterFactory::new(file_writer, arrow_schema); + + EncoderFactory { + row_group_index, + row_group_writer_factory, + } + } + + pub fn try_next_encoder(&mut self) -> Result { + RowGroupEncoder::try_new(self.row_group_index, &self.row_group_writer_factory) + .inspect(|_| self.row_group_index += 1) + } 
+} diff --git a/crates/parquet-ext/src/writer/pipeline/encoder/mod.rs b/crates/parquet-ext/src/writer/pipeline/encoder/mod.rs new file mode 100644 index 000000000..7e2f973a0 --- /dev/null +++ b/crates/parquet-ext/src/writer/pipeline/encoder/mod.rs @@ -0,0 +1,334 @@ +use std::{ + collections::BTreeMap, + sync::atomic::{AtomicUsize, Ordering}, + thread::scope, +}; + +use arrow_array::RecordBatch; +use parking_lot::Mutex; +use parquet::{ + arrow::arrow_writer::{ + ArrowColumnChunk, ArrowColumnWriter, ArrowLeafColumn, ArrowRowGroupWriterFactory, + compute_leaves, + }, + errors::ParquetError, +}; +use tokio::sync::oneshot::Receiver; + +mod batches; +mod executor; +mod factory; +mod row_group; + +pub use self::{ + batches::Batches, + executor::EncoderExecutor, + factory::EncoderFactory, + row_group::{EncodedRowGroup, PartialEncodedRowGroup}, +}; +use crate::writer::pipeline::{ + job::{EncoderInbox, WriteJob}, + progress::Progress, +}; + +pub struct Encoder { + inbox: EncoderInbox, + num_workers: usize, + shutdown_receiver: Receiver<()>, +} + +impl Encoder { + pub fn new(inbox: EncoderInbox, num_workers: usize, shutdown_receiver: Receiver<()>) -> Self { + Encoder { + inbox, + num_workers, + shutdown_receiver, + } + } + + pub fn run(self) { + while let Ok(job) = self.inbox.recv() + && self.shutdown_receiver.is_empty() + { + let id = job.encoder.id(); + let rows = job.rows; + + let encode_result = + job.encoder + .encode(&job.batches, job.progress.as_ref(), self.num_workers); + + let result = match encode_result { + Ok(encoded) => encoded.into_write_job(rows, job.finalize), + Err(error) => WriteJob::Error { id, error }, + }; + + // Use let _ to silently handle disconnected writers. + // In shared mode, one writer dropping must not kill the encoder. 
+ let _ = job.reply_tx.send(result); + } + } +} + +pub struct RowGroupEncoder { + row_group_index: usize, + row_group_writers: Vec, +} + +impl RowGroupEncoder { + fn try_new( + row_group_index: usize, + row_group_writer_factory: &ArrowRowGroupWriterFactory, + ) -> Result { + let writers = row_group_writer_factory.create_column_writers(row_group_index)?; + + Ok(RowGroupEncoder { + row_group_index, + row_group_writers: writers, + }) + } + + pub fn id(&self) -> usize { + self.row_group_index + } + + pub fn encode( + self, + batches: &[RecordBatch], + progress: Option<&Progress>, + num_workers: usize, + ) -> Result { + encode_column_chunks_parallel( + self.row_group_index, + batches, + self.row_group_writers, + progress, + num_workers, + ) + } +} + +type ThreadOutput = Vec<(usize, ArrowColumnChunk, isize)>; +type ThreadResult = Result; + +type IndexedLeaves = BTreeMap>>; +type IndexedLeavesResult = Result; + +pub fn encode_column_chunks_parallel( + index: usize, + batches: &[RecordBatch], + writers: Vec, + progress: Option<&Progress>, + num_workers: usize, +) -> Result { + let writers_len = writers.len(); + + if batches.is_empty() { + return Ok(PartialEncodedRowGroup::empty(index)); + } + + let leaf_columns = collect_leaf_columns(batches, writers_len, num_workers)?; + + let num_workers = num_workers.min(writers_len); + + let work_items: Mutex)>> = Mutex::new( + writers + .into_iter() + .zip(leaf_columns) + .enumerate() + .map(|(i, (w, l))| (i, w, l)) + .collect(), + ); + + let thread_results: Vec = scope(|s| { + let handles: Vec<_> = (0..num_workers) + .map(|_| { + let work_items = &work_items; + s.spawn(move || { + let mut local_results = Vec::new(); + loop { + let item = work_items.lock().pop(); + let Some((idx, writer, leaves)) = item else { + break; + }; + let result = encode_column(writer, leaves, progress)?; + local_results.push((idx, result.0, result.1)); + } + Ok(local_results) + }) + }) + .collect(); + + handles + .into_iter() + .map(|h| match h.join() { + 
Ok(result) => result, + Err(_) => Err(ParquetError::General( + "Column encoding thread panicked".to_string(), + )), + }) + .collect() + }); + + let mut encoded_cols: Vec> = (0..writers_len).map(|_| None).collect(); + let mut encoded_bytes = 0isize; + + for thread_result in thread_results { + for (idx, chunk, bytes) in thread_result? { + encoded_cols[idx] = Some(chunk); + encoded_bytes += bytes; + } + } + + let encoded_cols: Vec = + encoded_cols.into_iter().map(|slot| slot.unwrap()).collect(); + + Ok(PartialEncodedRowGroup { + id: index, + encoded_cols, + encoded_bytes, + }) +} + +/// Encodes a single column's leaf data. +fn encode_column( + mut writer: ArrowColumnWriter, + leaves: Vec, + progress: Option<&Progress>, +) -> Result<(ArrowColumnChunk, isize), ParquetError> { + let mut prev_bytes = writer.get_estimated_total_bytes() as isize; + let mut total_bytes = prev_bytes; + + if let Some(progress) = progress { + progress.update_encoded_bytes(prev_bytes); + } + + for leaf in leaves { + writer.write(&leaf)?; + let current_bytes = writer.get_estimated_total_bytes() as isize; + let delta = current_bytes - prev_bytes; + if let Some(progress) = progress { + progress.update_encoded_bytes(delta); + } + total_bytes += delta; + prev_bytes = current_bytes; + } + + let chunk = writer.close()?; + Ok((chunk, total_bytes)) +} + +fn collect_leaf_columns( + batches: &[RecordBatch], + writers_len: usize, + num_workers: usize, +) -> Result>, ParquetError> { + if batches.is_empty() { + return Ok(Vec::new()); + } + + if batches.len() == 1 || num_workers == 1 { + return compute_batch_leaves(&batches[0], writers_len).map(|mut result| { + for batch in &batches[1..] { + let leaves = compute_batch_leaves(batch, writers_len).unwrap(); + merge_batch_leaves(&mut result, leaves).unwrap(); + } + result + }); + } + + let num_workers = num_workers.min(batches.len()); + let next_idx = AtomicUsize::new(0); + + // Each thread returns a BTreeMap so results are ordered by index. 
+ let thread_results: Vec = std::thread::scope(|s| { + let handles: Vec<_> = (0..num_workers) + .map(|_| { + let next_idx = &next_idx; + s.spawn(move || { + let mut local = BTreeMap::new(); + loop { + let idx = next_idx.fetch_add(1, Ordering::Relaxed); + if idx >= batches.len() { + break; + } + let leaves = compute_batch_leaves(&batches[idx], writers_len)?; + local.insert(idx, leaves); + } + Ok(local) + }) + }) + .collect(); + + handles + .into_iter() + .map(|h| match h.join() { + Ok(result) => result, + Err(_) => Err(ParquetError::General( + "Leaf column collection thread panicked".to_string(), + )), + }) + .collect() + }); + + // Merge thread-local BTreeMaps into one, preserving batch index order. + let mut indexed_leaves = BTreeMap::new(); + for thread_result in thread_results { + indexed_leaves.extend(thread_result?); + } + + let mut combined: Option>> = None; + for (_, leaves) in indexed_leaves { + match &mut combined { + Some(acc) => merge_batch_leaves(acc, leaves)?, + None => combined = Some(leaves), + } + } + + Ok(combined.unwrap_or_default()) +} + +fn compute_batch_leaves( + batch: &RecordBatch, + writers_len: usize, +) -> Result>, ParquetError> { + let mut cols: Vec> = Vec::with_capacity(writers_len); + let schema = batch.schema(); + + for (col_idx, field) in schema.fields().iter().enumerate() { + let array = batch.column(col_idx); + let leaves = compute_leaves(field, array)?; + + for leaf in leaves { + cols.push(vec![leaf]); + } + } + + if cols.len() != writers_len { + return Err(ParquetError::General(format!( + "Mismatched leaf column counts: {} (expected) vs {} (actual)", + writers_len, + cols.len(), + ))); + } + + Ok(cols) +} + +fn merge_batch_leaves( + result: &mut [Vec], + leaf_batch: Vec>, +) -> Result<(), ParquetError> { + if result.len() != leaf_batch.len() { + return Err(ParquetError::General(format!( + "Mismatched leaf column counts: {} (expected) vs {} (actual)", + result.len(), + leaf_batch.len() + ))); + } + + for (i, leaf) in 
leaf_batch.into_iter().enumerate() { + result[i].extend(leaf); + } + + Ok(()) +} diff --git a/crates/parquet-ext/src/writer/pipeline/encoder/row_group.rs b/crates/parquet-ext/src/writer/pipeline/encoder/row_group.rs new file mode 100644 index 000000000..84fa34d54 --- /dev/null +++ b/crates/parquet-ext/src/writer/pipeline/encoder/row_group.rs @@ -0,0 +1,66 @@ +use std::cmp::Ordering; + +use parquet::arrow::arrow_writer::ArrowColumnChunk; + +use crate::writer::pipeline::WriteJob; + +pub struct EncodedRowGroup { + pub id: usize, + pub encoded_cols: Vec, + pub encoded_bytes: isize, + pub encoded_rows: usize, +} + +pub struct PartialEncodedRowGroup { + pub id: usize, + pub encoded_cols: Vec, + pub encoded_bytes: isize, +} + +impl PartialEncodedRowGroup { + pub fn empty(id: usize) -> Self { + Self { + id, + encoded_cols: Vec::new(), + encoded_bytes: 0, + } + } + + fn into_encoded_row_group(self, rows: usize) -> EncodedRowGroup { + EncodedRowGroup { + id: self.id, + encoded_cols: self.encoded_cols, + encoded_bytes: self.encoded_bytes, + encoded_rows: rows, + } + } + + pub fn into_write_job(self, rows: usize, finalize: bool) -> WriteJob { + let row_group = self.into_encoded_row_group(rows); + if finalize { + WriteJob::Finalize { row_group } + } else { + WriteJob::Encoded { row_group } + } + } +} + +impl Eq for PartialEncodedRowGroup {} + +impl Ord for PartialEncodedRowGroup { + fn cmp(&self, other: &Self) -> Ordering { + self.id.cmp(&other.id) + } +} + +impl PartialEq for PartialEncodedRowGroup { + fn eq(&self, other: &Self) -> bool { + self.id.eq(&other.id) + } +} + +impl PartialOrd for PartialEncodedRowGroup { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} diff --git a/crates/parquet-ext/src/writer/pipeline/factory/builder.rs b/crates/parquet-ext/src/writer/pipeline/factory/builder.rs new file mode 100644 index 000000000..4442a9f68 --- /dev/null +++ b/crates/parquet-ext/src/writer/pipeline/factory/builder.rs @@ -0,0 +1,159 @@ +use 
std::{io::Write, marker::PhantomData, mem::MaybeUninit, sync::Arc}; + +use arrow_schema::SchemaRef; +use bytes::Bytes; +use parquet::{ + arrow::async_writer::AsyncFileWriter, + errors::Result, + file::{properties::WriterProperties, writer::SerializedFileWriter}, +}; + +use crate::{ + builder::{BuildState, Set, Unset}, + writer::pipeline::{PipelineProperties, Progress}, +}; + +pub struct PipelineFactoryBuilder< + Buf: Default + Write + Send, + Prog: BuildState = Unset, + Props: BuildState = Unset, + Writer: BuildState = Unset, +> where + Bytes: From, +{ + pub(super) progress: MaybeUninit>, + pub(super) properties: MaybeUninit, + pub(super) file_writer: MaybeUninit>, + pub(super) _writer_state: PhantomData, + pub(super) _progress_state: PhantomData, + pub(super) _properties_state: PhantomData, +} + +impl Default for PipelineFactoryBuilder +where + Bytes: From, +{ + fn default() -> Self { + Self { + progress: MaybeUninit::uninit(), + properties: MaybeUninit::uninit(), + file_writer: MaybeUninit::uninit(), + _writer_state: PhantomData, + _progress_state: PhantomData, + _properties_state: PhantomData, + } + } +} + +impl + PipelineFactoryBuilder +where + Bytes: From, +{ + pub fn with_progress(mut self) -> PipelineFactoryBuilder { + self.progress.write(Some(Progress::new())); + PipelineFactoryBuilder { + progress: self.progress, + properties: self.properties, + file_writer: self.file_writer, + _writer_state: PhantomData, + _progress_state: PhantomData, + _properties_state: PhantomData, + } + } + + pub fn without_progress(mut self) -> PipelineFactoryBuilder { + self.progress.write(None); + PipelineFactoryBuilder { + progress: self.progress, + properties: self.properties, + file_writer: self.file_writer, + _writer_state: PhantomData, + _progress_state: PhantomData, + _properties_state: PhantomData, + } + } +} + +impl PipelineFactoryBuilder +where + Bytes: From, +{ + pub fn build_properties( + mut self, + arrow_schema: &SchemaRef, + writer_properties: Option, + ) -> Result> 
{ + let properties_val = PipelineProperties::new(arrow_schema, writer_properties)?; + + self.properties.write(properties_val); + + Ok(PipelineFactoryBuilder { + progress: self.progress, + properties: self.properties, + file_writer: self.file_writer, + _writer_state: PhantomData, + _progress_state: PhantomData, + _properties_state: PhantomData, + }) + } +} + +impl PipelineFactoryBuilder +where + Bytes: From, +{ + pub fn build_writer(mut self, buf: Buf) -> Result> { + let pipeline_properties = unsafe { self.properties.assume_init_ref() }; + + let schema_ptr = pipeline_properties.parquet_schema.root_schema_ptr(); + let properties = Arc::clone(&pipeline_properties.writer_properties); + let writer_val = SerializedFileWriter::new(buf, schema_ptr, properties)?; + + self.file_writer.write(writer_val); + + Ok(PipelineFactoryBuilder { + progress: self.progress, + properties: self.properties, + file_writer: self.file_writer, + _writer_state: PhantomData, + _progress_state: PhantomData, + _properties_state: PhantomData, + }) + } +} + +impl PipelineFactoryBuilder +where + Bytes: From, +{ + pub fn into_factory( + self, + ) -> super::PipelineFactory { + self.without_progress().into_factory() + } +} +impl PipelineFactoryBuilder +where + Bytes: From, +{ + pub fn into_factory( + self, + ) -> super::PipelineFactory { + let progress = unsafe { self.progress.assume_init() }; + let properties = unsafe { self.properties.assume_init() }; + + super::PipelineFactory { + progress, + properties, + file_writer: self.file_writer, + writer_inbox: MaybeUninit::uninit(), + writer_executor: MaybeUninit::uninit(), + encoder_executor: MaybeUninit::uninit(), + _inbox_state: PhantomData, + _writer_state: PhantomData, + _encoder_state: PhantomData, + _file_writer_state: PhantomData, + } + } +} diff --git a/crates/parquet-ext/src/writer/pipeline/factory/mod.rs b/crates/parquet-ext/src/writer/pipeline/factory/mod.rs new file mode 100644 index 000000000..32c89250f --- /dev/null +++ 
b/crates/parquet-ext/src/writer/pipeline/factory/mod.rs @@ -0,0 +1,117 @@ +mod builder; + +use std::{io::Write, marker::PhantomData, mem::MaybeUninit}; + +use bytes::Bytes; +use parquet::{arrow::async_writer::AsyncFileWriter, file::writer::SerializedFileWriter}; + +pub use self::builder::PipelineFactoryBuilder; +use super::{EncoderExecutor, PipelineProperties, Progress, WriterExecutor, WriterInbox}; +use crate::{ + backend::PipelineBackend, + builder::{BuildState, Set, Unset}, + writer::pipeline::{Pipeline, encoder::EncoderFactory}, +}; + +pub struct PipelineFactory< + Buf: Default + Write + Send, + AsyncWriter: AsyncFileWriter + Send, + Inbox: BuildState = Unset, + Encode: BuildState = Unset, + Writer: BuildState = Unset, + FileWriter: BuildState = Set, +> where + Bytes: From, +{ + progress: Option, + properties: PipelineProperties, + file_writer: MaybeUninit>, + writer_inbox: MaybeUninit, + writer_executor: MaybeUninit>, + encoder_executor: MaybeUninit, + _inbox_state: PhantomData, + _writer_state: PhantomData, + _encoder_state: PhantomData, + _file_writer_state: PhantomData, +} + +impl + PipelineFactory +where + Bytes: From, +{ + pub fn spawn_encoder(mut self) -> PipelineFactory { + let file_writer = unsafe { self.file_writer.assume_init_ref() }; + let encoder_factory = EncoderFactory::new(file_writer, &self.properties.arrow_schema); + + let (encoder_executor_val, writer_inbox_val) = PipelineBackend::spawn_encoder( + &self.properties, + encoder_factory, + self.progress.as_ref(), + ); + + self.encoder_executor.write(encoder_executor_val); + self.writer_inbox.write(writer_inbox_val); + + PipelineFactory { + progress: self.progress, + properties: self.properties, + file_writer: self.file_writer, + writer_inbox: self.writer_inbox, + writer_executor: self.writer_executor, + encoder_executor: self.encoder_executor, + _inbox_state: PhantomData, + _writer_state: PhantomData, + _encoder_state: PhantomData, + _file_writer_state: PhantomData, + } + } +} + +impl + 
PipelineFactory +where + Bytes: From, +{ + pub fn spawn_writer( + mut self, + writer: AsyncWriter, + ) -> PipelineFactory { + let inbox = unsafe { self.writer_inbox.assume_init_read() }; + let file_writer = unsafe { self.file_writer.assume_init_read() }; + let progress = self.progress.as_ref(); + let writer_executor_val = WriterExecutor::new(inbox, writer, file_writer, progress); + self.writer_executor.write(writer_executor_val); + + PipelineFactory { + progress: self.progress, + properties: self.properties, + file_writer: MaybeUninit::uninit(), // file_writer is moved into writer_executor, so we can't keep it here + writer_inbox: MaybeUninit::uninit(), // writer_inbox is moved into writer_executor, so we can't keep it here + writer_executor: self.writer_executor, + encoder_executor: self.encoder_executor, + _inbox_state: PhantomData, + _writer_state: PhantomData, + _encoder_state: PhantomData, + _file_writer_state: PhantomData, + } + } +} + +impl + PipelineFactory +where + Bytes: From, +{ + pub fn build(mut self) -> Pipeline { + let encoder_executor = unsafe { self.encoder_executor.assume_init_read() }; + let writer_executor = unsafe { self.writer_executor.assume_init_read() }; + let progress = self.progress.take(); + + Pipeline { + progress, + encoder_executor, + writer_executor, + } + } +} diff --git a/crates/parquet-ext/src/writer/pipeline/job.rs b/crates/parquet-ext/src/writer/pipeline/job.rs new file mode 100644 index 000000000..81f22bf35 --- /dev/null +++ b/crates/parquet-ext/src/writer/pipeline/job.rs @@ -0,0 +1,28 @@ +use arrow_array::RecordBatch; +use flume::{Receiver, Sender}; +use parquet::errors::ParquetError; + +use super::{EncodedRowGroup, Progress, RowGroupEncoder}; + +pub type WriterOutbox = Sender; +pub type EncoderInbox = Receiver; +pub type EncoderOutbox = Sender; +pub type WriterInbox = Receiver; + +pub struct EncodeJob { + pub encoder: RowGroupEncoder, + pub batches: Vec, + pub rows: usize, + pub finalize: bool, + pub reply_tx: EncoderOutbox, 
+ pub progress: Option, +} + +pub enum WriteJob { + /// Successfully encoded a row group. + Encoded { row_group: EncodedRowGroup }, + /// Failed to encode a row group. + Error { id: usize, error: ParquetError }, + /// Final row group encoded, writer should finalize and reply with metadata. + Finalize { row_group: EncodedRowGroup }, +} diff --git a/crates/parquet-ext/src/writer/pipeline/mod.rs b/crates/parquet-ext/src/writer/pipeline/mod.rs new file mode 100644 index 000000000..acaba801c --- /dev/null +++ b/crates/parquet-ext/src/writer/pipeline/mod.rs @@ -0,0 +1,110 @@ +mod backend; +mod encoder; +mod factory; +mod job; +mod progress; +mod properties; +mod writer; + +use std::io::Write; + +use arrow_array::RecordBatch; +use bytes::Bytes; +use parquet::{ + arrow::async_writer::AsyncFileWriter, + errors::Result, + file::metadata::{KeyValue, ParquetMetaData, RowGroupMetaDataPtr}, +}; + +pub use self::{encoder::EncoderFactory, properties::PipelineProperties}; +use self::{ + encoder::{EncodedRowGroup, EncoderExecutor, RowGroupEncoder}, + factory::PipelineFactoryBuilder, + job::{EncoderOutbox, WriteJob, WriterInbox, WriterOutbox}, + progress::Progress, + writer::WriterExecutor, +}; + +pub struct Pipeline +where + Bytes: From, +{ + pub(self) progress: Option, + pub(self) encoder_executor: EncoderExecutor, + pub(self) writer_executor: WriterExecutor, +} + +impl Pipeline +where + Bytes: From, +{ + pub fn builder() -> PipelineFactoryBuilder { + PipelineFactoryBuilder::default() + } + + pub async fn add_batch_async(&mut self, batch: RecordBatch) -> Result<()> { + futures::try_join!( + self.encoder_executor.handle_batch_async(batch), + self.writer_executor.drain_ready() + )?; + Ok(()) + } + + pub async fn flush_async(&mut self) -> Result<()> { + futures::try_join!( + self.writer_executor.drain_ready(), + self.encoder_executor.flush_async(false) + )?; + Ok(()) + } + + pub fn append_key_value_metadata(&mut self, kv: KeyValue) { + 
self.writer_executor.append_key_value_metadata(kv); + } + + pub async fn close_async(mut self) -> Result { + self.encoder_executor.flush_async(true).await?; + let metadata = self.writer_executor.close_async().await?; + Ok(metadata) + } + + pub fn shutdown(self) { + self.encoder_executor.shutdown(); + } + + /// Get the number of rows that are buffered or being encoded but not yet written. + pub fn in_progress_rows(&self) -> usize { + self.progress + .as_ref() + .map(Progress::in_progress_rows) + .unwrap_or_default() + } + + /// Get the estimated size of data that is buffered or being encoded but not yet written. + pub fn in_progress_size(&self) -> usize { + self.progress + .as_ref() + .map(Progress::in_progress_size) + .unwrap_or_default() + } + + /// Get the estimated memory usage of buffered data. + pub fn memory_size(&self) -> usize { + self.progress + .as_ref() + .map(Progress::memory_size) + .unwrap_or_default() + } + + /// Get the total bytes written to the output so far. + pub fn bytes_written(&self) -> usize { + self.progress + .as_ref() + .map(Progress::bytes_written) + .unwrap_or_default() + } + + pub fn flushed_row_groups(&self) -> &[RowGroupMetaDataPtr] { + self.writer_executor.flushed_row_groups() + } +} diff --git a/crates/parquet-ext/src/writer/pipeline/progress.rs b/crates/parquet-ext/src/writer/pipeline/progress.rs new file mode 100644 index 000000000..6e3fb0b08 --- /dev/null +++ b/crates/parquet-ext/src/writer/pipeline/progress.rs @@ -0,0 +1,109 @@ +use std::sync::{ + Arc, + atomic::{AtomicIsize, AtomicUsize, Ordering}, +}; + +#[derive(Default)] +pub struct Progress { + inner: Arc, +} + +#[derive(Default)] +struct ProgressInner { + /// Rows buffered in RowGroupBuilder (not yet sent to encoder). + buffered_rows: AtomicUsize, + /// Estimated memory size of buffered batches. + buffered_bytes: AtomicUsize, + /// Rows currently being encoded. + encoding_rows: AtomicUsize, + /// Estimated size of data being encoded. 
+ /// Signed because initial estimates shrink as compression occurs. + encoding_bytes: AtomicIsize, + /// Bytes written to output. + bytes_written: AtomicUsize, + /// Rows written to output. + rows_written: AtomicUsize, +} + +impl Progress { + pub fn new() -> Self { + Self::default() + } + + pub fn add_buffered(&self, rows: usize, bytes: usize) { + self.inner.buffered_rows.fetch_add(rows, Ordering::Relaxed); + self.inner + .buffered_bytes + .fetch_add(bytes, Ordering::Relaxed); + } + + pub fn move_to_encoding(&self, rows: usize, bytes: usize) { + self.inner.buffered_rows.fetch_sub(rows, Ordering::Relaxed); + self.inner + .buffered_bytes + .fetch_sub(bytes, Ordering::Relaxed); + self.inner.encoding_rows.fetch_add(rows, Ordering::Relaxed); + } + + pub fn update_encoded_bytes(&self, delta: isize) { + self.inner + .encoding_bytes + .fetch_add(delta, Ordering::Relaxed); + } + + pub fn finish_encoding(&self, rows: usize, bytes: isize) { + self.inner.encoding_rows.fetch_sub(rows, Ordering::Relaxed); + self.inner + .encoding_bytes + .fetch_sub(bytes, Ordering::Relaxed); + } + + pub fn record_write(&self, rows: usize, bytes: usize) { + self.inner.rows_written.fetch_add(rows, Ordering::Relaxed); + self.inner.bytes_written.fetch_add(bytes, Ordering::Relaxed); + } + + pub fn buffered_bytes(&self) -> usize { + self.inner.buffered_bytes.load(Ordering::Relaxed) + } + + pub fn buffered_rows(&self) -> usize { + self.inner.buffered_rows.load(Ordering::Relaxed) + } + + pub fn encoding_bytes(&self) -> usize { + self.inner.encoding_bytes.load(Ordering::Relaxed).max(0) as usize + } + + pub fn encoding_rows(&self) -> usize { + self.inner.encoding_rows.load(Ordering::Relaxed) + } + + pub fn in_progress_rows(&self) -> usize { + self.buffered_rows() + self.encoding_rows() + } + + pub fn in_progress_size(&self) -> usize { + self.buffered_bytes() + self.encoding_bytes() + } + + pub fn memory_size(&self) -> usize { + self.buffered_bytes() + self.encoding_bytes() + } + + pub fn 
bytes_written(&self) -> usize {
+        self.inner.bytes_written.load(Ordering::Relaxed)
+    }
+
+    pub fn rows_written(&self) -> usize {
+        self.inner.rows_written.load(Ordering::Relaxed)
+    }
+}
+
+impl Clone for Progress {
+    fn clone(&self) -> Self {
+        Self {
+            inner: Arc::clone(&self.inner),
+        }
+    }
+}
diff --git a/crates/parquet-ext/src/writer/pipeline/properties.rs b/crates/parquet-ext/src/writer/pipeline/properties.rs
new file mode 100644
index 000000000..c2653d7ae
--- /dev/null
+++ b/crates/parquet-ext/src/writer/pipeline/properties.rs
@@ -0,0 +1,96 @@
+use std::{env::var_os, num::NonZeroUsize, sync::Arc, thread::available_parallelism};
+
+use arrow_schema::SchemaRef;
+use parquet::{
+    arrow::{ArrowSchemaConverter, add_encoded_arrow_schema_to_metadata},
+    errors::{ParquetError, Result},
+    file::properties::{WriterProperties, WriterPropertiesPtr},
+    schema::types::SchemaDescPtr,
+};
+
+use crate::writer::pipeline::job::{EncoderInbox, EncoderOutbox, WriterInbox, WriterOutbox};
+
+pub struct PipelineProperties {
+    pub(in crate::writer) num_workers: usize,
+    pub(in crate::writer) arrow_schema: SchemaRef,
+    pub(in crate::writer) parquet_schema: SchemaDescPtr,
+    pub(in crate::writer) channel_capacity: usize,
+    pub(in crate::writer) writer_properties: WriterPropertiesPtr,
+}
+
+impl PipelineProperties {
+    pub fn new(
+        arrow_schema: &SchemaRef,
+        writer_properties: Option<WriterProperties>,
+    ) -> Result<Self> {
+        let arrow_schema = Arc::clone(arrow_schema);
+
+        let num_workers = var_os("AMP_PARQUET_WRITER_PARALLELISM")
+            .map(|val| val.into_string().map_err(parse_env_var_to_parquet_err))
+            .transpose()?
+            .map(|s| s.parse::<usize>().map_err(parse_err_to_parquet_err))
+            .unwrap_or_else(|| {
+                available_parallelism()
+                    .map(NonZeroUsize::get)
+                    .map_err(ParquetError::from)
+            })?
+ .max(arrow_schema.flattened_fields().len()); + + let channel_capacity = 2 * num_workers; + + let converter = ArrowSchemaConverter::new().with_coerce_types(true); + let parquet_schema = converter.convert(&arrow_schema).map(Arc::new)?; + + let writer_properties = writer_properties + .map(add_arrow_schema_to_props(&arrow_schema)) + .unwrap_or_else(default_props_with_arrow_schema(&arrow_schema)); + + Ok(Self { + num_workers, + arrow_schema, + parquet_schema, + channel_capacity, + writer_properties, + }) + } + + pub fn with_num_workers(&mut self, num_workers: usize) { + self.num_workers = num_workers; + self.channel_capacity = 2 * num_workers; + } + + pub fn create_encoder_channel(&self) -> (WriterOutbox, EncoderInbox) { + flume::bounded(self.channel_capacity) + } + + pub fn create_writer_channel(&self) -> (EncoderOutbox, WriterInbox) { + flume::unbounded() + } +} + +fn add_arrow_schema_to_props( + arrow_schema: &SchemaRef, +) -> impl FnOnce(WriterProperties) -> WriterPropertiesPtr { + move |mut props: WriterProperties| { + add_encoded_arrow_schema_to_metadata(arrow_schema, &mut props); + Arc::new(props) + } +} + +fn default_props_with_arrow_schema( + arrow_schema: &SchemaRef, +) -> impl FnOnce() -> WriterPropertiesPtr { + || { + let mut props = WriterProperties::default(); + add_encoded_arrow_schema_to_metadata(arrow_schema, &mut props); + Arc::new(props) + } +} + +fn parse_err_to_parquet_err(e: std::num::ParseIntError) -> ParquetError { + ParquetError::External(format!("Failed to parse parallelism value: {e}").into()) +} + +fn parse_env_var_to_parquet_err(e: std::ffi::OsString) -> ParquetError { + ParquetError::External(format!("Failed to read parallelism env var: {e:?}").into()) +} diff --git a/crates/parquet-ext/src/writer/pipeline/writer/mod.rs b/crates/parquet-ext/src/writer/pipeline/writer/mod.rs new file mode 100644 index 000000000..3f14a9519 --- /dev/null +++ b/crates/parquet-ext/src/writer/pipeline/writer/mod.rs @@ -0,0 +1,206 @@ +mod row_group; + +use 
std::io::Write;
+
+use bytes::Bytes;
+use parquet::{
+    arrow::async_writer::AsyncFileWriter,
+    errors::{ParquetError, Result},
+    file::{
+        metadata::{KeyValue, ParquetMetaData, RowGroupMetaDataPtr},
+        writer::SerializedFileWriter,
+    },
+};
+
+use self::row_group::PendingRowGroups;
+use super::{Progress, WriterInbox};
+use crate::writer::pipeline::{job::WriteJob, writer::row_group::PendingRowGroup};
+
+pub struct WriterExecutor<Writer, Buffer: Write>
+where
+    Bytes: From<Buffer>,
+{
+    inbox: WriterInbox,
+    writer: Writer,
+    file_writer: SerializedFileWriter<Buffer>,
+    pending: PendingRowGroups,
+    progress: Option<Progress>,
+    flushed_row_groups: Vec<RowGroupMetaDataPtr>,
+    key_value_metadata: Vec<KeyValue>,
+}
+
+impl<Writer: AsyncFileWriter, Buffer: Write + Default> WriterExecutor<Writer, Buffer>
+where
+    Bytes: From<Buffer>,
+{
+    pub fn new(
+        inbox: WriterInbox,
+        writer: Writer,
+        file_writer: SerializedFileWriter<Buffer>,
+        progress: Option<&Progress>,
+    ) -> Self {
+        Self {
+            inbox,
+            writer,
+            file_writer,
+            pending: PendingRowGroups::default(),
+            progress: progress.cloned(),
+            flushed_row_groups: Vec::new(),
+            key_value_metadata: Vec::new(),
+        }
+    }
+
+    /// Non-blocking drain: process any ready WriteJobs via try_recv.
+    pub async fn drain_ready(&mut self) -> Result<()> {
+        while let Ok(job) = self.inbox.try_recv() {
+            match job {
+                WriteJob::Encoded { row_group } => {
+                    self.pending.insert(row_group);
+                }
+                WriteJob::Finalize { row_group } => {
+                    self.pending.insert(row_group);
+                }
+                WriteJob::Error { id, error } => {
+                    eprintln!("Encoding error for row group {id}: {error}");
+                    return Err(error);
+                }
+            }
+        }
+
+        self.flush_ready().await
+    }
+
+    /// Blocking close: receive all remaining WriteJobs until Finalize, then finalize.
+    pub async fn close_async(mut self) -> Result<ParquetMetaData> {
+        loop {
+            match self
+                .inbox
+                .recv_async()
+                .await
+                .map_err(|err| ParquetError::External(err.into()))?
+ { + WriteJob::Encoded { row_group } => { + self.pending.insert(row_group); + self.flush_ready().await?; + } + WriteJob::Finalize { row_group } => { + self.pending.insert(row_group); + self.flush_ready().await?; + return self.finalize().await; + } + WriteJob::Error { id, error } => { + eprintln!("Encoding error for row group {id}: {error}"); + return Err(error); + } + } + } + } + + /// Get flushed row group metadata. + pub fn flushed_row_groups(&self) -> &[RowGroupMetaDataPtr] { + &self.flushed_row_groups + } + + pub fn append_key_value_metadata(&mut self, kv: KeyValue) { + self.key_value_metadata.push(kv); + } + + async fn write_row_group(&mut self, pending: PendingRowGroup) -> Result<()> { + let PendingRowGroup { + chunks, + rows, + bytes, + } = pending; + + if let Some(progress) = &mut self.progress { + progress.finish_encoding(rows, bytes); + } + + if rows == 0 { + return Ok(()); + } + + let mut row_group_writer = self.file_writer.next_row_group()?; + + for chunk in chunks { + chunk.append_to_row_group(&mut row_group_writer)?; + } + + let metadata = row_group_writer.close()?; + self.flushed_row_groups.push(metadata); + + let bs = Bytes::from(std::mem::take(self.file_writer.inner_mut())); + let bytes_len = bs.len(); + self.writer.write(bs).await?; + if let Some(progress) = &mut self.progress { + progress.record_write(rows, bytes_len); + } + + Ok(()) + } + + /// Write any row groups that are ready (in order). 
+    async fn flush_ready(&mut self) -> Result<()> {
+        while let Some(pending) = self.pending.next_ready() {
+            self.write_row_group(pending).await?;
+        }
+        Ok(())
+    }
+
+    async fn flush_pending_encoders(&mut self) -> Result<()> {
+        while self.pending.has_pending() {
+            match self
+                .inbox
+                .recv_async()
+                .await
+                .map_err(|err| ParquetError::General(err.to_string()))
+            {
+                Ok(WriteJob::Encoded { row_group }) => {
+                    self.pending.insert(row_group);
+                }
+                Ok(WriteJob::Error { id, error }) => {
+                    eprintln!("Encoding error for row group {id}: {error}");
+                    return Err(error);
+                }
+                Err(err) => {
+                    return Err(err);
+                }
+                Ok(WriteJob::Finalize { .. }) => {
+                    unreachable!("Unexpected Finalize message during drain")
+                }
+            }
+
+            self.flush_ready().await?;
+        }
+
+        Ok(())
+    }
+
+    async fn finalize(&mut self) -> Result<ParquetMetaData> {
+        self.flush_pending_encoders().await?;
+        self.flush_key_value_metadata();
+
+        if !self.pending.is_empty() {
+            return Err(ParquetError::General(format!(
+                "Attempted to close parquet file with {} pending row groups (missing IDs: {:?})",
+                self.pending.len(),
+                self.pending.keys().collect::<Vec<_>>()
+            )));
+        }
+
+        let metadata = self.file_writer.finish()?;
+
+        let buf = std::mem::take(self.file_writer.inner_mut());
+
+        self.writer.write(Bytes::from(buf)).await?;
+        self.writer.complete().await?;
+
+        Ok(metadata)
+    }
+
+    fn flush_key_value_metadata(&mut self) {
+        for kv in self.key_value_metadata.drain(..)
{
+            self.file_writer.append_key_value_metadata(kv);
+        }
+    }
+}
diff --git a/crates/parquet-ext/src/writer/pipeline/writer/row_group.rs b/crates/parquet-ext/src/writer/pipeline/writer/row_group.rs
new file mode 100644
index 000000000..d6eee3afc
--- /dev/null
+++ b/crates/parquet-ext/src/writer/pipeline/writer/row_group.rs
@@ -0,0 +1,70 @@
+use std::collections::{BTreeMap, BTreeSet};
+
+use parquet::arrow::arrow_writer::ArrowColumnChunk;
+
+use crate::writer::pipeline::EncodedRowGroup;
+
+#[derive(Default)]
+pub struct PendingRowGroups {
+    inner: BTreeMap<usize, PendingRowGroup>,
+    next: usize,
+    outstanding: BTreeSet<usize>,
+}
+
+pub struct PendingRowGroup {
+    pub chunks: Vec<ArrowColumnChunk>,
+    pub rows: usize,
+    pub bytes: isize,
+}
+
+impl EncodedRowGroup {
+    pub fn into_pending(self) -> (usize, PendingRowGroup) {
+        (
+            self.id,
+            PendingRowGroup {
+                bytes: self.encoded_bytes,
+                rows: self.encoded_rows,
+                chunks: self.encoded_cols,
+            },
+        )
+    }
+}
+
+impl PendingRowGroups {
+    pub fn len(&self) -> usize {
+        self.inner.len()
+    }
+
+    pub fn keys(&self) -> impl Iterator<Item = &usize> {
+        self.inner.keys()
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.inner.is_empty()
+    }
+
+    pub fn has_pending(&self) -> bool {
+        !self.outstanding.is_empty()
+    }
+
+    pub fn insert(&mut self, row_group: EncodedRowGroup) {
+        let (id, pending) = row_group.into_pending();
+
+        self.inner.insert(id, pending);
+
+        let min_id = self.outstanding.first().cloned().unwrap_or(id).min(id);
+        let max_id = self.outstanding.last().cloned().unwrap_or(id).max(id);
+        self.outstanding.extend(min_id..=max_id);
+    }
+
+    pub fn next_ready(&mut self) -> Option<PendingRowGroup> {
+        let id = self.next;
+        if let Some(ready_group) = self.inner.remove(&id) {
+            self.outstanding.remove(&id);
+            self.next += 1;
+            Some(ready_group)
+        } else {
+            None
+        }
+    }
+}