diff --git a/src/datarepo/core/tables/deltalake_table.py b/src/datarepo/core/tables/deltalake_table.py
index 7c9a57a..0f8a22f 100644
--- a/src/datarepo/core/tables/deltalake_table.py
+++ b/src/datarepo/core/tables/deltalake_table.py
@@ -305,6 +305,8 @@ def fetch_df_by_partition(
     partition: list[tuple[str, str, Any]],
     schema: pa.Schema,
     storage_options: dict[str, Any] | None = None,
+    use_batching: bool = False,
+    batch_size: int = 10,
 ) -> pl.DataFrame:
     """
     The native delta-rs read has slower performance. The difference comes from the dataset in
@@ -325,6 +327,13 @@ def fetch_df_by_partition(
     Thus, it makes sense to use read_table() since we have a predefined list of files to be loaded
     from dt.files
 
+    Args:
+        dt: DeltaTable object
+        partition: Partition filters
+        schema: Schema to normalize to
+        storage_options: Storage options
+        use_batching: Whether to use batched file processing for better performance
+        batch_size: Number of files to process per batch when batching is enabled
     """
 
     files = dt.files(partition_filters=partition)
@@ -334,10 +343,17 @@ def fetch_df_by_partition(
         # NOTE: polars has a bug where with_columns(...) on an empty dataframe with no columns will add a row
         # for all new columns. Workaround by adding the columns before normalizing
         return _empty_normalized_df(schema)
-
-    return fetch_dfs_by_paths(
-        files=files, schema=schema, storage_options=storage_options
-    )
+    if use_batching:
+        return fetch_dfs_by_paths_batching(
+            files=files,
+            schema=schema,
+            storage_options=storage_options,
+            batch_size=batch_size,
+        )
+    else:
+        return fetch_dfs_by_paths(
+            files=files, schema=schema, storage_options=storage_options
+        )
 
 
 def fetch_dfs_by_paths(
@@ -374,6 +390,54 @@ def fetch_dfs_by_paths(
     return pl.concat([_normalize_df(df, schema=schema) for df in dfs])
 
 
+def fetch_dfs_by_paths_batching(
+    files: list[str],
+    schema: pa.Schema,
+    batch_size: int,
+    storage_options: dict[str, Any] | None = None,
+) -> pl.DataFrame:
+    """Fetch dataframes from a list of Parquet files in batches using multithreading.
+
+    Args:
+        files (list[str]): List of file paths to read Parquet files from.
+        schema (pa.Schema): Schema to normalize the dataframes to.
+        batch_size (int): Number of files to process per batch.
+        storage_options (dict[str, Any] | None, optional): Storage options for reading the Parquet files, such as S3 access credentials. Defaults to None.
+
+    Returns:
+        pl.DataFrame: A Polars DataFrame containing the concatenated results of all Parquet files, normalized to the specified schema.
+    """
+    if not files:
+        return _empty_normalized_df(schema)
+
+    def batcher(arr: list[str], batch_size: int):
+        """Split a list into consecutive batches of the specified size."""
+        for i in range(0, len(arr), batch_size):
+            yield arr[i : i + batch_size]
+
+    def read_batch(batch):
+        return pl.concat(
+            [
+                pl.read_parquet(
+                    f,
+                    retries=READ_PARQUET_RETRY_COUNT,
+                    storage_options=storage_options,
+                    hive_partitioning=True,
+                )
+                for f in batch
+            ]
+        )
+
+    with ThreadPoolExecutor() as executor:
+        futures = [
+            executor.submit(read_batch, batch) for batch in batcher(files, batch_size)
+        ]
+
+        dfs = [future.result() for future in futures]
+
+    return pl.concat([_normalize_df(df, schema=schema) for df in dfs])
+
+
 def _empty_normalized_df(schema: pa.Schema) -> pl.DataFrame:
     """Create an empty DataFrame with the specified schema.
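
For reference, a minimal usage sketch of the new opt-in batched path (not part of the diff): the table URI, partition filter values, and schema below are illustrative placeholders, and the import path simply mirrors the file touched above.

# Hypothetical usage sketch -- table location, partition values, and schema are placeholders.
import pyarrow as pa
from deltalake import DeltaTable

from datarepo.core.tables.deltalake_table import fetch_df_by_partition

dt = DeltaTable("s3://example-bucket/example-table")  # placeholder table location
schema = pa.schema([("id", pa.int64()), ("value", pa.float64()), ("date", pa.string())])
partition = [("date", "=", "2024-01-01")]

# Default behaviour (unchanged): one read task per file.
df = fetch_df_by_partition(dt, partition=partition, schema=schema)

# Opt-in batched reads: files are grouped into batches of `batch_size`,
# and each batch is read and concatenated by a single worker thread.
df_batched = fetch_df_by_partition(
    dt,
    partition=partition,
    schema=schema,
    use_batching=True,
    batch_size=10,
)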