diff --git a/docs/content/pypaimon/python-api.md b/docs/content/pypaimon/python-api.md index 51eded3d7354..8aa27661d832 100644 --- a/docs/content/pypaimon/python-api.md +++ b/docs/content/pypaimon/python-api.md @@ -551,6 +551,140 @@ table.rollback_to('v3') # tag name The `rollback_to` method accepts either an `int` (snapshot ID) or a `str` (tag name) and automatically dispatches to the appropriate rollback logic. +## Streaming Read + +Streaming reads allow you to continuously read new data as it arrives in a Paimon table. This is useful for building +real-time data pipelines and ETL jobs. + +### Basic Streaming Read + +Use `StreamReadBuilder` to create a streaming scan that continuously polls for new snapshots: + +```python +table = catalog.get_table('database_name.table_name') + +# Create streaming read builder +stream_builder = table.new_stream_read_builder() +stream_builder.with_poll_interval_ms(1000) # Poll every 1 second + +# Create streaming scan and table read +scan = stream_builder.new_streaming_scan() +table_read = stream_builder.new_read() + +# Async streaming (recommended for ETL pipelines) +import asyncio + +async def process_stream(): + async for plan in scan.stream(): + for split in plan.splits(): + arrow_batch = table_read.to_arrow([split]) + # Process the data + print(f"Received {arrow_batch.num_rows} rows") + +asyncio.run(process_stream()) +``` + +### Synchronous Streaming + +For simpler use cases, you can use the synchronous wrapper: + +```python +# Synchronous streaming +for plan in scan.stream_sync(): + arrow_table = table_read.to_arrow(plan.splits()) + process(arrow_table) +``` + +### Manual Position Control + +You can directly read and set the scan position via `next_snapshot_id`: + +```python +# Save current position +saved_position = scan.next_snapshot_id + +# Later, restore position +scan.next_snapshot_id = saved_position + +# Or start from a specific snapshot +scan.next_snapshot_id = 42 +``` + +### Filtering Streaming Data + +You can apply predicates and projections to streaming reads: + +```python +stream_builder = table.new_stream_read_builder() + +# Build predicate +predicate_builder = stream_builder.new_predicate_builder() +predicate = predicate_builder.greater_than('timestamp', 1704067200000) + +# Apply filter and projection +stream_builder.with_filter(predicate) +stream_builder.with_projection(['id', 'name', 'timestamp']) + +scan = stream_builder.new_streaming_scan() +``` + +Key points about streaming reads: + +- **Poll Interval**: Controls how often to check for new snapshots (default: 1000ms) +- **Initial Scan**: First iteration returns all existing data, subsequent iterations return only new data +- **Commit Types**: By default, only APPEND commits are processed; COMPACT and OVERWRITE are skipped + +### Parallel Consumption + +For high-throughput streaming, you can run multiple consumers in parallel, each reading a disjoint subset of buckets. +This is similar to Kafka consumer groups. + +**Using `with_buckets()` for explicit bucket assignment**: + +```python +# Consumer 0 reads buckets 0, 1, 2 +stream_builder.with_buckets([0, 1, 2]) + +# Consumer 1 reads buckets 3, 4, 5 +stream_builder.with_buckets([3, 4, 5]) +``` + +**Using `with_bucket_filter()` for custom filtering**: + +```python +# Read only even buckets +stream_builder.with_bucket_filter(lambda b: b % 2 == 0) +``` + +### Row Kind Support + +For changelog streams, you can include the row kind to distinguish between inserts, updates, and deletes: + +```python +stream_builder = table.new_stream_read_builder() +stream_builder.with_include_row_kind(True) + +scan = stream_builder.new_streaming_scan() +table_read = stream_builder.new_read() + +async for plan in scan.stream(): + arrow_table = table_read.to_arrow(plan.splits()) + for row in arrow_table.to_pylist(): + row_kind = row['_row_kind'] # +I, -U, +U, or -D + if row_kind == '+I': + handle_insert(row) + elif row_kind == '-D': + handle_delete(row) + elif row_kind in ('-U', '+U'): + handle_update(row) +``` + +Row kind values: +- `+I`: Insert +- `-U`: Update before (old value) +- `+U`: Update after (new value) +- `-D`: Delete + ## Data Types | Python Native Type | PyArrow Type | Paimon Type | @@ -742,3 +876,6 @@ The following shows the supported features of Python Paimon compared to Java Pai - Reading and writing blob data - `with_shard` feature - Rollback feature + - Streaming reads + - Parallel consumption with bucket filtering + - Row kind support for changelog streams diff --git a/paimon-python/pypaimon/acceptance/__init__.py b/paimon-python/pypaimon/acceptance/__init__.py new file mode 100644 index 000000000000..d8cea1afd7bf --- /dev/null +++ b/paimon-python/pypaimon/acceptance/__init__.py @@ -0,0 +1,23 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +""" +Acceptance tests for pypaimon. + +These tests use real file I/O with local temp filesystem to verify +end-to-end behavior, as opposed to unit tests which use mocks. +""" diff --git a/paimon-python/pypaimon/acceptance/incremental_diff_acceptance_test.py b/paimon-python/pypaimon/acceptance/incremental_diff_acceptance_test.py new file mode 100644 index 000000000000..0dc7ebf53fff --- /dev/null +++ b/paimon-python/pypaimon/acceptance/incremental_diff_acceptance_test.py @@ -0,0 +1,238 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +""" +Acceptance tests for IncrementalDiffScanner. + +These tests verify that the diff approach (reading 2 base_manifest_lists) +returns the same data as the delta approach (reading N delta_manifest_lists). + +Uses real file I/O with local temp filesystem. +""" + +import os +import shutil +import tempfile +import unittest + +import pyarrow as pa + +from pypaimon import CatalogFactory, Schema +from pypaimon.manifest.manifest_file_manager import ManifestFileManager +from pypaimon.manifest.manifest_list_manager import ManifestListManager +from pypaimon.read.scanner.append_table_split_generator import \ + AppendTableSplitGenerator +from pypaimon.read.scanner.incremental_diff_scanner import \ + IncrementalDiffScanner +from pypaimon.snapshot.snapshot_manager import SnapshotManager + + +class IncrementalDiffAcceptanceTest(unittest.TestCase): + """Acceptance tests for diff vs delta equivalence with real data.""" + + @classmethod + def setUpClass(cls): + cls.tempdir = tempfile.mkdtemp() + cls.warehouse = os.path.join(cls.tempdir, 'warehouse') + cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse}) + cls.catalog.create_database('default', True) + + cls.pa_schema = pa.schema([ + ('id', pa.int32()), + ('value', pa.string()), + ('partition_col', pa.string()) + ]) + + @classmethod + def tearDownClass(cls): + shutil.rmtree(cls.tempdir, ignore_errors=True) + + def _create_table_with_snapshots(self, name, num_snapshots=5, partition_keys=None): + """Create a table and write num_snapshots of data. + + Returns: + Tuple of (table, expected_data_per_snapshot) + """ + schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=partition_keys) + self.catalog.create_table(f'default.{name}', schema, False) + table = self.catalog.get_table(f'default.{name}') + + all_data = [] + for snap_id in range(1, num_snapshots + 1): + write_builder = table.new_batch_write_builder() + table_write = write_builder.new_write() + table_commit = write_builder.new_commit() + + data = { + 'id': [snap_id * 10 + i for i in range(5)], + 'value': [f'snap{snap_id}_row{i}' for i in range(5)], + 'partition_col': ['p1' if i % 2 == 0 else 'p2' for i in range(5)] + } + all_data.append(data) + + pa_table = pa.Table.from_pydict(data, schema=self.pa_schema) + table_write.write_arrow(pa_table) + table_commit.commit(table_write.prepare_commit()) + table_write.close() + table_commit.close() + + return table, all_data + + def _read_via_diff(self, table, start_snap_id, end_snap_id): + """Read data using IncrementalDiffScanner between two snapshots.""" + snapshot_manager = SnapshotManager(table) + start_snapshot = snapshot_manager.get_snapshot_by_id(start_snap_id) + end_snapshot = snapshot_manager.get_snapshot_by_id(end_snap_id) + + scanner = IncrementalDiffScanner(table) + plan = scanner.scan(start_snapshot, end_snapshot) + + table_read = table.new_read_builder().new_read() + return table_read.to_arrow(plan.splits()) + + def _read_via_delta(self, table, start_snap_id, end_snap_id): + """Read data by iterating delta_manifest_lists between two snapshots.""" + snapshot_manager = SnapshotManager(table) + manifest_list_manager = ManifestListManager(table) + manifest_file_manager = ManifestFileManager(table) + + all_entries = [] + for snap_id in range(start_snap_id + 1, end_snap_id + 1): + snapshot = snapshot_manager.get_snapshot_by_id(snap_id) + if snapshot and snapshot.commit_kind == 'APPEND': + manifest_files = manifest_list_manager.read_delta(snapshot) + if manifest_files: + entries = manifest_file_manager.read_entries_parallel(manifest_files) + all_entries.extend(entries) + + # Create splits from entries + options = table.options + split_generator = AppendTableSplitGenerator( + table, + options.source_split_target_size(), + options.source_split_open_file_cost(), + {} + ) + splits = split_generator.create_splits(all_entries) + + table_read = table.new_read_builder().new_read() + return table_read.to_arrow(splits) + + def _rows_to_set(self, arrow_table): + """Convert arrow table to set of (id, value, partition_col) tuples.""" + rows = set() + for i in range(arrow_table.num_rows): + row = ( + arrow_table.column('id')[i].as_py(), + arrow_table.column('value')[i].as_py(), + arrow_table.column('partition_col')[i].as_py() + ) + rows.add(row) + return rows + + def test_diff_returns_same_rows_as_delta_simple(self): + """ + Basic case: 5 snapshots, verify row-level equivalence. + + Creates a table with 5 snapshots, then reads data from snapshot 1 to 5 + using both diff and delta approaches, verifying they return the same rows. + """ + table, all_data = self._create_table_with_snapshots( + 'test_diff_delta_simple', + num_snapshots=5 + ) + + # Read using both approaches (from snapshot 1 to 5, so we get snapshots 2-5) + diff_result = self._read_via_diff(table, 1, 5) + delta_result = self._read_via_delta(table, 1, 5) + + # Convert to sets for order-independent comparison + diff_rows = self._rows_to_set(diff_result) + delta_rows = self._rows_to_set(delta_result) + + self.assertEqual(diff_rows, delta_rows) + + # Verify we got the expected number of rows (snapshots 2-5, 5 rows each = 20) + self.assertEqual(len(diff_rows), 20) + + # Verify specific IDs are present (from snapshots 2-5) + expected_ids = set() + for snap_id in range(2, 6): # snapshots 2, 3, 4, 5 + for i in range(5): + expected_ids.add(snap_id * 10 + i) + + actual_ids = {row[0] for row in diff_rows} + self.assertEqual(actual_ids, expected_ids) + + def test_diff_returns_same_rows_as_delta_many_snapshots(self): + """ + Stress test: 20 snapshots, verify row-level equivalence. + + This tests the catch-up scenario where there are many snapshots + between start and end. + """ + table, all_data = self._create_table_with_snapshots( + 'test_diff_delta_many', + num_snapshots=20 + ) + + # Read using both approaches (from snapshot 1 to 20) + diff_result = self._read_via_diff(table, 1, 20) + delta_result = self._read_via_delta(table, 1, 20) + + # Convert to sets for order-independent comparison + diff_rows = self._rows_to_set(diff_result) + delta_rows = self._rows_to_set(delta_result) + + self.assertEqual(diff_rows, delta_rows) + + # Verify we got the expected number of rows (snapshots 2-20, 5 rows each = 95) + self.assertEqual(len(diff_rows), 95) + + def test_diff_returns_same_rows_with_mixed_partitions(self): + """ + Partitioned table: Verify diff handles multiple partitions correctly. + + Creates a partitioned table and verifies diff and delta return + the same rows across all partitions. + """ + table, all_data = self._create_table_with_snapshots( + 'test_diff_delta_partitioned', + num_snapshots=5, + partition_keys=['partition_col'] + ) + + # Read using both approaches + diff_result = self._read_via_diff(table, 1, 5) + delta_result = self._read_via_delta(table, 1, 5) + + # Convert to sets for order-independent comparison + diff_rows = self._rows_to_set(diff_result) + delta_rows = self._rows_to_set(delta_result) + + self.assertEqual(diff_rows, delta_rows) + + # Verify both partitions have data + p1_rows = {r for r in diff_rows if r[2] == 'p1'} + p2_rows = {r for r in diff_rows if r[2] == 'p2'} + + self.assertGreater(len(p1_rows), 0, "Should have rows in partition p1") + self.assertGreater(len(p2_rows), 0, "Should have rows in partition p2") + + +if __name__ == '__main__': + unittest.main()