From d2b323bd746fa958f4a8f3093e5636086c05a26a Mon Sep 17 00:00:00 2001 From: Ron Xavier Date: Sun, 17 Aug 2025 08:40:08 -0400 Subject: [PATCH 1/3] Added Batching functionality --- src/datarepo/core/tables/deltalake_table.py | 70 +++++++++++++++++++-- 1 file changed, 66 insertions(+), 4 deletions(-) diff --git a/src/datarepo/core/tables/deltalake_table.py b/src/datarepo/core/tables/deltalake_table.py index 7c9a57a..31790ae 100644 --- a/src/datarepo/core/tables/deltalake_table.py +++ b/src/datarepo/core/tables/deltalake_table.py @@ -305,6 +305,8 @@ def fetch_df_by_partition( partition: list[tuple[str, str, Any]], schema: pa.Schema, storage_options: dict[str, Any] | None = None, + use_batching: bool = False, + batch_size: int = 10, ) -> pl.DataFrame: """ The native delta-rs read has slower performance. The difference comes from the dataset in @@ -325,6 +327,13 @@ def fetch_df_by_partition( Thus, it makes sense to use read_table() since we have a predefined list of files to be loaded from dt.files + Args: + dt: DeltaTable object + partition: Partition filters + schema: Schema to normalize to + storage_options: Storage options + use_batching: Whether to use batched file processing for better performance + batch_size: Number of files to process per batch when batching is enabled """ files = dt.files(partition_filters=partition) @@ -334,10 +343,17 @@ def fetch_df_by_partition( # NOTE: polars has a bug where with_columns(...) on an empty dataframe with no columns will add a row # for all new columns. 
Workaround by adding the columns before normalizing return _empty_normalized_df(schema) - - return fetch_dfs_by_paths( - files=files, schema=schema, storage_options=storage_options - ) + if use_batching: + return fetch_dfs_by_paths_batching( + files=files, + schema=schema, + storage_options=storage_options, + batch_size=batch_size + ) + else: + return fetch_dfs_by_paths( + files=files, schema=schema, storage_options=storage_options + ) def fetch_dfs_by_paths( @@ -373,6 +389,52 @@ def fetch_dfs_by_paths( return pl.concat([_normalize_df(df, schema=schema) for df in dfs]) +def fetch_dfs_by_paths_batching( + files: list[str], + schema: pa.Schema, + batch_size: int, + storage_options: dict[str, Any] | None = None, +) -> pl.DataFrame: + """Fetch dataframes from a list of Parquet files in batches, one batch per thread. + + Args: + files (list[str]): List of file paths to read Parquet files from. + schema (pa.Schema): Schema to normalize the dataframes to. + storage_options (dict[str, Any] | None, optional): Storage options for reading the Parquet files, such as S3 access credentials. Defaults to None. + batch_size (int ): Number of files to process per batch. + + Returns: + pl.DataFrame: A Polars DataFrame containing the concatenated results of all Parquet files, normalized to the specified schema. 
+ """ + if not files: + return _empty_normalized_df(schema) + + def batcher(arr: list[str], batch_size: int): + """Split array into batches of specified size.""" + for i in range(0, len(arr), batch_size): + yield arr[i:i + batch_size] + + def read_batch(batch): + return pl.concat([ + pl.read_parquet( + f, + retries=READ_PARQUET_RETRY_COUNT, + storage_options=storage_options, + hive_partitioning=True, + ) + for f in batch + ]) + + with ThreadPoolExecutor() as executor: + futures = [ + executor.submit(read_batch, batch) + for batch in batcher(files, batch_size) + ] + + dfs = [future.result() for future in futures] + + return pl.concat([_normalize_df(df, schema=schema) for df in dfs]) + def _empty_normalized_df(schema: pa.Schema) -> pl.DataFrame: """Create an empty DataFrame with the specified schema. From 348e75b1fef51094627f12ccbc06a7879f7fd038 Mon Sep 17 00:00:00 2001 From: Ron Xavier Date: Sun, 17 Aug 2025 08:50:01 -0400 Subject: [PATCH 2/3] Update deltalake_table.py --- src/datarepo/core/tables/deltalake_table.py | 36 +++++++++++---------- 1 file changed, 19 insertions(+), 17 deletions(-) diff --git a/src/datarepo/core/tables/deltalake_table.py b/src/datarepo/core/tables/deltalake_table.py index 31790ae..3b2ae0f 100644 --- a/src/datarepo/core/tables/deltalake_table.py +++ b/src/datarepo/core/tables/deltalake_table.py @@ -345,10 +345,10 @@ def fetch_df_by_partition( return _empty_normalized_df(schema) if use_batching: return fetch_dfs_by_paths_batching( - files=files, - schema=schema, - storage_options=storage_options, - batch_size=batch_size + files=files, + schema=schema, + storage_options=storage_options, + batch_size=batch_size, ) else: return fetch_dfs_by_paths( @@ -389,6 +389,7 @@ def fetch_dfs_by_paths( return pl.concat([_normalize_df(df, schema=schema) for df in dfs]) + def fetch_dfs_by_paths_batching( files: list[str], schema: pa.Schema, @@ -412,24 +413,25 @@ def fetch_dfs_by_paths_batching( def batcher(arr: list[str], batch_size: int): """Split array 
into batches of specified size.""" for i in range(0, len(arr), batch_size): - yield arr[i:i + batch_size] + yield arr[i : i + batch_size] def read_batch(batch): - return pl.concat([ - pl.read_parquet( - f, - retries=READ_PARQUET_RETRY_COUNT, - storage_options=storage_options, - hive_partitioning=True, - ) - for f in batch - ]) + return pl.concat( + [ + pl.read_parquet( + f, + retries=READ_PARQUET_RETRY_COUNT, + storage_options=storage_options, + hive_partitioning=True, + ) + for f in batch + ] + ) with ThreadPoolExecutor() as executor: futures = [ - executor.submit(read_batch, batch) - for batch in batcher(files, batch_size) - ] + executor.submit(read_batch, batch) for batch in batcher(files, batch_size) + ] dfs = [future.result() for future in futures] From fb0813f42595163a1aae0729998205c6ae0bab96 Mon Sep 17 00:00:00 2001 From: Ron Xavier Date: Mon, 18 Aug 2025 06:30:02 +0530 Subject: [PATCH 3/3] Update src/datarepo/core/tables/deltalake_table.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/datarepo/core/tables/deltalake_table.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/datarepo/core/tables/deltalake_table.py b/src/datarepo/core/tables/deltalake_table.py index 3b2ae0f..0f8a22f 100644 --- a/src/datarepo/core/tables/deltalake_table.py +++ b/src/datarepo/core/tables/deltalake_table.py @@ -402,7 +402,7 @@ def fetch_dfs_by_paths_batching( files (list[str]): List of file paths to read Parquet files from. schema (pa.Schema): Schema to normalize the dataframes to. storage_options (dict[str, Any] | None, optional): Storage options for reading the Parquet files, such as S3 access credentials. Defaults to None. - batch_size (int ): Number of files to process per batch. + batch_size (int): Number of files to process per batch. Returns: pl.DataFrame: A Polars DataFrame containing the concatenated results of all Parquet files, normalized to the specified schema.