diff --git a/apis/python/src/tiledbsoma/soma_dataframe.py b/apis/python/src/tiledbsoma/soma_dataframe.py index 5ed15085a3..df23813222 100644 --- a/apis/python/src/tiledbsoma/soma_dataframe.py +++ b/apis/python/src/tiledbsoma/soma_dataframe.py @@ -169,9 +169,9 @@ def read( # TODO: batch_size # TODO: partition, # TODO: platform_config, - ) -> Iterator[pa.RecordBatch]: + ) -> Iterator[pa.Table]: """ - Read a user-defined subset of data, addressed by the dataframe indexing column, optionally filtered, and return results as one or more ``Arrow.RecordBatch``. + Read a user-defined subset of data, addressed by the dataframe indexing column, optionally filtered, and return results as one or more ``Arrow.Table``. :param ids: Which rows to read. Defaults to ``None``, meaning no constraint -- all rows. @@ -217,18 +217,12 @@ def read( else: iterator = query.df[ids] - for df in iterator: - batches = df.to_batches() - for batch in batches: - # XXX COMMENT MORE - # This is the 'decode on read' part of our logic; in dim_select we have the - # 'encode on write' part. - # Context: https://github.com/single-cell-data/TileDB-SOMA/issues/99. - # - # Also: don't materialize these on read - # TODO: get the arrow syntax for drop - # df.drop(ROWID, axis=1) - yield util_arrow.ascii_to_unicode_pyarrow_readback(batch) + for table in iterator: + # XXX COMMENT MORE + # This is the 'decode on read' part of our logic; in dim_select we have the + # 'encode on write' part. + # Context: https://github.com/single-cell-data/TileDB-SOMA/issues/99. + yield util_arrow.ascii_to_unicode_pyarrow_readback(table) def read_all( self, @@ -243,11 +237,11 @@ def read_all( # TODO: partition, # TODO: result_order, # TODO: platform_config, - ) -> pa.RecordBatch: + ) -> pa.Table: """ - This is a convenience method around ``read``. It iterates the return value from ``read`` and returns a concatenation of all the record batches found. Its nominal use is to simply unit-test cases. + This is a convenience method around ``read``. It concatenates all partial read results into a single Table. Its nominal use is to simplify unit-test cases. """ - return util_arrow.concat_batches( + return pa.concat_tables( self.read( ids=ids, value_filter=value_filter, @@ -273,13 +267,13 @@ def _get_is_sparse(self) -> bool: return self._cached_is_sparse - def write(self, values: pa.RecordBatch) -> None: + def write(self, values: pa.Table) -> None: """ - Write an Arrow.RecordBatch to the persistent object. + Write an Arrow.Table to the persistent object. - :param values: An Arrow.RecordBatch containing all columns, including the index columns. The schema for the values must match the schema for the ``SOMADataFrame``. + :param values: An Arrow.Table containing all columns, including the index columns. The schema for the values must match the schema for the ``SOMADataFrame``. - The ``values`` Arrow RecordBatch must contain a ``soma_rowid`` (uint64) column, indicating which rows are being written. + The ``values`` Arrow Table must contain a ``soma_rowid`` (uint64) column, indicating which rows are being written. """ self._shape = None # cache-invalidate @@ -392,7 +386,7 @@ def read_as_pandas_all( id_column_name: Optional[str] = None, ) -> pd.DataFrame: """ - Reads from SOMA storage into memory. Iterates over batches from ``read_as_pandas``, concatenating the output into a single dataframe. Convenient for unit-test use; also, handy whenever you're certain that the data being queried can be read entirely into memory. + This is a convenience method around ``read``. It concatenates all partial read results into a single DataFrame. Its nominal use is to simplify unit-test cases. """ dataframes = [] generator = self.read_as_pandas( diff --git a/apis/python/src/tiledbsoma/soma_indexed_dataframe.py b/apis/python/src/tiledbsoma/soma_indexed_dataframe.py index cc66d32338..27e8130b00 100644 --- a/apis/python/src/tiledbsoma/soma_indexed_dataframe.py +++ b/apis/python/src/tiledbsoma/soma_indexed_dataframe.py @@ -209,9 +209,9 @@ def read( column_names: Optional[Sequence[str]] = None, result_order: Optional[SOMAResultOrder] = None, # TODO: more arguments - ) -> Iterator[pa.RecordBatch]: + ) -> Iterator[pa.Table]: """ - Read a user-defined subset of data, addressed by the dataframe indexing columns, optionally filtered, and return results as one or more Arrow.RecordBatch. + Read a user-defined subset of data, addressed by the dataframe indexing columns, optionally filtered, and return results as one or more Arrow.Table. :param ids: for each index dimension, which rows to read. Defaults to ``None``, meaning no constraint -- all IDs. @@ -258,14 +258,12 @@ def read( else: iterator = query.df[ids] - for df in iterator: - batches = df.to_batches() - for batch in batches: - # XXX COMMENT MORE - # This is the 'decode on read' part of our logic; in dim_select we have the - # 'encode on write' part. - # Context: # https://github.com/single-cell-data/TileDB-SOMA/issues/99. - yield util_arrow.ascii_to_unicode_pyarrow_readback(batch) + for table in iterator: + # XXX COMMENT MORE + # This is the 'decode on read' part of our logic; in dim_select we have the + # 'encode on write' part. + # Context: # https://github.com/single-cell-data/TileDB-SOMA/issues/99. + yield util_arrow.ascii_to_unicode_pyarrow_readback(table) def read_all( self, @@ -279,19 +277,19 @@ def read_all( # TODO: batch_size # TODO: partition, # TODO: platform_config, - ) -> pa.RecordBatch: + ) -> pa.Table: """ - This is a convenience method around ``read``. It iterates the return value from ``read`` and returns a concatenation of all the record batches found. Its nominal use is to simply unit-test cases. + This is a convenience method around ``read``. It iterates the return value from ``read`` and returns a concatenation of all the table-pieces found. Its nominal use is to simply unit-test cases. """ - return util_arrow.concat_batches( + return pa.concat_tables( self.read(ids=ids, value_filter=value_filter, column_names=column_names) ) - def write(self, values: pa.RecordBatch) -> None: + def write(self, values: pa.Table) -> None: """ - Write an Arrow.RecordBatch to the persistent object. As duplicate index values are not allowed, index values already present in the object are overwritten and new index values are added. + Write an Arrow.Table to the persistent object. As duplicate index values are not allowed, index values already present in the object are overwritten and new index values are added. - :param values: An Arrow.RecordBatch containing all columns, including the index columns. The schema for the values must match the schema for the ``SOMAIndexedDataFrame``. + :param values: An Arrow.Table containing all columns, including the index columns. The schema for the values must match the schema for the ``SOMAIndexedDataFrame``. """ self._shape = None # cache-invalidate diff --git a/apis/python/src/tiledbsoma/util_arrow.py b/apis/python/src/tiledbsoma/util_arrow.py index 36897d2391..cafba6013a 100644 --- a/apis/python/src/tiledbsoma/util_arrow.py +++ b/apis/python/src/tiledbsoma/util_arrow.py @@ -1,4 +1,4 @@ -from typing import Iterator, Optional, Union +from typing import Optional, Union import numpy as np import pyarrow as pa @@ -121,36 +121,24 @@ def get_arrow_schema_from_tiledb_uri( return pa.schema(arrow_schema_dict) -def ascii_to_unicode_pyarrow_readback(record_batch: pa.RecordBatch) -> pa.RecordBatch: +def ascii_to_unicode_pyarrow_readback(table: pa.Table) -> pa.Table: """ Implements the 'decode on read' part of our ASCII/Unicode logic """ # TODO: COMMENT/LINK HEAVILY - names = [ofield.name for ofield in record_batch.schema] + names = [ofield.name for ofield in table.schema] new_fields = [] for name in names: - old_field = record_batch[name] - if isinstance(old_field, pa.LargeBinaryArray): + old_field = table[name] + # Preferred syntax: + # if len(old_field) > 0 and pa.types.is_large_binary(old_field[0]): + # but: + # AttributeError: 'pyarrow.lib.UInt64Scalar' object has no attribute 'id' + if len(old_field) > 0 and isinstance(old_field[0], pa.LargeBinaryScalar): nfield = pa.array( [element.as_py().decode("utf-8") for element in old_field] ) new_fields.append(nfield) else: new_fields.append(old_field) - return pa.RecordBatch.from_arrays(new_fields, names=names) - - -def concat_batches(batch_generator: Iterator[pa.RecordBatch]) -> pa.RecordBatch: - """ - Iterates a generator of ``pyarrow.RecordBatch`` (e.g. ``SOMADataFrame.read``) and returns a concatenation of all the record batches found. The nominal use is to simply unit-test cases. - """ - batches = [] - for batch in batch_generator: - batches.append(batch) - assert len(batches) > 0 - names = [field.name for field in batches[0].schema] - arrays = [] - for name in names: - array = pa.concat_arrays([batch[name] for batch in batches]) - arrays.append(array) - return pa.RecordBatch.from_arrays(arrays, names=names) + return pa.Table.from_arrays(new_fields, names=names) diff --git a/apis/python/tests/test_soma_collection.py b/apis/python/tests/test_soma_collection.py index 55c353371a..e8cab2680f 100644 --- a/apis/python/tests/test_soma_collection.py +++ b/apis/python/tests/test_soma_collection.py @@ -25,7 +25,7 @@ def create_and_populate_dataframe(dataframe: soma.SOMADataFrame) -> None: pydict["foo"] = [10, 20, 30, 40, 50] pydict["bar"] = [4.1, 5.2, 6.3, 7.4, 8.5] pydict["baz"] = ["apple", "ball", "cat", "dog", "egg"] - rb = pa.RecordBatch.from_pydict(pydict) + rb = pa.Table.from_pydict(pydict) dataframe.write(rb) diff --git a/apis/python/tests/test_soma_dataframe.py b/apis/python/tests/test_soma_dataframe.py index 87dd6ea280..1ea1706e9a 100644 --- a/apis/python/tests/test_soma_dataframe.py +++ b/apis/python/tests/test_soma_dataframe.py @@ -26,118 +26,88 @@ def test_soma_dataframe_non_indexed(tmp_path): pydict["foo"] = [10, 20, 30, 40, 50] pydict["bar"] = [4.1, 5.2, 6.3, 7.4, 8.5] pydict["baz"] = ["apple", "ball", "cat", "dog", "egg"] - rb = pa.RecordBatch.from_pydict(pydict) + rb = pa.Table.from_pydict(pydict) sdf.write(rb) # ---------------------------------------------------------------- # Read all - batch = sdf.read_all() - # Weird thing about pyarrow RecordBatch: - # * We should have 5 "rows" with 3 "columns" - # * Indeed batch.num_rows is 5 and batch.num_columns is 3 - # * But len(batch) is 3 - # * If you thought `for record in record_batch` would print records ... you would be wrong -- it - # loops over columns - assert batch.num_rows == 5 + table = sdf.read_all() + assert table.num_rows == 5 # We should be getting back the soma_rowid column as well # If sparse dataframe: - assert batch.num_columns == 4 + assert table.num_columns == 4 # If dense dataframe: - # assert batch.num_columns == 3 + # assert table.num_columns == 3 - # TODO assert [e.as_py() for e in list(batch['soma_rowid'])] == [0,1,2,3,4] - assert [e.as_py() for e in list(batch["foo"])] == pydict["foo"] - assert [e.as_py() for e in list(batch["bar"])] == pydict["bar"] - assert [e.as_py() for e in list(batch["baz"])] == pydict["baz"] + # TODO assert [e.as_py() for e in list(table['soma_rowid'])] == [0,1,2,3,4] + assert [e.as_py() for e in list(table["foo"])] == pydict["foo"] + assert [e.as_py() for e in list(table["bar"])] == pydict["bar"] + assert [e.as_py() for e in list(table["baz"])] == pydict["baz"] # ---------------------------------------------------------------- # Read by ids - batch = sdf.read_all(ids=[1, 2]) - # Weird thing about pyarrow RecordBatch: - # * We should have 5 "rows" with 3 "columns" - # * Indeed batch.num_rows is 5 and batch.num_columns is 3 - # * But len(batch) is 3 - # * If you thought `for record in record_batch` would print records ... you would be wrong -- it - # loops over columns - assert batch.num_rows == 2 + table = sdf.read_all(ids=[1, 2]) + assert table.num_rows == 2 # We should be getting back the soma_rowid column as well # If sparse dataframe: - assert batch.num_columns == 4 + assert table.num_columns == 4 # If dense dataframe: - # assert batch.num_columns == 3 + # assert table.num_columns == 3 - # TODO assert [e.as_py() for e in list(batch['soma_rowid'])] == [0,1,2,3,4] - assert sorted([e.as_py() for e in list(batch["foo"])]) == [20, 30] - assert sorted([e.as_py() for e in list(batch["bar"])]) == [5.2, 6.3] - assert sorted([e.as_py() for e in list(batch["baz"])]) == ["ball", "cat"] + # TODO assert [e.as_py() for e in list(table['soma_rowid'])] == [0,1,2,3,4] + assert sorted([e.as_py() for e in list(table["foo"])]) == [20, 30] + assert sorted([e.as_py() for e in list(table["bar"])]) == [5.2, 6.3] + assert sorted([e.as_py() for e in list(table["baz"])]) == ["ball", "cat"] # ---------------------------------------------------------------- # Read by ids - batch = sdf.read_all(ids=slice(1, 2)) - # Weird thing about pyarrow RecordBatch: - # * We should have 5 "rows" with 3 "columns" - # * Indeed batch.num_rows is 5 and batch.num_columns is 3 - # * But len(batch) is 3 - # * If you thought `for record in record_batch` would print records ... you would be wrong -- it - # loops over columns - assert batch.num_rows == 2 + table = sdf.read_all(ids=slice(1, 2)) + assert table.num_rows == 2 # We should be getting back the soma_rowid column as well # If sparse dataframe: - assert batch.num_columns == 4 + assert table.num_columns == 4 # If dense dataframe: - # assert batch.num_columns == 3 + # assert table.num_columns == 3 - # TODO assert [e.as_py() for e in list(batch['soma_rowid'])] == [0,1,2,3,4] - assert sorted([e.as_py() for e in list(batch["foo"])]) == [20, 30] - assert sorted([e.as_py() for e in list(batch["bar"])]) == [5.2, 6.3] - assert sorted([e.as_py() for e in list(batch["baz"])]) == ["ball", "cat"] + # TODO assert [e.as_py() for e in list(table['soma_rowid'])] == [0,1,2,3,4] + assert sorted([e.as_py() for e in list(table["foo"])]) == [20, 30] + assert sorted([e.as_py() for e in list(table["bar"])]) == [5.2, 6.3] + assert sorted([e.as_py() for e in list(table["baz"])]) == ["ball", "cat"] # ---------------------------------------------------------------- # Read by value_filter - batch = sdf.read_all(value_filter="foo == 40 or foo == 20") - # Weird thing about pyarrow RecordBatch: - # * We should have 5 "rows" with 3 "columns" - # * Indeed batch.num_rows is 5 and batch.num_columns is 3 - # * But len(batch) is 3 - # * If you thought `for record in record_batch` would print records ... you would be wrong -- it - # loops over columns - assert batch.num_rows == 2 + table = sdf.read_all(value_filter="foo == 40 or foo == 20") + assert table.num_rows == 2 # We should be getting back the soma_rowid column as well # If sparse dataframe: - assert batch.num_columns == 4 + assert table.num_columns == 4 # If dense dataframe: - # assert batch.num_columns == 3 + # assert table.num_columns == 3 - # TODO assert [e.as_py() for e in list(batch['soma_rowid'])] == [0,1,2,3,4] - assert sorted([e.as_py() for e in list(batch["foo"])]) == [20, 40] - assert sorted([e.as_py() for e in list(batch["bar"])]) == [5.2, 7.4] - assert sorted([e.as_py() for e in list(batch["baz"])]) == ["ball", "dog"] + # TODO assert [e.as_py() for e in list(table['soma_rowid'])] == [0,1,2,3,4] + assert sorted([e.as_py() for e in list(table["foo"])]) == [20, 40] + assert sorted([e.as_py() for e in list(table["bar"])]) == [5.2, 7.4] + assert sorted([e.as_py() for e in list(table["baz"])]) == ["ball", "dog"] # ---------------------------------------------------------------- # Read by value_filter - batch = sdf.read_all(value_filter='baz == "ball" or baz == "dog"') - # Weird thing about pyarrow RecordBatch: - # * We should have 5 "rows" with 3 "columns" - # * Indeed batch.num_rows is 5 and batch.num_columns is 3 - # * But len(batch) is 3 - # * If you thought `for record in record_batch` would print records ... you would be wrong -- it - # loops over columns - assert batch.num_rows == 2 + table = sdf.read_all(value_filter='baz == "ball" or baz == "dog"') + assert table.num_rows == 2 # We should be getting back the soma_rowid column as well # If sparse dataframe: - assert batch.num_columns == 4 + assert table.num_columns == 4 # If dense dataframe: - # assert batch.num_columns == 3 + # assert table.num_columns == 3 - # TODO assert [e.as_py() for e in list(batch['soma_rowid'])] == [0,1,2,3,4] - assert sorted([e.as_py() for e in list(batch["foo"])]) == [20, 40] - assert sorted([e.as_py() for e in list(batch["bar"])]) == [5.2, 7.4] - assert sorted([e.as_py() for e in list(batch["baz"])]) == ["ball", "dog"] + # TODO assert [e.as_py() for e in list(table['soma_rowid'])] == [0,1,2,3,4] + assert sorted([e.as_py() for e in list(table["foo"])]) == [20, 40] + assert sorted([e.as_py() for e in list(table["bar"])]) == [5.2, 7.4] + assert sorted([e.as_py() for e in list(table["baz"])]) == ["ball", "dog"] @pytest.fixture @@ -166,7 +136,7 @@ def simple_soma_data_frame(tmp_path): "C": ["this", "is", "a", "test"], } n_data = len(data["soma_rowid"]) - rb = pa.RecordBatch.from_pydict(data) + rb = pa.Table.from_pydict(data) sdf.write(rb) yield (schema, sdf, n_data) sdf.delete() @@ -217,12 +187,12 @@ def _check_tbl(tbl, col_names, ids): ) _check_tbl( - pa.Table.from_batches(sdf.read(ids=ids, column_names=col_names)), + sdf.read_all(ids=ids, column_names=col_names), col_names, ids, ) _check_tbl( - pa.Table.from_batches([sdf.read_all(column_names=col_names)]), + sdf.read_all(column_names=col_names), col_names, None, ) diff --git a/apis/python/tests/test_soma_experiment_basic.py b/apis/python/tests/test_soma_experiment_basic.py index 0dedca624b..3c54ed7e93 100644 --- a/apis/python/tests/test_soma_experiment_basic.py +++ b/apis/python/tests/test_soma_experiment_basic.py @@ -23,7 +23,7 @@ def create_and_populate_obs(obs: t.SOMADataFrame) -> None: pydict["foo"] = [10, 20, 30, 40, 50] pydict["bar"] = [4.1, 5.2, 6.3, 7.4, 8.5] pydict["baz"] = ["apple", "ball", "cat", "dog", "egg"] - rb = pa.RecordBatch.from_pydict(pydict) + rb = pa.Table.from_pydict(pydict) obs.write(rb) @@ -43,7 +43,7 @@ def create_and_populate_var(var: t.SOMADataFrame) -> None: pydict["soma_rowid"] = [0, 1, 2, 3] pydict["quux"] = ["zebra", "yak", "xylophone", "wapiti"] pydict["xyzzy"] = [12.3, 23.4, 34.5, 45.6] - rb = pa.RecordBatch.from_pydict(pydict) + rb = pa.Table.from_pydict(pydict) var.write(rb) diff --git a/apis/python/tests/test_soma_indexed_dataframe.py b/apis/python/tests/test_soma_indexed_dataframe.py index 59e3962979..05694ee47a 100644 --- a/apis/python/tests/test_soma_indexed_dataframe.py +++ b/apis/python/tests/test_soma_indexed_dataframe.py @@ -24,36 +24,28 @@ def test_soma_indexed_dataframe(tmp_path): pydict["foo"] = [10, 20, 30, 40, 50] pydict["bar"] = [4.1, 5.2, 6.3, 7.4, 8.5] pydict["baz"] = ["apple", "ball", "cat", "dog", "egg"] - rb = pa.RecordBatch.from_pydict(pydict) + rb = pa.Table.from_pydict(pydict) sdf.write(rb) # Read all - batch = sdf.read_all() - # Weird thing about pyarrow RecordBatch: - # * We should have 5 "rows" with 3 "columns" - # * Indeed batch.num_rows is 5 and batch.num_columns is 3 - # * But len(batch) is 3 - # * If you thought `for record in record_batch` would print records ... you would be wrong -- it - # loops over columns - assert batch.num_rows == 5 - assert batch.num_columns == 3 - assert [e.as_py() for e in list(batch["foo"])] == pydict["foo"] - assert [e.as_py() for e in list(batch["bar"])] == pydict["bar"] - assert [e.as_py() for e in list(batch["baz"])] == pydict["baz"] + table = sdf.read_all() + # Weird thing about pyarrow Table: + # * We have table.num_rows is 5 and table.num_columns is 3 + # * But len(table) is 3 + # * `for column in table` loops over columns + assert table.num_rows == 5 + assert table.num_columns == 3 + assert [e.as_py() for e in list(table["foo"])] == pydict["foo"] + assert [e.as_py() for e in list(table["bar"])] == pydict["bar"] + assert [e.as_py() for e in list(table["baz"])] == pydict["baz"] # Read ids - batch = sdf.read_all(ids=[30, 10]) - # Weird thing about pyarrow RecordBatch: - # * We should have 5 "rows" with 3 "columns" - # * Indeed batch.num_rows is 5 and batch.num_columns is 3 - # * But len(batch) is 3 - # * If you thought `for record in record_batch` would print records ... you would be wrong -- it - # loops over columns - assert batch.num_rows == 2 - assert batch.num_columns == 3 - assert sorted([e.as_py() for e in list(batch["foo"])]) == [10, 30] - assert sorted([e.as_py() for e in list(batch["bar"])]) == [4.1, 6.3] - assert sorted([e.as_py() for e in list(batch["baz"])]) == ["apple", "cat"] + table = sdf.read_all(ids=[30, 10]) + assert table.num_rows == 2 + assert table.num_columns == 3 + assert sorted([e.as_py() for e in list(table["foo"])]) == [10, 30] + assert sorted([e.as_py() for e in list(table["bar"])]) == [4.1, 6.3] + assert sorted([e.as_py() for e in list(table["baz"])]) == ["apple", "cat"] @pytest.fixture @@ -80,7 +72,7 @@ def simple_soma_indexed_data_frame(tmp_path): "C": ["this", "is", "a", "test"], } n_data = len(data["index"]) - rb = pa.RecordBatch.from_pydict(data) + rb = pa.Table.from_pydict(data) sdf.write(rb) yield (schema, sdf, n_data, index_column_names) sdf.delete() @@ -134,12 +126,12 @@ def _check_tbl(tbl, col_names, ids): ) _check_tbl( - pa.Table.from_batches(sdf.read(ids=ids, column_names=col_names)), + sdf.read_all(ids=ids, column_names=col_names), col_names, ids, ) _check_tbl( - pa.Table.from_batches([sdf.read_all(column_names=col_names)]), + sdf.read_all(column_names=col_names), col_names, None, )