72 changes: 68 additions & 4 deletions src/datarepo/core/tables/deltalake_table.py
@@ -305,6 +305,8 @@ def fetch_df_by_partition(
partition: list[tuple[str, str, Any]],
schema: pa.Schema,
storage_options: dict[str, Any] | None = None,
use_batching: bool = False,
batch_size: int = 10,
) -> pl.DataFrame:
"""
The native delta-rs read has slower performance. The difference comes from the dataset in
@@ -325,6 +327,13 @@ def fetch_df_by_partition(
Thus, it makes sense to use read_table() since we have a predefined list of files to be loaded
from dt.files

Args:
dt: DeltaTable object
partition: Partition filters
schema: Schema to normalize to
storage_options: Storage options
use_batching: Whether to group the partition's files into batches and read each batch in its own worker thread
batch_size: Number of files to process per batch when batching is enabled
"""
files = dt.files(partition_filters=partition)

@@ -334,10 +343,17 @@ def fetch_df_by_partition(
# NOTE: polars has a bug where with_columns(...) on an empty dataframe with no columns will add a row
# for all new columns. Workaround by adding the columns before normalizing
return _empty_normalized_df(schema)

return fetch_dfs_by_paths(
files=files, schema=schema, storage_options=storage_options
)
if use_batching:
return fetch_dfs_by_paths_batching(
files=files,
schema=schema,
storage_options=storage_options,
batch_size=batch_size,
)
else:
return fetch_dfs_by_paths(
files=files, schema=schema, storage_options=storage_options
)


def fetch_dfs_by_paths(
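Reviewer note, not part of the diff: a minimal sketch of how a caller might exercise the new batching path. It assumes fetch_df_by_partition is importable from datarepo.core.tables.deltalake_table and takes the DeltaTable as its first argument; the table location, schema, and partition filter are illustrative assumptions, and only use_batching and batch_size come from this change.

import pyarrow as pa
from deltalake import DeltaTable

from datarepo.core.tables.deltalake_table import fetch_df_by_partition

# Hypothetical table location and schema, used only for illustration.
dt = DeltaTable("data/events_table")
schema = pa.schema([("event_id", pa.string()), ("value", pa.float64())])

# Read one partition, grouping its files into batches of 20 per worker thread.
df = fetch_df_by_partition(
    dt,
    partition=[("date", "=", "2024-01-01")],  # hypothetical partition column/value
    schema=schema,
    storage_options=None,
    use_batching=True,
    batch_size=20,
)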
@@ -374,6 +390,54 @@ def fetch_dfs_by_paths(
return pl.concat([_normalize_df(df, schema=schema) for df in dfs])


def fetch_dfs_by_paths_batching(
files: list[str],
schema: pa.Schema,
batch_size: int,
storage_options: dict[str, Any] | None = None,
) -> pl.DataFrame:
"""Fetch dataframes from a list of Parquet files using multithreading.

Args:
files (list[str]): List of file paths to read Parquet files from.
schema (pa.Schema): Schema to normalize the dataframes to.
batch_size (int): Number of files to read per batch; each batch is handled by one worker thread.
storage_options (dict[str, Any] | None, optional): Storage options for reading the Parquet files, such as S3 access credentials. Defaults to None.

Returns:
pl.DataFrame: A Polars DataFrame containing the concatenated results of all Parquet files, normalized to the specified schema.
"""
if not files:
return _empty_normalized_df(schema)

def batcher(arr: list[str], batch_size: int):
"""Split array into batches of specified size."""
for i in range(0, len(arr), batch_size):
yield arr[i : i + batch_size]

def read_batch(batch):
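# Read every file in this batch sequentially and concatenate the results into a single frame.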
return pl.concat(
[
pl.read_parquet(
f,
retries=READ_PARQUET_RETRY_COUNT,
storage_options=storage_options,
hive_partitioning=True,
)
for f in batch
]
)

with ThreadPoolExecutor() as executor:
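# Submit one read task per batch; the executor picks a default worker count based on the CPU count.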
futures = [
executor.submit(read_batch, batch) for batch in batcher(files, batch_size)
]

dfs = [future.result() for future in futures]

return pl.concat([_normalize_df(df, schema=schema) for df in dfs])


def _empty_normalized_df(schema: pa.Schema) -> pl.DataFrame:
"""Create an empty DataFrame with the specified schema.

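Reviewer note, not part of the diff: batching trades many small read tasks for fewer, larger ones, so 25 files with batch_size=10 become 3 worker tasks and the final concat only has to merge 3 intermediate frames. A small self-contained sketch of the same batch-and-submit pattern, with a stand-in for the Parquet read (all names here are illustrative):

from concurrent.futures import ThreadPoolExecutor


def batcher(items: list[str], batch_size: int):
    """Yield successive batches of at most batch_size items."""
    for i in range(0, len(items), batch_size):
        yield items[i : i + batch_size]


def process_batch(batch: list[str]) -> list[str]:
    # Stand-in for reading every file in the batch and concatenating the frames.
    return [name.upper() for name in batch]


files = [f"part-{i:05d}.parquet" for i in range(25)]

with ThreadPoolExecutor() as executor:
    futures = [executor.submit(process_batch, batch) for batch in batcher(files, 10)]
    results = [future.result() for future in futures]

print(len(results))  # 3 batches: 10 + 10 + 5 files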